From c3a255ab7f267f36ba2b760948e8212539aef4ef Mon Sep 17 00:00:00 2001 From: Zelimir Fedoran Date: Wed, 27 Aug 2025 14:28:31 -0400 Subject: [PATCH 1/5] initial working tape merkle tree calc --- network/src/archive/pack.rs | 245 +++++++++++++++++++++--------------- network/src/store/consts.rs | 3 +- 2 files changed, 144 insertions(+), 104 deletions(-) diff --git a/network/src/archive/pack.rs b/network/src/archive/pack.rs index 60892711..96182061 100644 --- a/network/src/archive/pack.rs +++ b/network/src/archive/pack.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use solana_sdk::pubkey::Pubkey; -use brine_tree::{Leaf, Hash}; +use brine_tree::{Leaf, Hash, MerkleTree}; use tape_api::prelude::*; use tape_client::get_epoch_account; use solana_client::nonblocking::rpc_client::RpcClient; @@ -9,8 +9,6 @@ use solana_client::nonblocking::rpc_client::RpcClient; use crate::store::*; use super::queue::Rx; -const LAYER_NUMBER: u8 = 10; - /// Orchestrator Task C – CPU-heavy preprocessing (packx) pub async fn run(rpc: Arc, mut rx: Rx, miner: Pubkey, store: Arc) -> Result<()> { let epoch = get_epoch_account(&rpc).await?.0; @@ -39,6 +37,8 @@ pub async fn run(rpc: Arc, mut rx: Rx, miner: Pubkey, store: Arc Result> { let miner_address: [u8; 32] = miner_address.to_bytes(); let canonical_segment = padded_array::(segment); @@ -54,22 +54,72 @@ pub fn pack_segment(miner_address: &Pubkey, segment: &[u8], packing_difficulty: Ok(segment_bytes.to_vec()) } +/// Computes the Merkle root for the entire tape by using a cached canopy of sector roots. +pub fn get_tape_root( + store: &Arc, + tape_address: &Pubkey, +) -> Result { + // canopy height = number of levels above the sector layer up to the root + const CANOPY_HEIGHT: usize = SEGMENT_TREE_HEIGHT - SECTOR_TREE_HEIGHT; + + // All zero values for the full-height segment tree + let zeros_full = get_or_create_empty_hashes(store, tape_address)?; + if zeros_full.len() != SEGMENT_TREE_HEIGHT { + return Err(anyhow!( + "Invalid zero_values len: expected {}, got {}", + SEGMENT_TREE_HEIGHT, + zeros_full.len() + )); + } + + // Zero values for the canopy tree (start at the sector layer) + let canopy_zeros: [Hash; CANOPY_HEIGHT] = zeros_full + [SECTOR_TREE_HEIGHT .. SEGMENT_TREE_HEIGHT] + .try_into() + .map_err(|_| anyhow!( + "Invalid canopy zeros slice: expected {}, got {}", + CANOPY_HEIGHT, + zeros_full.len().saturating_sub(SECTOR_TREE_HEIGHT) + ))?; + + // Build canopy over sector roots + type CanopyTree = MerkleTree<{ SEGMENT_TREE_HEIGHT - SECTOR_TREE_HEIGHT }>; + let mut canopy = CanopyTree::from_zeros(canopy_zeros); + + // Load sector roots cached at the sector layer + let sector_roots = store.get_layer(tape_address, SECTOR_TREE_HEIGHT as u8)?; + for root_bytes in sector_roots.iter() { + let leaf = Leaf::from(*root_bytes); + canopy.try_add_leaf(leaf).expect("Failed to add sector root"); + } + + Ok(canopy.get_root()) +} + +/// Updates the Merkle subtree for a given segment number. +pub fn update_segment_root( + store: &Arc, + tape_address: &Pubkey, + segment_number: u64, +) -> Result<()> { + let sector_number = segment_number / SECTOR_LEAVES as u64; + update_sector_root(store, tape_address, sector_number) +} + /// Updates the Merkle subtree for a given sector number. -pub async fn update_sector_root( +pub fn update_sector_root( store: &Arc, tape_address: &Pubkey, sector_number: u64, ) -> Result<()> { - let empty_hashes = get_or_create_empty_hashes(store, tape_address).await?; + let empty_hashes = get_or_create_empty_hashes(store, tape_address)?; let empty_leaf = empty_hashes.first().unwrap().as_leaf(); let leaves = compute_sector_leaves(store, tape_address, sector_number, empty_leaf)?; - let root = compute_sector_root(&leaves, &empty_hashes, LAYER_NUMBER)?; - - update_layer(store, tape_address, LAYER_NUMBER, sector_number, root)?; + let root = compute_sector_root(&leaves, &empty_hashes)?; - Ok(()) + update_sector_canopy(store, tape_address, sector_number, root) } /// Computes leaves for a sector from the store, filling with empty_leaf as needed. @@ -98,7 +148,6 @@ pub fn compute_sector_leaves( pub fn compute_sector_root( leaves: &[Leaf], empty_hashes: &[Hash], - layer_number: u8, ) -> Result { let mut tree = SegmentTree::from_zeros( empty_hashes @@ -113,7 +162,7 @@ pub fn compute_sector_root( tree.next_index = leaves.len() as u64; - let layer_nodes = tree.get_layer_nodes(leaves, layer_number as usize); + let layer_nodes = tree.get_layer_nodes(leaves, SECTOR_TREE_HEIGHT); if layer_nodes.len() != 1 { return Err(anyhow!( "Invalid layer nodes length: expected 1, got {}", @@ -124,15 +173,15 @@ pub fn compute_sector_root( Ok(*layer_nodes.first().unwrap()) } -/// Updates the layer in the store with the new sector root. -pub fn update_layer( +/// Updates the layer in the store with the new sector root. +pub fn update_sector_canopy( store: &TapeStore, tape_address: &Pubkey, - layer_number: u8, sector_number: u64, root: Hash, ) -> Result<()> { - let mut layer = match store.get_layer(tape_address, layer_number) { + + let mut layer = match store.get_layer(tape_address, SECTOR_TREE_HEIGHT as u8) { Ok(layer) => layer, Err(_) => vec![[0u8; 32]; (sector_number + 1) as usize], }; @@ -142,18 +191,16 @@ pub fn update_layer( } layer[sector_number as usize] = root.to_bytes(); - store.put_layer(tape_address, layer_number, &layer)?; + store.put_layer(tape_address, SECTOR_TREE_HEIGHT as u8, &layer)?; + Ok(()) } /// Helper to create or init the zero values for a tape -/// Note: This only needs to be done once per tape, the zero values are the height of the tree and -/// are calculated from the tape's merkle seed -pub async fn get_or_create_empty_hashes( +pub fn get_or_create_empty_hashes( store: &Arc, tape_address: &Pubkey, ) -> Result> { - const H: usize = SEGMENT_TREE_HEIGHT; let empty_values = match store.get_zero_values(tape_address) { Ok(empty_values) => empty_values, @@ -172,8 +219,12 @@ pub async fn get_or_create_empty_hashes( } }; - if empty_values.len() != H { - return Err(anyhow!("Invalid number of zero values: expected {}, got {}", H, empty_values.len())); + if empty_values.len() != SEGMENT_TREE_HEIGHT { + return Err( + anyhow!("Invalid number of zero values: expected {}, got {}", + SEGMENT_TREE_HEIGHT, + empty_values.len() + )); } // Convert empty values to Hash type @@ -191,12 +242,12 @@ mod tests { use super::*; use solana_sdk::pubkey::Pubkey; use tempdir::TempDir; - use brine_tree::{Leaf, Hash}; + use brine_tree::{Leaf, get_cached_merkle_proof}; - fn setup_store() -> Result<(TapeStore, TempDir), StoreError> { + fn setup_store() -> Result<(Arc, TempDir), StoreError> { let temp_dir = TempDir::new("rocksdb_test").map_err(StoreError::IoError)?; let store = TapeStore::new(temp_dir.path())?; - Ok((store, temp_dir)) + Ok((Arc::new(store), temp_dir)) } fn create_segment_data(marker: u8, miner: &Pubkey) -> Vec { @@ -211,30 +262,24 @@ mod tests { solution.to_bytes().to_vec() } - fn create_empty_hashes(seed: &[u8]) -> Vec { - let tree = SegmentTree::new(&[seed]); - tree.zero_values.iter().copied().collect() - } - #[test] fn test_subtree_update() -> Result<()> { // Setup store and tape address let (store, _temp_dir) = setup_store()?; - let tape_address = Pubkey::new_unique(); + + // Setup our tape and miner let miner_address = Pubkey::new_unique(); + let tape_address = Pubkey::new_unique(); + let mut tape_tree = SegmentTree::new(&[tape_address.as_ref()]); + let mut leaves = vec![]; - // Create empty hashes and store them - let seed = b"test_seed"; - let empty_hashes = create_empty_hashes(seed); - let empty_hashes_bytes = empty_hashes.iter().map(|h| h.to_bytes()).collect::>(); - store.put_zero_values(&tape_address, &empty_hashes_bytes)?; - let empty_leaf = empty_hashes.first().unwrap().as_leaf(); + let empty_values = get_or_create_empty_hashes(&store, &tape_address)?; + let empty_leaf = empty_values[0].as_leaf(); + assert_eq!(empty_values.len(), SEGMENT_TREE_HEIGHT); + assert_eq!(empty_values, empty_values); - // Create a bunch of leaves (fill 2.5 sectors) + // Fill the tape with some segments (2.5 sectors worth) let leaf_count = (SECTOR_LEAVES as f64 * 2.5) as usize; - let mut tree = SegmentTree::new(&[seed]); - let mut leaves = vec![]; - for i in 0..leaf_count { let segment_id = i as u64; let segment_data = create_segment_data(1, &miner_address); @@ -243,71 +288,65 @@ mod tests { &segment_data ]); - // Add leaf to the tree - tree.try_add_leaf(leaf).expect("Failed to add leaf"); - - // Store the segment in a sector - let sector_number = segment_id / SECTOR_LEAVES as u64; - let local_seg_idx = (segment_id % SECTOR_LEAVES as u64) as usize; - let mut sector = store - .get_sector(&tape_address, sector_number) - .unwrap_or_else(|_| Sector::new()); - sector.set_segment(local_seg_idx, &segment_data); - store.put_sector(&tape_address, sector_number, §or)?; - - // Collect leaves so we can verify later + tape_tree.try_add_leaf(leaf).expect("Failed to add leaf"); + store.put_segment(&tape_address, segment_id, segment_data)?; leaves.push(leaf); } - // Verify sector count - assert_eq!(store.get_sector_count(&tape_address)?, 3); - - // Add a new leaf and test subtree update - let segment_number = leaves.len() as u64; - let sector_number = segment_number / SECTOR_LEAVES as u64; - let segment_data = create_segment_data(42, &miner_address); - let new_leaf = Leaf::new(&[ - &segment_number.to_le_bytes(), - &segment_data - ]); - - tree.try_add_leaf(new_leaf).expect("Failed to add new leaf"); - leaves.push(new_leaf); - - // Store the leaf so that compute_sector_leaves can find it - let mut sector = store - .get_sector(&tape_address, sector_number) - .unwrap_or_else(|_| Sector::new()); - let local_seg_idx = (segment_number % SECTOR_LEAVES as u64) as usize; - sector.set_segment(local_seg_idx, &segment_data); - store.put_sector(&tape_address, sector_number, §or)?; - - // Compute leaves for the sector - let sector_leaves = compute_sector_leaves(&store, &tape_address, sector_number, empty_leaf)?; - - assert_eq!(sector_leaves.len(), SECTOR_LEAVES, "Incorrect number of leaves"); - assert_eq!(sector_leaves[local_seg_idx], new_leaf); - - // Compute the sector root - let root = compute_sector_root(§or_leaves, &empty_hashes, 10)?; - - // Verify root using the existing tree - let layer_nodes = tree.get_layer_nodes(&leaves, 10); - - assert_eq!(layer_nodes.len(), 3); - assert_eq!(root, layer_nodes[sector_number as usize]); - - // Update the layer - update_layer(&store, &tape_address, 10, sector_number, root)?; - - // Verify the layer - let layer = store.get_layer(&tape_address, 10)?; - assert_eq!(layer.len(), sector_number as usize + 1); - - // The first two will be empty because we never called update_layer for them - assert_eq!(layer[0], [0u8; 32]); - assert_eq!(layer[1], [0u8; 32]); - assert_eq!(layer[sector_number as usize], root.to_bytes()); + let expected_vals = tape_tree.get_layer_nodes(&leaves, SECTOR_TREE_HEIGHT); + let expected_canopy = expected_vals + .iter() + .map(|h| h.to_bytes()) + .collect::>(); + + // Calculate sector roots and update the stored canopy values + update_sector_root(&store, &tape_address, 0)?; + update_sector_root(&store, &tape_address, 1)?; + update_sector_root(&store, &tape_address, 2)?; + + let actual_canopy = store.get_layer(&tape_address, SECTOR_TREE_HEIGHT as u8)?; + assert_eq!(expected_canopy.len(), actual_canopy.len()); + assert_eq!(expected_canopy, actual_canopy); + + // Lets try a Merkle proof for a segment + let segment_number : usize = 1234; + let sector_number = (segment_number as u64) / SECTOR_LEAVES as u64; + let sector = store.get_sector(&tape_address, sector_number)?; + + let expected_proof = tape_tree.get_proof(&leaves, segment_number); // <- Requires all leaves in memory (bad) + let actual_proof = get_cached_merkle_proof( // <- Only one sector in memory (good) + &tape_tree, + segment_number, + SECTOR_TREE_HEIGHT, + &expected_vals, + |i| { + let local_idx = i % SECTOR_LEAVES; + match sector.get_segment(local_idx) { // <- Look ma, no store access here! + Some(data) => Some( + Leaf::new(&[ + &(i as u64).to_le_bytes(), + &data + ])), + None => Some(empty_leaf), + } + + // Same as above, but with store access (slower) + // match store.get_segment(&tape_address, i as u64) { + // Ok(data) => Some( + // Leaf::new(&[ + // &(i as u64).to_le_bytes(), + // &data + // ])), + // Err(_) => Some(empty_leaf), + // } + } + ); + + assert_eq!(expected_proof, actual_proof); + + //Verify the tape root matches our computed root + let computed_root = get_tape_root(&store, &tape_address)?; + assert_eq!(computed_root, tape_tree.get_root()); Ok(()) } diff --git a/network/src/store/consts.rs b/network/src/store/consts.rs index fb9c1100..ef46750c 100644 --- a/network/src/store/consts.rs +++ b/network/src/store/consts.rs @@ -1,4 +1,5 @@ -pub const SECTOR_LEAVES: usize = 1 << 10; +pub const SECTOR_TREE_HEIGHT: usize = 10; +pub const SECTOR_LEAVES: usize = 1 << SECTOR_TREE_HEIGHT; // 1024 leaves pub const SECTOR_BITMAP_BYTES: usize = SECTOR_LEAVES / 8; pub const SECTOR_HEADER_BYTES: usize = SECTOR_BITMAP_BYTES + 32; From a8676cbf1697e74959143db6abc11d02589081ac Mon Sep 17 00:00:00 2001 From: Zelimir Fedoran Date: Wed, 27 Aug 2025 14:49:32 -0400 Subject: [PATCH 2/5] minor cleanup --- network/src/archive/pack.rs | 52 ++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/network/src/archive/pack.rs b/network/src/archive/pack.rs index 96182061..1448f90a 100644 --- a/network/src/archive/pack.rs +++ b/network/src/archive/pack.rs @@ -54,6 +54,32 @@ pub fn pack_segment(miner_address: &Pubkey, segment: &[u8], packing_difficulty: Ok(segment_bytes.to_vec()) } +/// Updates the Merkle subtree for a given segment number. +pub fn update_segment_root( + store: &Arc, + tape_address: &Pubkey, + segment_number: u64, +) -> Result<()> { + let sector_number = segment_number / SECTOR_LEAVES as u64; + update_sector_root(store, tape_address, sector_number) +} + +/// Updates the Merkle subtree for a given sector number. +pub fn update_sector_root( + store: &Arc, + tape_address: &Pubkey, + sector_number: u64, +) -> Result<()> { + + let empty_hashes = get_or_create_empty_hashes(store, tape_address)?; + let empty_leaf = empty_hashes.first().unwrap().as_leaf(); + + let leaves = compute_sector_leaves(store, tape_address, sector_number, empty_leaf)?; + let root = compute_sector_root(&leaves, &empty_hashes)?; + + update_sector_canopy(store, tape_address, sector_number, root) +} + /// Computes the Merkle root for the entire tape by using a cached canopy of sector roots. pub fn get_tape_root( store: &Arc, @@ -96,32 +122,6 @@ pub fn get_tape_root( Ok(canopy.get_root()) } -/// Updates the Merkle subtree for a given segment number. -pub fn update_segment_root( - store: &Arc, - tape_address: &Pubkey, - segment_number: u64, -) -> Result<()> { - let sector_number = segment_number / SECTOR_LEAVES as u64; - update_sector_root(store, tape_address, sector_number) -} - -/// Updates the Merkle subtree for a given sector number. -pub fn update_sector_root( - store: &Arc, - tape_address: &Pubkey, - sector_number: u64, -) -> Result<()> { - - let empty_hashes = get_or_create_empty_hashes(store, tape_address)?; - let empty_leaf = empty_hashes.first().unwrap().as_leaf(); - - let leaves = compute_sector_leaves(store, tape_address, sector_number, empty_leaf)?; - let root = compute_sector_root(&leaves, &empty_hashes)?; - - update_sector_canopy(store, tape_address, sector_number, root) -} - /// Computes leaves for a sector from the store, filling with empty_leaf as needed. pub fn compute_sector_leaves( store: &TapeStore, From c0e0390a2dc4f1613faeba3c1a25b2e2cce2ea99 Mon Sep 17 00:00:00 2001 From: Zelimir Fedoran Date: Wed, 27 Aug 2025 16:13:26 -0400 Subject: [PATCH 3/5] merkle cache column clean up --- network/src/archive/pack.rs | 43 +++-- network/src/store/column/merkle.rs | 254 ++++++++++++++++++++++++----- network/src/store/column/mod.rs | 2 +- 3 files changed, 250 insertions(+), 49 deletions(-) diff --git a/network/src/archive/pack.rs b/network/src/archive/pack.rs index 1448f90a..923fd166 100644 --- a/network/src/archive/pack.rs +++ b/network/src/archive/pack.rs @@ -26,8 +26,7 @@ pub async fn run(rpc: Arc, mut rx: Rx, miner: Pubkey, store: Arc Result<()> { - let mut layer = match store.get_layer(tape_address, SECTOR_TREE_HEIGHT as u8) { + let key = MerkleCacheKey::UnpackedTapeLayer { + address: *tape_address, + layer: SECTOR_TREE_HEIGHT as u8 + }; + + let mut layer = match store.get_merkle_cache(&key) { Ok(layer) => layer, Err(_) => vec![[0u8; 32]; (sector_number + 1) as usize], }; @@ -191,7 +201,7 @@ pub fn update_sector_canopy( } layer[sector_number as usize] = root.to_bytes(); - store.put_layer(tape_address, SECTOR_TREE_HEIGHT as u8, &layer)?; + store.put_merkle_cache(&key, &layer)?; Ok(()) } @@ -202,11 +212,15 @@ pub fn get_or_create_empty_hashes( tape_address: &Pubkey, ) -> Result> { - let empty_values = match store.get_zero_values(tape_address) { + let empty_values = match store.get_merkle_cache( + &MerkleCacheKey::ZeroValues { + address: *tape_address + } + ) { Ok(empty_values) => empty_values, Err(_) => { // Create an empty SegmentTree to get the zero values - let tree = SegmentTree::new(&[&tape_address.as_ref()]); + let tree = SegmentTree::new(&[tape_address.as_ref()]); let empty_values = tree.zero_values; let seeds_bytes = empty_values .into_iter() @@ -214,7 +228,10 @@ pub fn get_or_create_empty_hashes( .collect::>(); // Throw the empty_values into the store for future use - store.put_zero_values(tape_address, &seeds_bytes)?; + store.put_merkle_cache( + &MerkleCacheKey::ZeroValues { address: *tape_address }, + &seeds_bytes + )?; seeds_bytes } }; @@ -304,7 +321,13 @@ mod tests { update_sector_root(&store, &tape_address, 1)?; update_sector_root(&store, &tape_address, 2)?; - let actual_canopy = store.get_layer(&tape_address, SECTOR_TREE_HEIGHT as u8)?; + let actual_canopy = store.get_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { + address: tape_address, + layer: SECTOR_TREE_HEIGHT as u8 + } + )?; + assert_eq!(expected_canopy.len(), actual_canopy.len()); assert_eq!(expected_canopy, actual_canopy); @@ -325,7 +348,7 @@ mod tests { Some(data) => Some( Leaf::new(&[ &(i as u64).to_le_bytes(), - &data + data ])), None => Some(empty_leaf), } diff --git a/network/src/store/column/merkle.rs b/network/src/store/column/merkle.rs index 771a9894..46f3cca7 100644 --- a/network/src/store/column/merkle.rs +++ b/network/src/store/column/merkle.rs @@ -1,68 +1,246 @@ use solana_sdk::pubkey::Pubkey; use crate::store::*; +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum MerkleCacheKey { + ZeroValues { address: Pubkey }, // Merkle tree zero hashes + UnpackedTapeLayer { address: Pubkey, layer: u8 }, // Unpacked tape layer nodes + PackedTapeLayer { address: Pubkey, layer: u8 }, // Packed tape layer nodes + Raw36([u8; 36]), +} + +impl MerkleCacheKey { + #[inline(always)] + fn to_vec(&self) -> Vec { + match *self { + MerkleCacheKey::ZeroValues { address } => + build_key(&address, MERKLE_ZEROS, 0), + + MerkleCacheKey::UnpackedTapeLayer { address, layer } => + build_key(&address, TAPE_LAYER, layer), + + MerkleCacheKey::PackedTapeLayer { address, layer } => + build_key(&address, MINER_LAYER, layer), + + MerkleCacheKey::Raw36(bytes) => bytes.to_vec(), + } + } +} + pub trait MerkleOps { - fn get_hash_values(&self, key: Vec) -> Result, StoreError>; - fn put_hash_values(&self, key: Vec, values: &[[u8; 32]]) -> Result<(), StoreError>; + fn get_merkle_cache(&self, key: &MerkleCacheKey) -> Result, StoreError>; + fn put_merkle_cache(&self, key: &MerkleCacheKey, values: &[[u8; 32]]) -> Result<(), StoreError>; - fn get_zero_values(&self, address: &Pubkey) -> Result, StoreError>; - fn put_zero_values(&self, address: &Pubkey, values: &[[u8; 32]]) -> Result<(), StoreError>; + /// Optional: compile-time length check, returns a plain array. + fn get_merkle_cache_fixed(&self, key: &MerkleCacheKey) + -> Result<[[u8; 32]; N], StoreError> + { + let v = self.get_merkle_cache(key)?; + if v.len() != N { + return Err(StoreError::InvalidHashSize(v.len() * 32)); + } - fn get_layer(&self, tape_address: &Pubkey, layer:u8) -> Result, StoreError>; - fn put_layer(&self, tape_address: &Pubkey, layer:u8, values: &[[u8; 32]]) -> Result<(), StoreError>; + let arr: [[u8; 32]; N] = v + .try_into() + .map_err(|vv: Vec<[u8; 32]>| StoreError::InvalidHashSize(vv.len() * 32))?; + Ok(arr) + } } impl MerkleOps for TapeStore { - fn get_hash_values(&self, key: Vec) -> Result, StoreError> { + fn get_merkle_cache(&self, key: &MerkleCacheKey) -> Result, StoreError> { let cf = self.get_cf_handle(ColumnFamily::MerkleHashes)?; - let data = self - .db - .get_cf(&cf, &key)? - .ok_or(StoreError::HashNotFound)?; - - let mut result = vec![]; + let k = key.to_vec(); + let data = self.db.get_cf(&cf, &k)?.ok_or(StoreError::HashNotFound)?; + if data.len() % 32 != 0 { + return Err(StoreError::InvalidHashSize(data.len())); + } + let mut out = Vec::with_capacity(data.len() / 32); for chunk in data.chunks_exact(32) { let mut arr = [0u8; 32]; arr.copy_from_slice(chunk); - result.push(arr); + out.push(arr); } - Ok(result) + Ok(out) } - fn put_hash_values(&self, key: Vec, values: &[[u8; 32]]) -> Result<(), StoreError> { + fn put_merkle_cache(&self, key: &MerkleCacheKey, values: &[[u8; 32]]) -> Result<(), StoreError> { let cf = self.get_cf_handle(ColumnFamily::MerkleHashes)?; + let k = key.to_vec(); + let data: Vec = values.iter().flatten().copied().collect(); + self.db.put_cf(&cf, &k, &data)?; + Ok(()) + } +} + +#[inline(always)] +fn build_key(address: &Pubkey, layer_type: u8, layer_id: u8) -> Vec { + let mut key = Vec::with_capacity(36); + key.extend_from_slice(&address.to_bytes()); // 32 + key.push(layer_id); // 1 + key.extend_from_slice(&[layer_type, 0, 0]); // 3 (type + padding) + key +} + + +#[cfg(test)] +mod tests { + use super::*; + use tempdir::TempDir; + + fn setup_store() -> Result<(TapeStore, TempDir), StoreError> { + let tmp = TempDir::new("rocksdb_test").map_err(StoreError::IoError)?; + let store = TapeStore::new(tmp.path())?; + Ok((store, tmp)) + } - let data = values.iter().flatten().copied().collect::>(); - self.db.put_cf(&cf, &key, &data)?; + fn h(i: u8) -> [u8; 32] { + [i; 32] + } + + #[test] + fn merkle_zero_values_roundtrip_fixed() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + + // Prepare exactly SECTOR_TREE_HEIGHT hashes + let mut vals = Vec::<[u8; 32]>::with_capacity(SECTOR_TREE_HEIGHT); + for i in 0..SECTOR_TREE_HEIGHT { + vals.push(h(i as u8)); + } + + // Put + store.put_merkle_cache( + &MerkleCacheKey::ZeroValues { address: addr }, + &vals, + )?; + + // Get + let got: [[u8; 32]; SECTOR_TREE_HEIGHT] = store.get_merkle_cache_fixed( + &MerkleCacheKey::ZeroValues { address: addr }, + )?; + + assert_eq!(got.as_slice(), vals.as_slice()); Ok(()) } - fn get_zero_values(&self, address: &Pubkey) -> Result, StoreError> { - let key = build_key(address, MERKLE_ZEROS, 0); - self.get_hash_values(key) + #[test] + fn merkle_unpacked_layer_roundtrip_vec() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + let layer = SECTOR_TREE_HEIGHT as u8; + + // Arbitrary set of hashes for the layer + let vals = vec![h(1), h(2), h(3), h(4)]; + + store.put_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { address: addr, layer }, + &vals, + )?; + + let got = store.get_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { address: addr, layer }, + )?; + + assert_eq!(got, vals); + Ok(()) } - fn put_zero_values(&self, address: &Pubkey, values: &[[u8; 32]]) -> Result<(), StoreError> { - let key = build_key(address, MERKLE_ZEROS, 0); - self.put_hash_values(key, values) + #[test] + fn merkle_packed_layer_roundtrip_vec() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + let layer = (SECTOR_TREE_HEIGHT as u8).saturating_add(1); + + let vals = vec![h(9), h(8)]; + + store.put_merkle_cache( + &MerkleCacheKey::PackedTapeLayer { address: addr, layer }, + &vals, + )?; + + let got = store.get_merkle_cache( + &MerkleCacheKey::PackedTapeLayer { address: addr, layer }, + )?; + + assert_eq!(got, vals); + Ok(()) } - fn get_layer(&self, address: &Pubkey, layer: u8) -> Result, StoreError> { - let key = build_key(address, TAPE_LAYER, layer); - self.get_hash_values(key) + #[test] + fn merkle_raw36_key_roundtrip() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + let raw_key: [u8; 36] = { + let v = build_key(&addr, TAPE_LAYER, 7); + let mut arr = [0u8; 36]; + arr.copy_from_slice(&v); + arr + }; + + let vals = vec![h(42), h(43), h(44)]; + + store.put_merkle_cache(&MerkleCacheKey::Raw36(raw_key), &vals)?; + let got = store.get_merkle_cache(&MerkleCacheKey::Raw36(raw_key))?; + + assert_eq!(got, vals); + Ok(()) } - fn put_layer(&self, address: &Pubkey, layer: u8, values: &[[u8; 32]]) -> Result<(), StoreError> { - let key = build_key(address, TAPE_LAYER, layer); - self.put_hash_values(key, values) + #[test] + fn merkle_not_found_returns_error() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + + let res = store.get_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { address: addr, layer: 0 }, + ); + + assert!(matches!(res, Err(StoreError::HashNotFound))); + Ok(()) } -} -#[inline(always)] -fn build_key(address: &Pubkey, layer_type: u8, layer_id: u8) -> Vec { - let mut key = Vec::with_capacity(36); - key.extend_from_slice(&address.to_bytes()); - key.push(layer_id); - key.extend_from_slice(&[layer_type, 0, 0]); - key + #[test] + fn merkle_invalid_stored_length_is_rejected() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + let key_vec = build_key(&addr, TAPE_LAYER, 3); + + // Manually write an invalid-length value (not a multiple of 32) + let cf = store.get_cf_handle(ColumnFamily::MerkleHashes)?; + store.db.put_cf(&cf, &key_vec, &[0u8; 5])?; + + let res = store.get_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { address: addr, layer: 3 }, + ); + + match res { + Err(StoreError::InvalidHashSize(len)) => { + assert_eq!(len, 5); + } + other => panic!("expected InvalidHashSize, got {:?}", other), + } + Ok(()) + } + + #[test] + fn merkle_fixed_len_mismatch_is_rejected() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + + // Store 3 hashes + let vals = vec![h(1), h(2), h(3)]; + store.put_merkle_cache( + &MerkleCacheKey::PackedTapeLayer { address: addr, layer: 1 }, + &vals, + )?; + + // Ask for a different fixed size (e.g., 4) => should error + let res = store.get_merkle_cache_fixed::<4>( + &MerkleCacheKey::PackedTapeLayer { address: addr, layer: 1 }, + ); + + assert!(matches!(res, Err(StoreError::InvalidHashSize(96)))); + Ok(()) + } } diff --git a/network/src/store/column/mod.rs b/network/src/store/column/mod.rs index 96fd70fb..9a124e24 100644 --- a/network/src/store/column/mod.rs +++ b/network/src/store/column/mod.rs @@ -9,5 +9,5 @@ pub use health::{StoreStaticKeys, HealthOps}; pub use tape::TapeOps; pub use segment::SegmentOps; pub use sector::{SectorOps, Sector}; -pub use merkle::MerkleOps; +pub use merkle::{MerkleOps, MerkleCacheKey}; pub use stats::{LocalStats, StatsOps}; From f6470e6ff8d15827f4551449d88a94a72f769d18 Mon Sep 17 00:00:00 2001 From: Zelimir Fedoran Date: Wed, 27 Aug 2025 19:04:29 -0400 Subject: [PATCH 4/5] hmm --- network/src/archive/pack.rs | 291 +++++++++++++++++++++++++----------- 1 file changed, 202 insertions(+), 89 deletions(-) diff --git a/network/src/archive/pack.rs b/network/src/archive/pack.rs index 923fd166..5bb4b2a1 100644 --- a/network/src/archive/pack.rs +++ b/network/src/archive/pack.rs @@ -9,6 +9,8 @@ use solana_client::nonblocking::rpc_client::RpcClient; use crate::store::*; use super::queue::Rx; +type CanopyTree = MerkleTree<{ SEGMENT_TREE_HEIGHT - SECTOR_TREE_HEIGHT }>; + /// Orchestrator Task C – CPU-heavy preprocessing (packx) pub async fn run(rpc: Arc, mut rx: Rx, miner: Pubkey, store: Arc) -> Result<()> { let epoch = get_epoch_account(&rpc).await?.0; @@ -21,12 +23,68 @@ pub async fn run(rpc: Arc, mut rx: Rx, miner: Pubkey, store: Arc anyhow::Result<()> { log::info!("packx: tape={} seg={}", job.tape, job.seg_no); + let sector_number = job.seg_no / SECTOR_LEAVES as u64; + let local_idx = (job.seg_no % SECTOR_LEAVES as u64) as usize; + + let mut sector = get_or_create_sector(&store, &job.tape, sector_number)?; // <- should be get_or_create + + // Pack the segment, and update the PackedTapeLayer sector root let solved = pack_segment(&miner, &job.data, packing_difficulty)?; - store.put_segment(&job.tape, job.seg_no, solved)?; - // TODO: need a way to check if we need to update the sector root + // Setting the segment here so that we have no special cases + sector.set_segment(local_idx, &solved); + + // Build leaves from the *unpacked* data. The database holds packed bytes, so + // we unpack each present segment. For the incoming segment we use the freshly + // provided unpacked bytes from the job. + let empty_hashes = get_or_create_empty_hashes(&store, &job.tape)?; + let empty_leaf = empty_hashes.first().unwrap().as_leaf(); + let miner_bytes = miner.to_bytes(); + + // Unpacked leaves + let leaves_unpacked = compute_sector_leaves_unpacked( + §or, + sector_number, + &miner_bytes, + empty_leaf, + )?; - update_sector_root(&store, &job.tape, job.seg_no)?; + // Update the PackedTapeLayer sector root + + // Packed leaves + let leaves_packed = compute_sector_leaves_packed( + §or, + sector_number, + empty_leaf, + )?; + + // Compute both roots with the same zero vector + let root_unpacked = compute_sector_root(&leaves_unpacked, &empty_hashes)?; + let root_packed = compute_sector_root(&leaves_packed, &empty_hashes)?; + + // Write both caches: UnpackedTapeLayer and PackedTapeLayer + update_sector_canopy_with_key( + &store, + sector_number, + root_unpacked, + MerkleCacheKey::UnpackedTapeLayer { + address: job.tape, + layer: SECTOR_TREE_HEIGHT as u8, + }, + )?; + + update_sector_canopy_with_key( + &store, + sector_number, + root_packed, + MerkleCacheKey::PackedTapeLayer { + address: job.tape, + layer: SECTOR_TREE_HEIGHT as u8, + }, + )?; + + // Finally, persist the newly packed segment into the sector itself. + store.put_sector(&job.tape, sector_number, §or)?; Ok(()) }) @@ -53,32 +111,6 @@ pub fn pack_segment(miner_address: &Pubkey, segment: &[u8], packing_difficulty: Ok(segment_bytes.to_vec()) } -/// Updates the Merkle subtree for a given segment number. -pub fn update_segment_root( - store: &Arc, - tape_address: &Pubkey, - segment_number: u64, -) -> Result<()> { - let sector_number = segment_number / SECTOR_LEAVES as u64; - update_sector_root(store, tape_address, sector_number) -} - -/// Updates the Merkle subtree for a given sector number. -pub fn update_sector_root( - store: &Arc, - tape_address: &Pubkey, - sector_number: u64, -) -> Result<()> { - - let empty_hashes = get_or_create_empty_hashes(store, tape_address)?; - let empty_leaf = empty_hashes.first().unwrap().as_leaf(); - - let leaves = compute_sector_leaves(store, tape_address, sector_number, empty_leaf)?; - let root = compute_sector_root(&leaves, &empty_hashes)?; - - update_sector_canopy(store, tape_address, sector_number, root) -} - /// Computes the Merkle root for the entire tape by using a cached canopy of sector roots. pub fn get_tape_root( store: &Arc, @@ -108,7 +140,6 @@ pub fn get_tape_root( ))?; // Build canopy over sector roots - type CanopyTree = MerkleTree<{ SEGMENT_TREE_HEIGHT - SECTOR_TREE_HEIGHT }>; let mut canopy = CanopyTree::from_zeros(canopy_zeros); // Load sector roots cached at the sector layer @@ -127,22 +158,44 @@ pub fn get_tape_root( Ok(canopy.get_root()) } -/// Computes leaves for a sector from the store, filling with empty_leaf as needed. -pub fn compute_sector_leaves( - store: &TapeStore, - tape_address: &Pubkey, +/// Computes packed leaves (stored solution bytes). +pub fn compute_sector_leaves_packed( + sector: &Sector, sector_number: u64, empty_leaf: Leaf, ) -> Result> { - let sector = store.get_sector(tape_address, sector_number)?; + let mut leaves = vec![empty_leaf; SECTOR_LEAVES]; + for i in 0..SECTOR_LEAVES { + if let Some(packed) = sector.get_segment(i) { + let segment_id = (sector_number * SECTOR_LEAVES as u64) + i as u64; + leaves[i] = Leaf::new(&[ + &segment_id.to_le_bytes(), + packed, + ]); + } + } + Ok(leaves) +} +/// Computes unpacked leaves (reconstructed 128-byte data from solutions). +pub fn compute_sector_leaves_unpacked( + sector: &Sector, + sector_number: u64, + miner_bytes: &[u8; 32], + empty_leaf: Leaf, +) -> Result> { let mut leaves = vec![empty_leaf; SECTOR_LEAVES]; - for (i, leaf) in leaves.iter_mut().enumerate() { - if let Some(segment) = sector.get_segment(i) { + for i in 0..SECTOR_LEAVES { + if let Some(packed) = sector.get_segment(i) { + let mut arr = [0u8; PACKED_SEGMENT_SIZE]; + arr.copy_from_slice(&packed[..PACKED_SEGMENT_SIZE]); + let sol = packx::Solution::from_bytes(&arr); + let data_unpacked = sol.unpack(miner_bytes); + let segment_id = (sector_number * SECTOR_LEAVES as u64) + i as u64; - *leaf = Leaf::new(&[ + leaves[i] = Leaf::new(&[ &segment_id.to_le_bytes(), - segment + &data_unpacked, ]); } } @@ -178,19 +231,13 @@ pub fn compute_sector_root( Ok(*layer_nodes.first().unwrap()) } -/// Updates the layer in the store with the new sector root. -pub fn update_sector_canopy( +/// Helper to update a specific MerkleCacheKey layer with the new sector root. +fn update_sector_canopy_with_key( store: &TapeStore, - tape_address: &Pubkey, sector_number: u64, root: Hash, + key: MerkleCacheKey, ) -> Result<()> { - - let key = MerkleCacheKey::UnpackedTapeLayer { - address: *tape_address, - layer: SECTOR_TREE_HEIGHT as u8 - }; - let mut layer = match store.get_merkle_cache(&key) { Ok(layer) => layer, Err(_) => vec![[0u8; 32]; (sector_number + 1) as usize], @@ -202,7 +249,6 @@ pub fn update_sector_canopy( layer[sector_number as usize] = root.to_bytes(); store.put_merkle_cache(&key, &layer)?; - Ok(()) } @@ -253,6 +299,17 @@ pub fn get_or_create_empty_hashes( Ok(empty_hashes) } +fn get_or_create_sector( + store: &TapeStore, + tape_address: &Pubkey, + sector_number: u64, +) -> Result { + match store.get_sector(tape_address, sector_number) { + Ok(s) => Ok(s), + Err(StoreError::SectorNotFoundForAddress(_, _)) => Ok(Sector::new()), + Err(e) => Err(e), + } +} #[cfg(test)] mod tests { @@ -279,8 +336,7 @@ mod tests { solution.to_bytes().to_vec() } - #[test] - fn test_subtree_update() -> Result<()> { + fn test_with_larger_stack() -> Result<()> { // Setup store and tape address let (store, _temp_dir) = setup_store()?; @@ -288,89 +344,146 @@ mod tests { let miner_address = Pubkey::new_unique(); let tape_address = Pubkey::new_unique(); let mut tape_tree = SegmentTree::new(&[tape_address.as_ref()]); - let mut leaves = vec![]; + let mut leaves_unpacked_all = vec![]; + // Get tree zero values let empty_values = get_or_create_empty_hashes(&store, &tape_address)?; let empty_leaf = empty_values[0].as_leaf(); assert_eq!(empty_values.len(), SEGMENT_TREE_HEIGHT); assert_eq!(empty_values, empty_values); // Fill the tape with some segments (2.5 sectors worth) + let miner_bytes = miner_address.to_bytes(); let leaf_count = (SECTOR_LEAVES as f64 * 2.5) as usize; for i in 0..leaf_count { let segment_id = i as u64; - let segment_data = create_segment_data(1, &miner_address); - let leaf = Leaf::new(&[ + + // create packed solution bytes (what DB stores) + let segment_data_packed = create_segment_data(1, &miner_address); + + // derive UNPACKED bytes for the expected (in-memory) tree + let mut sol_bytes = [0u8; PACKED_SEGMENT_SIZE]; + sol_bytes.copy_from_slice(&segment_data_packed[..PACKED_SEGMENT_SIZE]); + let sol = packx::Solution::from_bytes(&sol_bytes); + let data_unpacked = sol.unpack(&miner_bytes); + + let leaf_unpacked = Leaf::new(&[ &segment_id.to_le_bytes(), - &segment_data + &data_unpacked ]); - tape_tree.try_add_leaf(leaf).expect("Failed to add leaf"); - store.put_segment(&tape_address, segment_id, segment_data)?; - leaves.push(leaf); + tape_tree.try_add_leaf(leaf_unpacked).expect("Failed to add leaf"); + store.put_segment(&tape_address, segment_id, segment_data_packed)?; + leaves_unpacked_all.push(leaf_unpacked); } - let expected_vals = tape_tree.get_layer_nodes(&leaves, SECTOR_TREE_HEIGHT); - let expected_canopy = expected_vals + // Expected canopy from UNPACKED data + let expected_vals_unpacked = tape_tree.get_layer_nodes(&leaves_unpacked_all, SECTOR_TREE_HEIGHT); + let expected_canopy_unpacked = expected_vals_unpacked .iter() .map(|h| h.to_bytes()) .collect::>(); - // Calculate sector roots and update the stored canopy values - update_sector_root(&store, &tape_address, 0)?; - update_sector_root(&store, &tape_address, 1)?; - update_sector_root(&store, &tape_address, 2)?; + // For the cached layers, compute **both** packed and unpacked sector roots and store them. + for sector_number in 0..=2 { + let sector = store.get_sector(&tape_address, sector_number)?; + + // packed + let leaves_packed = compute_sector_leaves_packed(§or, sector_number, empty_leaf)?; + let root_packed = compute_sector_root(&leaves_packed, &empty_values)?; + + update_sector_canopy_with_key( + &store, + sector_number, + root_packed, + MerkleCacheKey::PackedTapeLayer { + address: tape_address, + layer: SECTOR_TREE_HEIGHT as u8, + }, + )?; - let actual_canopy = store.get_merkle_cache( + // unpacked + let leaves_unpacked = compute_sector_leaves_unpacked(§or, sector_number, &miner_bytes, empty_leaf)?; + let root_unpacked = compute_sector_root(&leaves_unpacked, &empty_values)?; + + update_sector_canopy_with_key( + &store, + sector_number, + root_unpacked, + MerkleCacheKey::UnpackedTapeLayer { + address: tape_address, + layer: SECTOR_TREE_HEIGHT as u8, + }, + )?; + } + + // Validate UNPACKED cache matches the unpacked expected canopy + let actual_canopy_unpacked = store.get_merkle_cache( &MerkleCacheKey::UnpackedTapeLayer { address: tape_address, - layer: SECTOR_TREE_HEIGHT as u8 + layer: SECTOR_TREE_HEIGHT as u8 } )?; - - assert_eq!(expected_canopy.len(), actual_canopy.len()); - assert_eq!(expected_canopy, actual_canopy); + assert_eq!(expected_canopy_unpacked.len(), actual_canopy_unpacked.len()); + assert_eq!(expected_canopy_unpacked, actual_canopy_unpacked); // Lets try a Merkle proof for a segment let segment_number : usize = 1234; let sector_number = (segment_number as u64) / SECTOR_LEAVES as u64; let sector = store.get_sector(&tape_address, sector_number)?; - let expected_proof = tape_tree.get_proof(&leaves, segment_number); // <- Requires all leaves in memory (bad) - let actual_proof = get_cached_merkle_proof( // <- Only one sector in memory (good) + // expected proof from the UNPACKED full tree/leaves + let expected_proof = tape_tree.get_proof(&leaves_unpacked_all, segment_number); + + // actual proof using cached canopy + sector callback (UNPACKED) + let actual_proof = get_cached_merkle_proof( &tape_tree, segment_number, SECTOR_TREE_HEIGHT, - &expected_vals, - |i| { + &expected_vals_unpacked, + |i| { let local_idx = i % SECTOR_LEAVES; - match sector.get_segment(local_idx) { // <- Look ma, no store access here! - Some(data) => Some( - Leaf::new(&[ + match sector.get_segment(local_idx) { + Some(packed) => { + // unpack for UNPACKED leaf + let mut arr = [0u8; PACKED_SEGMENT_SIZE]; + arr.copy_from_slice(&packed[..PACKED_SEGMENT_SIZE]); + let sol = packx::Solution::from_bytes(&arr); + let data_unpacked = sol.unpack(&miner_bytes); + + Some(Leaf::new(&[ &(i as u64).to_le_bytes(), - data - ])), + &data_unpacked, + ])) + } None => Some(empty_leaf), } - - // Same as above, but with store access (slower) - // match store.get_segment(&tape_address, i as u64) { - // Ok(data) => Some( - // Leaf::new(&[ - // &(i as u64).to_le_bytes(), - // &data - // ])), - // Err(_) => Some(empty_leaf), - // } } ); assert_eq!(expected_proof, actual_proof); - //Verify the tape root matches our computed root + //Verify the tape root matches our computed root (using the UNPACKED canopy as in production) let computed_root = get_tape_root(&store, &tape_address)?; + + // Now that both sides are UNPACKED, this is a strong equality check. assert_eq!(computed_root, tape_tree.get_root()); Ok(()) } + + #[test] + fn test_subtree_update() -> Result<()> { + // TODO: get to the bottom of what is eating the stack space + + let _ = std::thread::Builder::new() + .name("larger_stack".into()) + .stack_size(4 * 1024 * 1024) + .spawn(|| test_with_larger_stack()) + .unwrap() + .join() + .unwrap(); + + Ok(()) + } } From 05f4579b765a5990c857fd8726280eab82a06765 Mon Sep 17 00:00:00 2001 From: Zelimir Fedoran Date: Thu, 28 Aug 2025 00:03:11 -0400 Subject: [PATCH 5/5] added canopy root calculation to job --- network/src/archive/pack.rs | 207 +++++++++++++++++------------------- 1 file changed, 95 insertions(+), 112 deletions(-) diff --git a/network/src/archive/pack.rs b/network/src/archive/pack.rs index 5bb4b2a1..11fe95ec 100644 --- a/network/src/archive/pack.rs +++ b/network/src/archive/pack.rs @@ -7,7 +7,7 @@ use tape_client::get_epoch_account; use solana_client::nonblocking::rpc_client::RpcClient; use crate::store::*; -use super::queue::Rx; +use super::queue::{Rx, SegmentJob}; type CanopyTree = MerkleTree<{ SEGMENT_TREE_HEIGHT - SECTOR_TREE_HEIGHT }>; @@ -18,78 +18,79 @@ pub async fn run(rpc: Arc, mut rx: Rx, miner: Pubkey, store: Arc anyhow::Result<()> { log::info!("packx: tape={} seg={}", job.tape, job.seg_no); - let sector_number = job.seg_no / SECTOR_LEAVES as u64; - let local_idx = (job.seg_no % SECTOR_LEAVES as u64) as usize; - - let mut sector = get_or_create_sector(&store, &job.tape, sector_number)?; // <- should be get_or_create - - // Pack the segment, and update the PackedTapeLayer sector root - let solved = pack_segment(&miner, &job.data, packing_difficulty)?; - - // Setting the segment here so that we have no special cases - sector.set_segment(local_idx, &solved); - - // Build leaves from the *unpacked* data. The database holds packed bytes, so - // we unpack each present segment. For the incoming segment we use the freshly - // provided unpacked bytes from the job. - let empty_hashes = get_or_create_empty_hashes(&store, &job.tape)?; - let empty_leaf = empty_hashes.first().unwrap().as_leaf(); - let miner_bytes = miner.to_bytes(); - - // Unpacked leaves - let leaves_unpacked = compute_sector_leaves_unpacked( - §or, - sector_number, - &miner_bytes, - empty_leaf, - )?; - - // Update the PackedTapeLayer sector root - - // Packed leaves - let leaves_packed = compute_sector_leaves_packed( - §or, - sector_number, - empty_leaf, - )?; + process_poa_job(&store, miner, packing_difficulty, job) + }) + .await??; + } - // Compute both roots with the same zero vector - let root_unpacked = compute_sector_root(&leaves_unpacked, &empty_hashes)?; - let root_packed = compute_sector_root(&leaves_packed, &empty_hashes)?; + Ok(()) +} - // Write both caches: UnpackedTapeLayer and PackedTapeLayer - update_sector_canopy_with_key( - &store, - sector_number, - root_unpacked, - MerkleCacheKey::UnpackedTapeLayer { - address: job.tape, - layer: SECTOR_TREE_HEIGHT as u8, - }, - )?; +/// Does a single segment pack + updates both packed/unpacked sector roots + persists sector. +fn process_poa_job( + store: &Arc, + miner: Pubkey, + packing_difficulty: u64, + job: SegmentJob, +) -> anyhow::Result<()> { + let sector_number = job.seg_no / SECTOR_LEAVES as u64; + let local_idx = (job.seg_no % SECTOR_LEAVES as u64) as usize; + + // TODO: rate limit or find a way to determine if this is the last segment in a sector + // (might not be the last segment index for tapes that don't fill the entire last sector). + + let mut sector = get_or_create_sector(store.as_ref(), &job.tape, sector_number)?; + + // Pack the segment, and update the PackedTapeLayer sector root + let solved = pack_segment(&miner, &job.data, packing_difficulty)?; + sector.set_segment(local_idx, &solved); + + let empty_hashes = get_or_create_empty_hashes(store, &job.tape)?; + let empty_leaf = empty_hashes.first().unwrap().as_leaf(); + let miner_bytes = miner.to_bytes(); + + let leaves_unpacked = compute_sector_leaves_unpacked( + §or, + sector_number, + &miner_bytes, + empty_leaf, + )?; + let leaves_packed = compute_sector_leaves_packed( + §or, + sector_number, + empty_leaf, + )?; - update_sector_canopy_with_key( - &store, - sector_number, - root_packed, - MerkleCacheKey::PackedTapeLayer { - address: job.tape, - layer: SECTOR_TREE_HEIGHT as u8, - }, - )?; + // Compute both roots with the same zero vector + let root_unpacked = compute_sector_root(&leaves_unpacked, &empty_hashes)?; + let root_packed = compute_sector_root(&leaves_packed, &empty_hashes)?; + + update_sector_canopy_with_key( + store.as_ref(), + sector_number, + root_unpacked, + MerkleCacheKey::UnpackedTapeLayer { + address: job.tape, + layer: SECTOR_TREE_HEIGHT as u8, + }, + )?; - // Finally, persist the newly packed segment into the sector itself. - store.put_sector(&job.tape, sector_number, §or)?; + update_sector_canopy_with_key( + store.as_ref(), + sector_number, + root_packed, + MerkleCacheKey::PackedTapeLayer { + address: job.tape, + layer: SECTOR_TREE_HEIGHT as u8, + }, + )?; - Ok(()) - }) - .await??; - } + // Finally, add the newly packed segment into the sector itself. + store.put_sector(&job.tape, sector_number, §or)?; Ok(()) } @@ -232,7 +233,7 @@ pub fn compute_sector_root( } /// Helper to update a specific MerkleCacheKey layer with the new sector root. -fn update_sector_canopy_with_key( +pub fn update_sector_canopy_with_key( store: &TapeStore, sector_number: u64, root: Hash, @@ -325,13 +326,15 @@ mod tests { } fn create_segment_data(marker: u8, miner: &Pubkey) -> Vec { + const TEST_DIFFICULTY: u32 = 0; + let data = &[marker; SEGMENT_SIZE]; let canonical_segment = padded_array::(data); let solution = packx::solve( &miner.to_bytes(), &canonical_segment, - 0 - ).expect("Failed to pack segment"); + TEST_DIFFICULTY + ).expect("Failed to pack segment"); solution.to_bytes().to_vec() } @@ -344,9 +347,8 @@ mod tests { let miner_address = Pubkey::new_unique(); let tape_address = Pubkey::new_unique(); let mut tape_tree = SegmentTree::new(&[tape_address.as_ref()]); - let mut leaves_unpacked_all = vec![]; + let mut leaves = vec![]; - // Get tree zero values let empty_values = get_or_create_empty_hashes(&store, &tape_address)?; let empty_leaf = empty_values[0].as_leaf(); assert_eq!(empty_values.len(), SEGMENT_TREE_HEIGHT); @@ -357,53 +359,40 @@ mod tests { let leaf_count = (SECTOR_LEAVES as f64 * 2.5) as usize; for i in 0..leaf_count { let segment_id = i as u64; - - // create packed solution bytes (what DB stores) let segment_data_packed = create_segment_data(1, &miner_address); - // derive UNPACKED bytes for the expected (in-memory) tree + // For the in-memory expected tree, use UNPACKED bytes let mut sol_bytes = [0u8; PACKED_SEGMENT_SIZE]; sol_bytes.copy_from_slice(&segment_data_packed[..PACKED_SEGMENT_SIZE]); let sol = packx::Solution::from_bytes(&sol_bytes); let data_unpacked = sol.unpack(&miner_bytes); - let leaf_unpacked = Leaf::new(&[ + let leaf = Leaf::new(&[ &segment_id.to_le_bytes(), &data_unpacked ]); - tape_tree.try_add_leaf(leaf_unpacked).expect("Failed to add leaf"); + tape_tree.try_add_leaf(leaf).expect("Failed to add leaf"); store.put_segment(&tape_address, segment_id, segment_data_packed)?; - leaves_unpacked_all.push(leaf_unpacked); + leaves.push(leaf); } - // Expected canopy from UNPACKED data - let expected_vals_unpacked = tape_tree.get_layer_nodes(&leaves_unpacked_all, SECTOR_TREE_HEIGHT); - let expected_canopy_unpacked = expected_vals_unpacked + let expected_vals = tape_tree.get_layer_nodes(&leaves, SECTOR_TREE_HEIGHT); + let expected_canopy = expected_vals .iter() .map(|h| h.to_bytes()) .collect::>(); - // For the cached layers, compute **both** packed and unpacked sector roots and store them. + // Calculate sector roots and update the stored canopy values for sector_number in 0..=2 { let sector = store.get_sector(&tape_address, sector_number)?; - // packed - let leaves_packed = compute_sector_leaves_packed(§or, sector_number, empty_leaf)?; - let root_packed = compute_sector_root(&leaves_packed, &empty_values)?; - - update_sector_canopy_with_key( - &store, + let leaves_unpacked = compute_sector_leaves_unpacked( + §or, sector_number, - root_packed, - MerkleCacheKey::PackedTapeLayer { - address: tape_address, - layer: SECTOR_TREE_HEIGHT as u8, - }, + &miner_bytes, + empty_leaf )?; - - // unpacked - let leaves_unpacked = compute_sector_leaves_unpacked(§or, sector_number, &miner_bytes, empty_leaf)?; let root_unpacked = compute_sector_root(&leaves_unpacked, &empty_values)?; update_sector_canopy_with_key( @@ -417,35 +406,31 @@ mod tests { )?; } - // Validate UNPACKED cache matches the unpacked expected canopy - let actual_canopy_unpacked = store.get_merkle_cache( + let actual_canopy = store.get_merkle_cache( &MerkleCacheKey::UnpackedTapeLayer { address: tape_address, - layer: SECTOR_TREE_HEIGHT as u8 + layer: SECTOR_TREE_HEIGHT as u8 } )?; - assert_eq!(expected_canopy_unpacked.len(), actual_canopy_unpacked.len()); - assert_eq!(expected_canopy_unpacked, actual_canopy_unpacked); + + assert_eq!(expected_canopy.len(), actual_canopy.len()); + assert_eq!(expected_canopy, actual_canopy); // Lets try a Merkle proof for a segment let segment_number : usize = 1234; let sector_number = (segment_number as u64) / SECTOR_LEAVES as u64; let sector = store.get_sector(&tape_address, sector_number)?; - // expected proof from the UNPACKED full tree/leaves - let expected_proof = tape_tree.get_proof(&leaves_unpacked_all, segment_number); - - // actual proof using cached canopy + sector callback (UNPACKED) - let actual_proof = get_cached_merkle_proof( + let expected_proof = tape_tree.get_proof(&leaves, segment_number); // <- Requires all leaves in memory (bad) + let actual_proof = get_cached_merkle_proof( // <- Only one sector in memory (good) &tape_tree, segment_number, SECTOR_TREE_HEIGHT, - &expected_vals_unpacked, - |i| { + &expected_vals, + |i| { let local_idx = i % SECTOR_LEAVES; - match sector.get_segment(local_idx) { + match sector.get_segment(local_idx) { // <- Look ma, no store access here! Some(packed) => { - // unpack for UNPACKED leaf let mut arr = [0u8; PACKED_SEGMENT_SIZE]; arr.copy_from_slice(&packed[..PACKED_SEGMENT_SIZE]); let sol = packx::Solution::from_bytes(&arr); @@ -453,7 +438,7 @@ mod tests { Some(Leaf::new(&[ &(i as u64).to_le_bytes(), - &data_unpacked, + &data_unpacked ])) } None => Some(empty_leaf), @@ -463,10 +448,8 @@ mod tests { assert_eq!(expected_proof, actual_proof); - //Verify the tape root matches our computed root (using the UNPACKED canopy as in production) + // Verify the tape root matches our computed root let computed_root = get_tape_root(&store, &tape_address)?; - - // Now that both sides are UNPACKED, this is a strong equality check. assert_eq!(computed_root, tape_tree.get_root()); Ok(()) @@ -474,7 +457,7 @@ mod tests { #[test] fn test_subtree_update() -> Result<()> { - // TODO: get to the bottom of what is eating the stack space + // TODO: get to the bottom of what is eating the stack space in this test let _ = std::thread::Builder::new() .name("larger_stack".into())