feat(state-prune): support tail replay on existing output #3991
jolestar wants to merge 11 commits into
Conversation
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.
Snapshot warnings: ensure that dependencies are being submitted on PR branches and consider enabling retry-on-snapshot-warnings. See the documentation for more information and troubleshooting advice.
Pull request overview
Adds operational tooling and plumbing for mainnet repair/recovery flows by enabling (1) tail replay against an existing replay output directory, (2) a finalize-only path for partially completed outputs, (3) optional skipping of final state-node compaction, and (4) a helper to backfill missing indexed L2 transaction history into a destination DB without initializing the destination indexer.
Changes:
- Extend `IncrementalReplayer` with `tail_replay_existing_output(...)` and `finalize_existing_output(...)`, plus a `skip_final_compact` replay option.
- Wire new `db state-prune tail-replay`/`finalize-replay-output` CLI flows and add `db import-indexed-transactions`.
- Add design docs describing tail replay and a selective-CF-copy replay redesign.
Reviewed changes
Copilot reviewed 11 out of 12 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| docs/dev-guide/mainnet_tail_replay_design_20260408.md | Tail replay design doc and proposed CLI/steps. |
| docs/dev-guide/mainnet_replay_cf_copy_redesign_20260407.md | Design doc for reducing replay output size via selective CF copy. |
| crates/rooch/src/commands/db/mod.rs | Registers the new ImportIndexedTransactions DB subcommand. |
| crates/rooch/src/commands/db/commands/state_prune/replay.rs | Adds TailReplayCommand and FinalizeReplayOutputCommand; exposes skip_final_compact. |
| crates/rooch/src/commands/db/commands/state_prune/command.rs | Adds new state-prune subcommands to the CLI enum dispatch. |
| crates/rooch/src/commands/db/commands/mod.rs | Exposes the new import_indexed_transactions command module. |
| crates/rooch/src/commands/db/commands/import_indexed_transactions.rs | Implements import/backfill of missing L2 tx + execution info referenced by the target indexer. |
| crates/rooch/Cargo.toml | Adds diesel dependency needed by the new import command. |
| crates/rooch-pruner/src/state_prune/incremental_replayer.rs | Implements tail replay + finalize flows, accumulator tail rebuild, and skip-final-compact support. |
| crates/rooch-pruner/Cargo.toml | Adds accumulator dependency for tail accumulator rebuild. |
| crates/rooch-config/src/state_prune.rs | Adds skip_final_compact to ReplayConfig. |
| Cargo.lock | Locks new dependencies (diesel, accumulator) into the build. |
Reviewed hunk (from the design doc):

```markdown
- [replay.rs](/Users/jolestar/opensource/src/github.com/rooch-network/rooch/crates/rooch/src/commands/db/commands/state_prune/replay.rs)
- [incremental_replayer.rs](/Users/jolestar/opensource/src/github.com/rooch-network/rooch/crates/rooch-pruner/src/state_prune/incremental_replayer.rs)
```
These markdown links point to an absolute local filesystem path (/Users/...). That won’t work for other developers or in GitHub; please switch to repo-relative links (e.g., crates/rooch/src/... or a GitHub permalink).
```rust
let mut metadata = StatePruneMetadata::new(
    crate::state_prune::OperationType::Replay {
        snapshot_path: PathBuf::new(),
        from_order: from_order.unwrap_or(0),
        to_order,
        output_dir: output_dir.to_path_buf(),
    },
    serde_json::json!({
        "mode": "tail_replay_existing_output",
        "from_order": from_order,
        "to_order": to_order,
        "output_dir": output_dir,
        "config": self.config
    }),
);

let source_moveos_store = self.source_moveos_store()?;
let (output_store, output_rooch_store) = self.load_output_stores(output_dir)?;
let output_startup_info = output_store
    .get_config_store()
    .get_startup_info()?
    .ok_or_else(|| anyhow::anyhow!("No startup info found in existing replay output"))?;
let output_sequencer_info = output_rooch_store
    .get_meta_store()
    .get_sequencer_info()?
    .ok_or_else(|| anyhow::anyhow!("No sequencer info found in existing replay output"))?;

let resolved_from_order = from_order.unwrap_or(output_sequencer_info.last_order + 1);
```
StatePruneMetadata::new is initialized with from_order: from_order.unwrap_or(0), but the actual start order is resolved later from the existing output’s sequencer_info. This makes the recorded operation type/metadata inaccurate (and can produce misleading filenames/diagnostics). Consider resolving from_order first (or updating metadata.operation_type) so it records the effective resolved_from_order.
Suggested change (resolve `from_order` first, then record both the raw and resolved values in the metadata):

```rust
let source_moveos_store = self.source_moveos_store()?;
let (output_store, output_rooch_store) = self.load_output_stores(output_dir)?;
let output_startup_info = output_store
    .get_config_store()
    .get_startup_info()?
    .ok_or_else(|| anyhow::anyhow!("No startup info found in existing replay output"))?;
let output_sequencer_info = output_rooch_store
    .get_meta_store()
    .get_sequencer_info()?
    .ok_or_else(|| anyhow::anyhow!("No sequencer info found in existing replay output"))?;
let resolved_from_order = from_order.unwrap_or(output_sequencer_info.last_order + 1);
let mut metadata = StatePruneMetadata::new(
    crate::state_prune::OperationType::Replay {
        snapshot_path: PathBuf::new(),
        from_order: resolved_from_order,
        to_order,
        output_dir: output_dir.to_path_buf(),
    },
    serde_json::json!({
        "mode": "tail_replay_existing_output",
        "from_order": from_order,
        "resolved_from_order": resolved_from_order,
        "to_order": to_order,
        "output_dir": output_dir,
        "config": self.config
    }),
);
```
```rust
metadata.mark_in_progress(
    format!(
        "Loading canonical tail entries [{}..={}]",
        resolved_from_order, to_order
    ),
    20.0,
);
let tail_entries = self.load_tail_entries(
    &source_moveos_store,
    resolved_from_order,
    to_order,
    &mut report,
)?;
self.progress_tracker.set_total(tail_entries.len() as u64);

metadata.mark_in_progress("Applying tail changesets".to_string(), 40.0);
let (actual_state_root, expected_state_root, expected_global_size) = self
    .tail_replay_entries_batched(
        tail_entries,
        &output_store,
        &output_rooch_store,
        output_startup_info.state_root,
        output_startup_info.size,
        output_sequencer_info.last_accumulator_info.clone(),
        &mut report,
        &mut metadata,
    )
    .await?;
```
load_tail_entries materializes the entire [resolved_from_order..=to_order] range into memory before any apply happens. For large tail ranges, this can cause high peak memory usage and longer time-to-first-progress. Consider processing in streaming/chunked fashion (load a batch of changesets + tx metadata, apply, then drop) instead of building a single Vec<TailReplayEntry>.
Suggested change (load and apply the tail in fixed-size chunks instead of materializing the whole range):

```rust
let total_tail_entries = to_order - resolved_from_order + 1;
let tail_chunk_size: u64 = 1_000;
self.progress_tracker.set_total(total_tail_entries);

metadata.mark_in_progress("Applying tail changesets".to_string(), 40.0);
let mut current_from_order = resolved_from_order;
let mut current_state_root = output_startup_info.state_root;
let mut current_global_size = output_startup_info.size;
let mut current_accumulator_info = output_sequencer_info.last_accumulator_info.clone();
let mut actual_state_root = current_state_root;
let mut expected_state_root = current_state_root;
let mut expected_global_size = current_global_size;
while current_from_order <= to_order {
    let current_to_order =
        std::cmp::min(current_from_order + tail_chunk_size - 1, to_order);
    metadata.mark_in_progress(
        format!(
            "Loading and applying canonical tail entries [{}..={}]",
            current_from_order, current_to_order
        ),
        40.0,
    );
    let tail_entries = self.load_tail_entries(
        &source_moveos_store,
        current_from_order,
        current_to_order,
        &mut report,
    )?;
    (actual_state_root, expected_state_root, expected_global_size) = self
        .tail_replay_entries_batched(
            tail_entries,
            &output_store,
            &output_rooch_store,
            current_state_root,
            current_global_size,
            current_accumulator_info.clone(),
            &mut report,
            &mut metadata,
        )
        .await?;
    current_state_root = actual_state_root;
    current_global_size = expected_global_size;
    current_accumulator_info = output_rooch_store
        .get_meta_store()
        .get_sequencer_info()?
        .ok_or_else(|| {
            anyhow::anyhow!("No sequencer info found after tail replay chunk")
        })?
        .last_accumulator_info;
    current_from_order = current_to_order + 1;
}
```
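The chunk-bound arithmetic in the loop above is easy to get wrong at the range edges. A minimal standalone sketch of the same bounds, using a hypothetical `chunk_ranges` helper (not part of this PR), for checking the behavior on exact multiples, single-element ranges, and empty ranges:

```rust
/// Hypothetical helper mirroring the loop bounds above: split the inclusive
/// range [from..=to] into chunks of at most `chunk_size` orders.
fn chunk_ranges(from: u64, to: u64, chunk_size: u64) -> Vec<(u64, u64)> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut ranges = Vec::new();
    let mut current = from;
    while current <= to {
        // Same bound as `min(current_from_order + tail_chunk_size - 1, to_order)`,
        // saturating so the addition cannot overflow near u64::MAX.
        let end = current.saturating_add(chunk_size - 1).min(to);
        ranges.push((current, end));
        current = end + 1;
    }
    ranges
}
```

For example, `chunk_ranges(1, 2500, 1000)` yields `[(1, 1000), (1001, 2000), (2001, 2500)]`, and an inverted range (`from > to`) yields no chunks, matching the `while current_from_order <= to_order` guard.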
```rust
let mut entries = Vec::with_capacity(changesets.len());

for (tx_order, changeset_ext) in changesets {
    let tx_hash = self
        .rooch_store
        .transaction_store
        .get_tx_hashes(vec![tx_order])?
        .pop()
        .flatten()
```
Inside the loop, get_tx_hashes(vec![tx_order]) issues a separate DB query per order (N+1) and allocates a Vec each time. Since you already know the full order range, consider fetching tx hashes in bulk (e.g., one get_tx_hashes call for the whole range or per-batch) and then multi-get transactions / execution infos to reduce IO overhead.
Suggested change (fetch all hashes for the batch in one call, then zip them back to the changesets):

```rust
let tx_orders: Vec<u64> = changesets
    .iter()
    .map(|(tx_order, _)| *tx_order)
    .collect();
let tx_hashes = self.rooch_store.transaction_store.get_tx_hashes(tx_orders)?;
if tx_hashes.len() != changesets.len() {
    return Err(anyhow::anyhow!(
        "Mismatched tx hash count for range {}..={}: expected {}, got {}",
        from_order,
        to_order,
        changesets.len(),
        tx_hashes.len()
    ));
}
let mut entries = Vec::with_capacity(changesets.len());
for ((tx_order, changeset_ext), tx_hash_opt) in
    changesets.into_iter().zip(tx_hashes.into_iter())
{
    let tx_hash = tx_hash_opt
```
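The bulk-fetch-then-zip pattern can be sketched against a hypothetical in-memory store (`MockTxStore` and `resolve_batch_hashes` are illustrative names, not APIs from this codebase); the point is one lookup per batch plus a fail-fast check when any order has no hash:

```rust
use std::cell::Cell;
use std::collections::HashMap;

/// Illustrative stand-in for the transaction store: tx_order -> tx_hash.
struct MockTxStore {
    hashes: HashMap<u64, [u8; 32]>,
    queries: Cell<usize>,
}

impl MockTxStore {
    /// One bulk lookup for many orders, analogous to a single
    /// `get_tx_hashes(tx_orders)` call instead of one call per order.
    fn get_tx_hashes(&self, orders: &[u64]) -> Vec<Option<[u8; 32]>> {
        self.queries.set(self.queries.get() + 1);
        orders.iter().map(|o| self.hashes.get(o).copied()).collect()
    }
}

/// Resolve hashes for a whole batch with a single store query, failing
/// fast when any order has no recorded hash.
fn resolve_batch_hashes(store: &MockTxStore, orders: &[u64]) -> Result<Vec<[u8; 32]>, String> {
    let hashes = store.get_tx_hashes(orders);
    if hashes.len() != orders.len() {
        return Err(format!("expected {} hashes, got {}", orders.len(), hashes.len()));
    }
    orders
        .iter()
        .zip(hashes)
        .map(|(o, h)| h.ok_or_else(|| format!("missing tx hash for order {}", o)))
        .collect()
}
```

With the real store, the same shape applies per batch, so the number of round-trips drops from N (one per order) to one.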
```rust
let tx_db_path = target_indexer_dir.join("transactions");
let tx_db_url = tx_db_path
    .to_str()
    .ok_or_else(|| anyhow::anyhow!("Invalid target transactions indexer path"))?;
let mut conn = SqliteConnection::establish(tx_db_url)
```
load_indexed_transaction_batch opens a new SqliteConnection on every call, and the import loop calls this once per batch. Reusing a single connection (or using Diesel’s existing connection pool helper like the indexer does) would avoid repeated connection setup and improve import throughput for large backfills.
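The reuse pattern can be sketched generically; `Conn`, `establish`, and `BatchImporter` below are illustrative stand-ins for diesel's `SqliteConnection::establish` and the import loop, not APIs from this PR. The importer establishes the connection once in its constructor and threads the same handle through every batch:

```rust
use std::cell::Cell;

thread_local! {
    // Counts how many times a "connection" is established.
    static OPENS: Cell<usize> = Cell::new(0);
}

/// Illustrative stand-in for a SQLite connection handle.
struct Conn;

fn establish(_db_url: &str) -> Conn {
    OPENS.with(|c| c.set(c.get() + 1));
    Conn
}

/// Importer that opens the connection once and reuses it for every batch,
/// instead of reconnecting inside each batch-load call.
struct BatchImporter {
    conn: Conn,
}

impl BatchImporter {
    fn new(db_url: &str) -> Self {
        Self { conn: establish(db_url) }
    }

    /// All queries go through the long-lived `self.conn`.
    fn load_batch(&mut self, offset: u64, limit: u64) -> Vec<u64> {
        let _ = &mut self.conn; // query via the reused connection here
        (offset..offset + limit).collect()
    }
}

fn connections_opened() -> usize {
    OPENS.with(|c| c.get())
}
```

After any number of `load_batch` calls, `connections_opened()` stays at 1; the per-batch version would pay the connection setup cost on every call.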
Summary
- `finalize-replay-output` flow

Included changes
- `IncrementalReplayer` can now finalize an existing output store without reopening the same RocksDB handle in-place
- `db state-prune` adds the existing-output tail replay/finalize path
- `db import-indexed-transactions` imports missing indexed L2 transactions and avoids initializing the target indexer on the destination DB

Testing