diff --git a/Cargo.lock b/Cargo.lock index dd4522fd..8b08f200 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -769,7 +769,6 @@ dependencies = [ "anyhow", "borsh", "sled", - "ssz", "strata-asm-common", "strata-asm-worker", "strata-identifiers", @@ -4308,7 +4307,9 @@ name = "moho-types" version = "0.1.0" source = "git+https://github.com/alpenlabs/moho?tag=v0.1-alpha.8#9499bce0ed87d6d2d84bc694ade6867858808244" dependencies = [ + "const-hex", "hex", + "serde", "sha2 0.11.0", "ssz", "ssz_codegen", @@ -7707,6 +7708,46 @@ dependencies = [ "tree_hash_derive", ] +[[package]] +name = "strata-asm-moho-storage" +version = "0.1.0" +dependencies = [ + "anyhow", + "moho-types", + "proptest", + "sled", + "ssz", + "strata-identifiers", + "strata-merkle", + "strata-merkle-node-store", + "strata-predicate", + "tempfile", + "tokio", +] + +[[package]] +name = "strata-asm-moho-worker" +version = "0.1.0" +dependencies = [ + "anyhow", + "moho-runtime-interface", + "moho-types", + "serde", + "strata-asm-common", + "strata-asm-logs", + "strata-asm-params", + "strata-asm-proof-impl", + "strata-asm-spec", + "strata-asm-worker", + "strata-identifiers", + "strata-predicate", + "strata-service", + "strata-tasks", + "strata-test-utils-arb", + "thiserror 2.0.18", + "tracing", +] + [[package]] name = "strata-asm-params" version = "0.1.0" @@ -7732,13 +7773,10 @@ name = "strata-asm-proof-db" version = "0.1.0" dependencies = [ "borsh", - "moho-types", "proptest", "sled", - "ssz", "strata-asm-proof-types", "strata-identifiers", - "strata-predicate", "tempfile", "tokio", "zkaleido", @@ -8056,7 +8094,6 @@ dependencies = [ "k256", "moho-recursive-proof", "moho-runtime-impl", - "moho-runtime-interface", "moho-types", "serde", "serde_json", @@ -8065,7 +8102,8 @@ dependencies = [ "sp1-verifier", "ssz", "strata-asm-common", - "strata-asm-logs", + "strata-asm-moho-storage", + "strata-asm-moho-worker", "strata-asm-params", "strata-asm-proof-db", "strata-asm-proof-impl", diff --git a/Cargo.toml b/Cargo.toml index fbbe00c0..f0c4cea7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,10 @@ members = [ "crates/worker", "crates/storage", + # extensions + "crates/extensions/moho/storage", + "crates/extensions/moho/worker", + # tests "tests", @@ -61,6 +65,8 @@ strata-ams-test-utils = { path = "crates/test-utils-btcio" } strata-asm-common = { path = "crates/common" } strata-asm-logs = { path = "crates/logs" } strata-asm-manifest-types = { path = "crates/manifest-types" } +strata-asm-moho-storage = { path = "crates/extensions/moho/storage" } +strata-asm-moho-worker = { path = "crates/extensions/moho/worker" } strata-asm-params = { path = "crates/params" } strata-asm-proof-db = { path = "crates/proof/db" } strata-asm-proof-impl = { path = "crates/proof/statements" } diff --git a/bin/asm-runner/Cargo.toml b/bin/asm-runner/Cargo.toml index dda2c6d0..8c62c9ec 100644 --- a/bin/asm-runner/Cargo.toml +++ b/bin/asm-runner/Cargo.toml @@ -9,13 +9,13 @@ workspace = true [dependencies] moho-recursive-proof.workspace = true moho-runtime-impl.workspace = true -moho-runtime-interface.workspace = true moho-types.workspace = true strata-asm-rpc.workspace = true strata-btc-verification.workspace = true strata-asm-common.workspace = true -strata-asm-logs.workspace = true +strata-asm-moho-storage.workspace = true +strata-asm-moho-worker.workspace = true strata-asm-params.workspace = true strata-asm-proof-db.workspace = true strata-asm-proof-impl.workspace = true diff --git a/bin/asm-runner/src/bootstrap.rs b/bin/asm-runner/src/bootstrap.rs index 43c8c5e4..b273cab9 100644 --- a/bin/asm-runner/src/bootstrap.rs +++ b/bin/asm-runner/src/bootstrap.rs @@ -2,8 +2,10 @@ use std::sync::Arc; use anyhow::Result; use bitcoind_async_client::{Auth, Client}; +use strata_asm_moho_storage::SledMohoStateDb; +use strata_asm_moho_worker::MohoWorkerBuilder; use strata_asm_params::AsmParams; -use strata_asm_proof_db::{SledMohoStateDb, SledProofDb}; +use strata_asm_proof_db::SledProofDb; use strata_asm_spec::StrataAsmSpec; use strata_asm_worker::AsmWorkerBuilder; use strata_tasks::TaskExecutor; @@ -16,10 +18,11 @@ use tokio::{ use crate::{ block_watcher::drive_asm_from_bitcoin, config::{AsmRpcConfig, BitcoinConfig}, + moho_context::MohoWorkerContextImpl, prover::{InputBuilder, ProofBackend, ProofOrchestrator}, rpc_server::{AsmProofRpcDeps, run_rpc_server}, storage::create_storage, - worker_context::{AsmWorkerContext, MohoStorage}, + worker_context::AsmWorkerContext, }; pub(crate) async fn bootstrap( config: AsmRpcConfig, @@ -33,8 +36,7 @@ pub(crate) async fn bootstrap( let bitcoin_client = Arc::new(connect_bitcoin(&config.bitcoin).await?); // 3. If the orchestrator is configured, open proof storage and build the proof backend up front - // so the worker can receive the moho-state db and the asm predicate. The worker owns - // moho-state writes (including the genesis seed) — see [`MohoStorage`]. + // so the Moho worker and orchestrator can receive the moho-state db and the asm predicate. let runtime_handle = Handle::current(); let orch_prep = if let Some(orch_config) = config.orchestrator { let sled_db = sled::open(&orch_config.proof_db_path)?; @@ -46,14 +48,9 @@ pub(crate) async fn bootstrap( None }; - // 4. Create the worker context, wiring moho storage when available. - let moho_storage = orch_prep.as_ref().map(|(_, _, db, backend)| MohoStorage { - db: db.clone(), - asm_predicate: backend.asm_predicate.clone(), - }); - let export_entries_for_worker = orch_prep.as_ref().map(|_| export_entries_db.clone()); - let genesis_height = params.anchor.block.height() as u64; - + // 4. Create the ASM worker context. Moho state and the export-entries index are no longer + // materialized here; a dedicated Moho worker derives both from each ASM commit (step 7). + // // The worker aligns the DB-side ASM manifest MMR with L1 heights during // startup (`ManifestMmrStore::prefill_manifest_mmr`), so no prefill is // needed here. @@ -63,9 +60,6 @@ pub(crate) async fn bootstrap( &config.bitcoin.retry_config, state_db.clone(), mmr_db.clone(), - export_entries_for_worker, - moho_storage, - genesis_height, ); // 5. Launch ASM worker @@ -100,6 +94,30 @@ pub(crate) async fn bootstrap( moho_predicate, } = backend; + // Spin the Moho worker off onto its own service task, driven by the ASM + // worker's per-block commit stream. It derives each block's MohoState + // (and the export-entry leaves its ExportState MMR commits to) from the + // anchor state the ASM worker committed, and persists both to the same + // stores the orchestrator and RPC read. Subscribe before the block + // watcher is spawned (step 8): the subscription has no replay, so a later + // subscriber would miss already-committed blocks. The genesis Moho state + // is seeded from the ASM genesis anchor during launch. + let moho_context = MohoWorkerContextImpl::new( + runtime_handle.clone(), + bitcoin_client.clone(), + &config.bitcoin.retry_config, + state_db.clone(), + moho_state_db.clone(), + export_entries_db.clone(), + ); + let _moho_worker = MohoWorkerBuilder::new() + .with_context(moho_context) + .with_subscription(asm_worker.subscribe_blocks()) + .with_genesis_block(params.anchor.block) + .with_asm_predicate(asm_predicate.clone()) + .launch(&executor) + .await?; + let input_builder = InputBuilder::new( state_db.clone(), bitcoin_client.clone(), diff --git a/bin/asm-runner/src/main.rs b/bin/asm-runner/src/main.rs index f88ddfa8..e3e67a20 100644 --- a/bin/asm-runner/src/main.rs +++ b/bin/asm-runner/src/main.rs @@ -6,6 +6,7 @@ mod block_watcher; mod bootstrap; mod config; +mod moho_context; mod prover; mod retry; mod rpc_server; diff --git a/bin/asm-runner/src/moho_context.rs b/bin/asm-runner/src/moho_context.rs new file mode 100644 index 00000000..59a3afa6 --- /dev/null +++ b/bin/asm-runner/src/moho_context.rs @@ -0,0 +1,155 @@ +//! Moho worker-context implementation for the ASM runner. +//! +//! [`MohoWorkerContextImpl`] backs the three concern traits the Moho worker +//! interfaces through ([`AsmStateProvider`], [`L1ProviderContext`], +//! [`MohoStateStore`]). It reads the ASM anchor states and logs the ASM worker +//! already committed (via [`AsmStateDb`]), resolves L1 parents from the Bitcoin +//! node, and persists derived Moho states via [`SledMohoStateDb`]. +//! +//! Unlike the ASM worker — which runs on its own thread and can block on Bitcoin +//! RPC directly — the Moho worker runs as an async service. The +//! [`MohoWorkerContext`](strata_asm_moho_worker::MohoWorkerContext) traits are +//! synchronous, so parent resolution bridges to the async client via +//! [`block_in_place`](task::block_in_place); see +//! [`MohoWorkerContextImpl::get_parent_block`]. + +use std::sync::Arc; + +use asm_storage::AsmStateDb; +use bitcoin::BlockHash; +use bitcoind_async_client::{Client, error::ClientError, traits::Reader}; +use moho_types::MohoState; +use strata_asm_common::{AnchorState, AsmLogEntry}; +use strata_asm_moho_storage::{SledExportEntriesDb, SledMohoStateDb}; +use strata_asm_moho_worker::{ + AsmStateProvider, ExportEntryStore, L1ProviderContext, MohoStateStore, MohoWorkerError, + MohoWorkerResult, +}; +use strata_asm_worker::AsmState; +use strata_btc_types::{BlockHashExt, L1BlockIdBitcoinExt}; +use strata_identifiers::L1BlockCommitment; +use tokio::{runtime::Handle, task}; + +use crate::retry::{ExponentialBackoff, RetryConfig, retry_with_backoff_async}; + +/// Storage and L1 access the Moho worker derives per-block Moho states from. +pub(crate) struct MohoWorkerContextImpl { + runtime_handle: Handle, + bitcoin_client: Arc, + /// Backoff schedule for Bitcoin RPC calls. + rpc_backoff: ExponentialBackoff, + /// Maximum retry attempts per Bitcoin RPC call. + rpc_max_retries: u16, + /// ASM anchor states and logs the Moho state is derived from, committed by + /// the ASM worker. + state_db: Arc, + /// Persistence for the derived per-block Moho states. + moho_state_db: SledMohoStateDb, + /// Persistence for the per-container export-entry leaves the Moho state's + /// `ExportState` MMR commits to. + export_entries_db: SledExportEntriesDb, +} + +impl MohoWorkerContextImpl { + pub(crate) fn new( + runtime_handle: Handle, + bitcoin_client: Arc, + retry: &RetryConfig, + state_db: Arc, + moho_state_db: SledMohoStateDb, + export_entries_db: SledExportEntriesDb, + ) -> Self { + Self { + runtime_handle, + bitcoin_client, + rpc_backoff: retry.backoff(), + rpc_max_retries: retry.max_retries, + state_db, + moho_state_db, + export_entries_db, + } + } + + /// Reads the ASM state the ASM worker committed for `blockid`, mapping a + /// miss to [`MohoWorkerError::MissingAsmState`]. + fn anchor(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult { + self.state_db + .get(blockid) + .map_err(|e| MohoWorkerError::Storage(e.to_string()))? + .ok_or(MohoWorkerError::MissingAsmState(*blockid)) + } +} + +impl AsmStateProvider for MohoWorkerContextImpl { + fn get_anchor_state(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult { + Ok(self.anchor(blockid)?.state().clone()) + } + + fn get_anchor_logs(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult> { + Ok(self.anchor(blockid)?.logs().clone()) + } +} + +impl L1ProviderContext for MohoWorkerContextImpl { + fn get_parent_block(&self, block: &L1BlockCommitment) -> MohoWorkerResult { + let block_hash: BlockHash = block.blkid().to_block_hash(); + let client = &self.bitcoin_client; + + // The context traits are synchronous but the Bitcoin RPC is async, and + // the Moho worker runs as an async service — a nested `Handle::block_on` + // would panic. `block_in_place` releases the current worker thread for + // the blocking call so the runtime keeps making progress; it requires + // the multi-threaded runtime the runner builds. + let header = task::block_in_place(|| { + self.runtime_handle.block_on(retry_with_backoff_async( + "btc_get_block_header", + self.rpc_max_retries, + &self.rpc_backoff, + || async { client.get_block_header(&block_hash).await }, + )) + }) + .map_err(|_: ClientError| MohoWorkerError::MissingParentBlock(*block))?; + + let parent_id = header.prev_blockhash.to_l1_block_id(); + Ok(L1BlockCommitment::new(block.height() - 1, parent_id)) + } +} + +impl MohoStateStore for MohoWorkerContextImpl { + fn get_latest_moho_state(&self) -> MohoWorkerResult> { + self.moho_state_db + .get_latest() + .map_err(|e| MohoWorkerError::Storage(e.to_string())) + } + + fn get_moho_state(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult { + self.moho_state_db + .get(*blockid) + .map_err(|e| MohoWorkerError::Storage(e.to_string()))? + .ok_or(MohoWorkerError::MissingMohoState(*blockid)) + } + + fn store_moho_state( + &self, + blockid: &L1BlockCommitment, + state: &MohoState, + ) -> MohoWorkerResult<()> { + self.moho_state_db + .store(*blockid, state.clone()) + .map_err(|e| MohoWorkerError::Storage(e.to_string())) + } +} + +impl ExportEntryStore for MohoWorkerContextImpl { + fn append_export_entry( + &self, + container_id: u8, + height: u32, + entry: [u8; 32], + ) -> MohoWorkerResult<()> { + self.export_entries_db + .append(container_id, height, entry) + .map(|_index| ()) + .map_err(|e| MohoWorkerError::Storage(e.to_string())) + } +} diff --git a/bin/asm-runner/src/prover/input.rs b/bin/asm-runner/src/prover/input.rs index 7d6e289c..a2d1092b 100644 --- a/bin/asm-runner/src/prover/input.rs +++ b/bin/asm-runner/src/prover/input.rs @@ -11,7 +11,8 @@ use moho_recursive_proof::{MohoRecursiveInput, MohoRecursiveOutput}; use moho_runtime_impl::RuntimeInput; use moho_types::{MohoState, RecursiveMohoProof, StepMohoAttestation, StepMohoProof}; use ssz::{Decode, Encode}; -use strata_asm_proof_db::{MohoStateDb, ProofDb, SledMohoStateDb, SledProofDb}; +use strata_asm_moho_storage::{MohoStateDb, SledMohoStateDb}; +use strata_asm_proof_db::{ProofDb, SledProofDb}; use strata_asm_proof_impl::moho_program::input::AsmStepInput; use strata_asm_proof_types::L1Range; use strata_btc_types::{BlockHashExt, L1BlockIdBitcoinExt}; diff --git a/bin/asm-runner/src/rpc_server.rs b/bin/asm-runner/src/rpc_server.rs index d5103f12..3f364bc3 100644 --- a/bin/asm-runner/src/rpc_server.rs +++ b/bin/asm-runner/src/rpc_server.rs @@ -3,7 +3,7 @@ use std::{fmt::Display, sync::Arc, time::Instant}; use anyhow::Result; -use asm_storage::{AsmStateDb, ExportEntriesDb}; +use asm_storage::AsmStateDb; use async_trait::async_trait; use bitcoin::BlockHash; use bitcoind_async_client::{Client, traits::Reader}; @@ -13,7 +13,8 @@ use jsonrpsee::{ types::{ErrorObject, ErrorObjectOwned}, }; use ssz::{Decode, Encode}; -use strata_asm_proof_db::{ProofDb, SledMohoStateDb, SledProofDb}; +use strata_asm_moho_storage::{SledExportEntriesDb, SledMohoStateDb}; +use strata_asm_proof_db::{ProofDb, SledProofDb}; use strata_asm_proof_types::{AsmProof, L1Range, MohoProof}; use strata_asm_proto_bridge_v1::{AssignmentEntry, BridgeV1State, DepositEntry}; use strata_asm_proto_bridge_v1_txs::BRIDGE_V1_SUBPROTOCOL_ID; @@ -167,7 +168,7 @@ impl AsmStateApiServer for AsmRpcServer { pub(crate) struct AsmProofRpcDeps { pub proof_db: SledProofDb, pub moho_state_db: SledMohoStateDb, - pub export_entries_db: ExportEntriesDb, + pub export_entries_db: SledExportEntriesDb, } /// RPC handlers serving ASM and Moho proofs plus the per-block Moho state they're built on. @@ -175,7 +176,7 @@ pub(crate) struct AsmProofRpcServer { bitcoin_client: Arc, proof_db: SledProofDb, moho_state_db: SledMohoStateDb, - export_entries_db: ExportEntriesDb, + export_entries_db: SledExportEntriesDb, } impl AsmProofRpcServer { @@ -263,7 +264,7 @@ enum MmrProofError { /// for bad input or storage failures. fn build_export_entry_mmr_proof( moho_state_db: &SledMohoStateDb, - export_entries_db: &ExportEntriesDb, + export_entries_db: &SledExportEntriesDb, commitment: L1BlockCommitment, container_id: u8, leaf: &[u8], @@ -349,7 +350,7 @@ pub(crate) async fn run_rpc_server( mod tests { //! Tests for [`build_export_entry_mmr_proof`] against real sled storage. //! Mirrors the worker's invariant: each `NewExportEntry` hits both `ExportState` and - //! `ExportEntriesDb` in order. + //! `SledExportEntriesDb` in order. use moho_types::{ExportState, InnerStateCommitment, MohoState}; use ssz::Decode; use strata_identifiers::{Buf32, L1BlockCommitment, L1BlockId}; @@ -365,13 +366,13 @@ mod tests { fn temp_dbs() -> ( sled::Db, SledMohoStateDb, - ExportEntriesDb, + SledExportEntriesDb, tempfile::TempDir, ) { let dir = tempfile::tempdir().unwrap(); let sled_db = sled::open(dir.path()).unwrap(); let moho_state_db = SledMohoStateDb::open(&sled_db).unwrap(); - let export_entries_db = ExportEntriesDb::open(&sled_db).unwrap(); + let export_entries_db = SledExportEntriesDb::open(&sled_db).unwrap(); (sled_db, moho_state_db, export_entries_db, dir) } @@ -384,10 +385,10 @@ mod tests { } /// Same dual-write the worker does per block: each entry hits both the - /// `ExportState` MMR and the `ExportEntriesDb` leaf log. + /// `ExportState` MMR and the `SledExportEntriesDb` leaf log. fn apply_block( moho: &SledMohoStateDb, - idx: &ExportEntriesDb, + idx: &SledExportEntriesDb, prev: MohoState, at: L1BlockCommitment, entries: &[(u8, [u8; 32])], diff --git a/bin/asm-runner/src/storage.rs b/bin/asm-runner/src/storage.rs index d1b2b154..3e6474c0 100644 --- a/bin/asm-runner/src/storage.rs +++ b/bin/asm-runner/src/storage.rs @@ -3,17 +3,18 @@ use std::sync::Arc; use anyhow::Result; -use asm_storage::{AsmStateDb, ExportEntriesDb, MmrDb}; +use asm_storage::{AsmStateDb, MmrDb}; +use strata_asm_moho_storage::SledExportEntriesDb; use crate::config::DatabaseConfig; /// Create storage backends for the ASM runner. pub(crate) fn create_storage( config: &DatabaseConfig, -) -> Result<(Arc, Arc, ExportEntriesDb)> { +) -> Result<(Arc, Arc, SledExportEntriesDb)> { let db = sled::open(&config.path)?; let state_db = Arc::new(AsmStateDb::open(&db)?); let mmr_db = Arc::new(MmrDb::open(&db)?); - let export_entries_db = ExportEntriesDb::open(&db)?; + let export_entries_db = SledExportEntriesDb::open(&db)?; Ok((state_db, mmr_db, export_entries_db)) } diff --git a/bin/asm-runner/src/worker_context.rs b/bin/asm-runner/src/worker_context.rs index dac16030..ae4a10ae 100644 --- a/bin/asm-runner/src/worker_context.rs +++ b/bin/asm-runner/src/worker_context.rs @@ -3,56 +3,29 @@ //! Implements the four [`WorkerContext`](strata_asm_worker::WorkerContext) //! concern traits ([`L1DataProvider`], [`AnchorStateStore`], //! [`ManifestMmrStore`], [`AuxDataStore`]) for [`AsmWorkerContext`]. -//! -//! # Moho extension -//! -//! When [`MohoStorage`] is configured, we piggyback on the ASM worker: every -//! anchor-state write in [`AnchorStateStore::store_anchor_state`] also -//! materializes and persists the derived [`MohoState`] for the same -//! [`L1BlockCommitment`]. The two databases advance together under a single -//! call — Moho does not run its own driver, does not subscribe to L1, and -//! does not manage its own chain view. Whatever block sequence the ASM worker -//! decides to apply (including any future reorg handling it gains) is the -//! sequence Moho sees, for free. use std::sync::Arc; -use asm_storage::{AsmStateDb, ExportEntriesDb, MmrDb}; +use asm_storage::{AsmStateDb, MmrDb}; use bitcoin::{Block, BlockHash, Network}; use bitcoind_async_client::{Client, error::ClientError, traits::Reader}; -use moho_runtime_interface::MohoProgram; -use moho_types::{ExportState, MohoState}; -use strata_asm_common::{AnchorState, AsmManifest, AsmManifestHash, AuxData}; -use strata_asm_logs::NewExportEntry; -use strata_asm_proof_db::SledMohoStateDb; -use strata_asm_proof_impl::moho_program::program::{ - AsmStfProgram, advance_export_state_with_logs, extract_next_predicate_from_logs, -}; +use strata_asm_common::{AsmManifest, AsmManifestHash, AuxData}; use strata_asm_worker::{ AnchorStateStore, AsmState, AuxDataStore, L1DataProvider, ManifestMmrStore, WorkerError, WorkerResult, }; -use strata_btc_types::{BitcoinTxid, BlockHashExt, L1BlockIdBitcoinExt, RawBitcoinTx}; +use strata_btc_types::{BitcoinTxid, L1BlockIdBitcoinExt, RawBitcoinTx}; use strata_identifiers::{L1BlockCommitment, L1BlockId}; use strata_merkle::MerkleProofB32; -use strata_predicate::PredicateKey; use tokio::runtime::Handle; use crate::retry::{ExponentialBackoff, RetryConfig, retry_with_backoff_async}; -/// Dependencies the worker needs to materialize per-block [`MohoState`] -/// alongside each anchor state. `asm_predicate` is used only to seed the -/// genesis entry; every subsequent block is chain-forward from the parent. -pub(crate) struct MohoStorage { - pub db: SledMohoStateDb, - pub asm_predicate: PredicateKey, -} - /// ASM [`WorkerContext`](strata_asm_worker::WorkerContext) implementation. /// /// Fetches L1 blocks from a Bitcoin node and persists state via local sled -/// storage. When [`MohoStorage`] is supplied, each anchor-state write also -/// materializes the derived [`MohoState`] for the same block. +/// storage. Moho state and the export-entries index are derived separately by +/// the Moho worker; see [`moho_context`](crate::moho_context). pub(crate) struct AsmWorkerContext { runtime_handle: Handle, bitcoin_client: Arc, @@ -62,26 +35,15 @@ pub(crate) struct AsmWorkerContext { rpc_max_retries: u16, state_db: Arc, mmr_db: Arc, - export_entries_db: Option, - moho_storage: Option, - /// L1 height of the chain genesis (anchor) block. - genesis_height: u64, } impl AsmWorkerContext { - #[expect( - clippy::too_many_arguments, - reason = "constructor wires every dependency the worker holds; one call site" - )] pub(crate) fn new( runtime_handle: Handle, bitcoin_client: Arc, retry: &RetryConfig, state_db: Arc, mmr_db: Arc, - export_entries_db: Option, - moho_storage: Option, - genesis_height: u64, ) -> Self { Self { runtime_handle, @@ -90,52 +52,8 @@ impl AsmWorkerContext { rpc_max_retries: retry.max_retries, state_db, mmr_db, - export_entries_db, - moho_storage, - genesis_height, } } - - /// Materialize and persist the derived [`MohoState`] for this anchor state. - /// No-op when [`MohoStorage`] is not configured. - /// - /// Genesis is identified by the block commitment's height matching the - /// configured `genesis_height`. For non-genesis blocks we read the parent's - /// `MohoState` and chain forward. - fn compute_and_store_moho_state( - &self, - blockid: &L1BlockCommitment, - asm_state: &AsmState, - ) -> WorkerResult<()> { - let Some(moho) = &self.moho_storage else { - return Ok(()); - }; - - let genesis_height = self.genesis_height; - - let moho_state = if blockid.height() as u64 == genesis_height { - construct_genesis_moho_state(moho.asm_predicate.clone(), asm_state.state()) - } else { - let block = self.get_l1_block(blockid.blkid())?; - let parent = L1BlockCommitment::new( - blockid.height() - 1, - block.header.prev_blockhash.to_l1_block_id(), - ); - - let prev_moho = moho - .db - .get(parent) - .map_err(|_| WorkerError::DbError)? - .ok_or(WorkerError::DbError)?; // TODO(STR-3124): use appropriate error types after fixing the piggybanking on ASM worker - construct_next_moho_state(&prev_moho, asm_state) - }; - - moho.db - .store(*blockid, moho_state) - .map_err(|_| WorkerError::DbError)?; - - Ok(()) - } } impl L1DataProvider for AsmWorkerContext { @@ -202,30 +120,6 @@ impl AnchorStateStore for AsmWorkerContext { blockid: &L1BlockCommitment, state: &AsmState, ) -> WorkerResult<()> { - // Write order matters: moho and export_entries first, then anchor. The worker tracks - // progress via the anchor db (see get_latest_asm_state), so the anchor write is the - // effective commit point for this block. If we crash before it, progress has not - // advanced, so on restart the worker reprocesses this block and overwrites the - // orphaned entries with the same values. Reversing the order would risk advancing - // progress past a block whose moho or export_entries state was never persisted. - self.compute_and_store_moho_state(blockid, state)?; - - // Index each `NewExportEntry` alongside the MohoState's compact MMR so - // the RPC can regenerate inclusion proofs later. - if let Some(ref export_entries_db) = self.export_entries_db { - for log in state.logs() { - if let Ok(export) = log.try_into_log::() { - export_entries_db - .append( - export.container_id(), - blockid.height(), - *export.entry_data(), - ) - .map_err(|_| WorkerError::DbError)?; - } - } - } - self.state_db .put(blockid, state) .map_err(|_| WorkerError::DbError)?; @@ -289,25 +183,3 @@ impl AuxDataStore for AsmWorkerContext { .ok_or(WorkerError::MissingAuxData(*blockid)) } } - -/// Seed the genesis [`MohoState`]: no prior state to chain forward from, so we -/// use the configured `asm_predicate` and an empty export state. -fn construct_genesis_moho_state( - asm_predicate: PredicateKey, - genesis_anchor_state: &AnchorState, -) -> MohoState { - let inner = AsmStfProgram::compute_state_commitment(genesis_anchor_state); - let export_state = ExportState::new(vec![]).expect("empty export state is always valid"); - MohoState::new(inner, asm_predicate, export_state) -} - -/// Chain-forward the [`MohoState`]: let STF logs drive predicate and export -/// state updates, and recompute the inner commitment from the new anchor state. -fn construct_next_moho_state(prev_moho: &MohoState, state: &AsmState) -> MohoState { - let next_predicate = extract_next_predicate_from_logs(state.logs()) - .unwrap_or_else(|| prev_moho.next_predicate().clone()); - let next_export_state = - advance_export_state_with_logs(prev_moho.export_state().clone(), state.logs()); - let inner = AsmStfProgram::compute_state_commitment(state.state()); - MohoState::new(inner, next_predicate, next_export_state) -} diff --git a/crates/extensions/moho/storage/Cargo.toml b/crates/extensions/moho/storage/Cargo.toml new file mode 100644 index 00000000..3493a967 --- /dev/null +++ b/crates/extensions/moho/storage/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "strata-asm-moho-storage" +version = "0.1.0" +edition = "2024" + +[dependencies] +moho-types.workspace = true +strata-identifiers.workspace = true +strata-merkle = { workspace = true, features = ["ssz"] } +strata-merkle-node-store.workspace = true + +anyhow.workspace = true +sled.workspace = true +ssz.workspace = true + +[dev-dependencies] +proptest.workspace = true +strata-predicate.workspace = true +tempfile.workspace = true +tokio.workspace = true + +[lints] +workspace = true diff --git a/crates/extensions/moho/storage/src/export_entries.rs b/crates/extensions/moho/storage/src/export_entries.rs new file mode 100644 index 00000000..92f11d89 --- /dev/null +++ b/crates/extensions/moho/storage/src/export_entries.rs @@ -0,0 +1,60 @@ +//! Storage trait for per-container export-entry indexes. +//! +//! [`MohoState`](moho_types::MohoState) keeps only each container's compact MMR +//! (its peaks), so the original 32-byte leaves can't be recovered from it. An +//! export-entries store mirrors those leaves so the RPC can rebuild inclusion +//! proofs on demand. Containers are namespaced by `container_id`; each behaves +//! as an independent MMR over its entry hashes. + +use std::fmt::Debug; + +use strata_merkle::MerkleProofB32; + +/// Persistence interface for the per-container export-entry index. +pub trait ExportEntriesDb { + /// The error type returned by database operations. + type Error: Debug; + + /// Appends an entry for `container_id` and resolves to its `mmr_index`. + /// + /// Idempotent: a duplicate `(container_id, entry)` resolves to the original + /// index unchanged, so block replays after restart are a no-op. Assumes + /// `(container_id, entry_hash)` is unique within a correct chain. + fn append_entry( + &self, + container_id: u8, + height: u32, + entry: [u8; 32], + ) -> impl Future> + Send; + + /// Resolves to the number of entries currently stored for `container_id`. + fn entry_count( + &self, + container_id: u8, + ) -> impl Future> + Send; + + /// Reverse lookup: resolves to `(mmr_index, insertion_height)` for `hash` + /// under `container_id`, or `None` if absent. + fn find_entry_index( + &self, + container_id: u8, + hash: [u8; 32], + ) -> impl Future, Self::Error>> + Send; + + /// Resolves to `(insertion_height, entry_hash)` at `(container_id, mmr_index)`, + /// or `None` if absent. + fn get_entry( + &self, + container_id: u8, + mmr_index: u64, + ) -> impl Future, Self::Error>> + Send; + + /// Generates an inclusion proof for `mmr_index` against the container's MMR + /// at size `at_leaf_count`. + fn generate_entry_proof( + &self, + container_id: u8, + mmr_index: u64, + at_leaf_count: u64, + ) -> impl Future> + Send; +} diff --git a/crates/extensions/moho/storage/src/lib.rs b/crates/extensions/moho/storage/src/lib.rs new file mode 100644 index 00000000..1f31d17a --- /dev/null +++ b/crates/extensions/moho/storage/src/lib.rs @@ -0,0 +1,24 @@ +//! Persistence layer for the Moho worker. +//! +//! The Moho worker derives a [`moho_types::MohoState`] for each L1 block it +//! processes and persists it here, keyed by the block's +//! [`L1BlockCommitment`](strata_identifiers::L1BlockCommitment). Alongside it +//! the worker mirrors the per-container export-entry leaves of the state's +//! `ExportState` MMR so the RPC can rebuild inclusion proofs on demand. +//! +//! Each store is split into a backend-agnostic trait and a sled-backed +//! implementation: +//! +//! - [`MohoStateDb`] / [`SledMohoStateDb`] — the Moho-state store, keyed by L1 block commitment. +//! - [`ExportEntriesDb`] / [`SledExportEntriesDb`] — the per-container export-entry index mirroring +//! the `ExportState` MMR leaves. + +mod export_entries; +mod moho_state; +mod sled; + +pub use self::{ + export_entries::ExportEntriesDb, + moho_state::MohoStateDb, + sled::{SledExportEntriesDb, SledMohoStateDb}, +}; diff --git a/crates/proof/db/src/moho_state.rs b/crates/extensions/moho/storage/src/moho_state.rs similarity index 100% rename from crates/proof/db/src/moho_state.rs rename to crates/extensions/moho/storage/src/moho_state.rs diff --git a/crates/storage/src/export_entries.rs b/crates/extensions/moho/storage/src/sled/export_entries.rs similarity index 77% rename from crates/storage/src/export_entries.rs rename to crates/extensions/moho/storage/src/sled/export_entries.rs index 4aca4602..f7ea126e 100644 --- a/crates/storage/src/export_entries.rs +++ b/crates/extensions/moho/storage/src/sled/export_entries.rs @@ -1,8 +1,4 @@ -//! Sled-backed index of per-container export entries. -//! -//! `MohoState` keeps only each container's compact MMR (peaks), so the -//! original 32-byte leaves can't be recovered from it. We mirror them here so -//! the RPC can rebuild inclusion proofs on demand. +//! [`ExportEntriesDb`](crate::ExportEntriesDb) implementation backed by sled. //! //! Backed by [`strata_merkle_node_store`]: every MMR node is persisted, so a //! proof is `O(log n)` with no leaf replay. Containers share one node tree, @@ -14,6 +10,8 @@ use anyhow::{Context, Result}; use strata_merkle::{MerkleProofB32, Sha256Hasher}; use strata_merkle_node_store::{MmrNodeStore, NodePos, StoredMmr}; +use crate::ExportEntriesDb; + /// Decodes a stored 32-byte node value into a hash. /// /// The store only ever writes 32-byte values, so a wrong length is disk @@ -66,17 +64,17 @@ impl MmrNodeStore for ContainerNodes<'_> { } } -/// Per-container export-entry store: a namespaced MMR node tree plus a -/// `(container_id, index) → height` map and a reverse +/// Sled-backed per-container export-entry store: a namespaced MMR node tree plus +/// a `(container_id, index) → height` map and a reverse /// `(container_id, hash) → index` map. #[derive(Debug, Clone)] -pub struct ExportEntriesDb { +pub struct SledExportEntriesDb { nodes: sled::Tree, heights: sled::Tree, index_by_hash: sled::Tree, } -impl ExportEntriesDb { +impl SledExportEntriesDb { /// Opens or creates the export entries trees in the given sled instance. pub fn open(db: &sled::Db) -> Result { Ok(Self { @@ -104,11 +102,11 @@ impl ExportEntriesDb { } } - /// Appends an entry for `container_id` and returns its `mmr_index`. + /// Synchronous variant of [`ExportEntriesDb::append_entry`]. /// - /// Idempotent: a duplicate `(container_id, entry)` returns the original - /// index unchanged, so block replays after restart are a no-op. Assumes - /// `(container_id, entry_hash)` is unique within a correct chain. + /// The Moho worker appends entries from its synchronous `ExportEntryStore` + /// impl while running as an async service, so it calls these sync methods + /// directly rather than the async trait below. pub fn append(&self, container_id: u8, height: u32, entry: [u8; 32]) -> Result { let hash_key = encode_hash_key(container_id, &entry); if let Some(existing) = self.index_by_hash.get(hash_key)? { @@ -126,15 +124,14 @@ impl ExportEntriesDb { Ok(index) } - /// Returns the number of entries currently stored for `container_id`. + /// Synchronous variant of [`ExportEntriesDb::entry_count`]. See [`Self::append`]. pub fn num_entries(&self, container_id: u8) -> Result { Ok(StoredMmr::::leaf_count( &self.container(container_id), )?) } - /// Reverse lookup: returns `(mmr_index, insertion_height)` for `hash` - /// under `container_id`, or `None` if absent. + /// Synchronous variant of [`ExportEntriesDb::find_entry_index`]. See [`Self::append`]. pub fn find_index(&self, container_id: u8, hash: &[u8; 32]) -> Result> { let hash_key = encode_hash_key(container_id, hash); let Some(idx_bytes) = self.index_by_hash.get(hash_key)? else { @@ -147,7 +144,7 @@ impl ExportEntriesDb { Ok(Some((mmr_index, height))) } - /// Fetches `(insertion_height, entry_hash)` at `(container_id, mmr_index)`. + /// Synchronous variant of [`ExportEntriesDb::get_entry`]. See [`Self::append`]. pub fn get(&self, container_id: u8, mmr_index: u64) -> Result> { let Some(hash) = StoredMmr::::get_leaf(&self.container(container_id), mmr_index)? @@ -160,8 +157,7 @@ impl ExportEntriesDb { Ok(Some((height, hash))) } - /// Generates an inclusion proof for `mmr_index` against the container's - /// MMR at size `at_leaf_count`. + /// Synchronous variant of [`ExportEntriesDb::generate_entry_proof`]. See [`Self::append`]. /// /// `O(log n)`: walks the stored sibling path rather than replaying leaves. /// The store yields a generic [`MerkleProof`](strata_merkle::MerkleProof); @@ -182,6 +178,39 @@ impl ExportEntriesDb { } } +impl ExportEntriesDb for SledExportEntriesDb { + type Error = anyhow::Error; + + async fn append_entry(&self, container_id: u8, height: u32, entry: [u8; 32]) -> Result { + self.append(container_id, height, entry) + } + + async fn entry_count(&self, container_id: u8) -> Result { + self.num_entries(container_id) + } + + async fn find_entry_index( + &self, + container_id: u8, + hash: [u8; 32], + ) -> Result> { + self.find_index(container_id, &hash) + } + + async fn get_entry(&self, container_id: u8, mmr_index: u64) -> Result> { + self.get(container_id, mmr_index) + } + + async fn generate_entry_proof( + &self, + container_id: u8, + mmr_index: u64, + at_leaf_count: u64, + ) -> Result { + self.generate_proof(container_id, mmr_index, at_leaf_count) + } +} + fn encode_key(container_id: u8, mmr_index: u64) -> [u8; 9] { let mut key = [0u8; 9]; key[0] = container_id; @@ -206,6 +235,7 @@ fn decode_idx(bytes: &[u8]) -> Result { mod tests { use ssz::{Decode, Encode}; use strata_merkle::{Mmr, Mmr64B32, MmrState, Sha256Hasher}; + use tokio::runtime::Runtime; use super::*; @@ -226,7 +256,7 @@ mod tests { #[test] fn append_assigns_monotonic_indices_per_container() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); assert_eq!(store.append(1, 10, hash(0xa1)).unwrap(), 0); assert_eq!(store.append(1, 11, hash(0xa2)).unwrap(), 1); @@ -238,7 +268,7 @@ mod tests { #[test] fn num_entries_matches_appends() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); assert_eq!(store.num_entries(7).unwrap(), 0); for i in 0..5u8 { @@ -251,7 +281,7 @@ mod tests { #[test] fn get_returns_none_for_unknown() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); store.append(1, 42, hash(0xaa)).unwrap(); assert!(store.get(1, 1).unwrap().is_none()); @@ -261,7 +291,7 @@ mod tests { #[test] fn get_returns_height_and_hash() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); store.append(3, 999, hash(0xcc)).unwrap(); let (height, got) = store.get(3, 0).unwrap().unwrap(); @@ -272,7 +302,7 @@ mod tests { #[test] fn find_index_returns_match_with_height() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); store.append(1, 10, hash(0xa0)).unwrap(); store.append(1, 11, hash(0xa1)).unwrap(); store.append(1, 12, hash(0xa2)).unwrap(); @@ -287,7 +317,7 @@ mod tests { #[test] fn append_is_idempotent_on_duplicate_hash() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); let idx0 = store.append(1, 10, hash(0xa0)).unwrap(); let idx1 = store.append(1, 11, hash(0xa1)).unwrap(); @@ -303,7 +333,7 @@ mod tests { /// Reference compact-peaks MMR built by replaying the first `size` leaves /// of `container_id`, matching the accumulators that proofs verify against. - fn rebuild_compact_mmr(store: &ExportEntriesDb, container_id: u8, size: u64) -> Mmr64B32 { + fn rebuild_compact_mmr(store: &SledExportEntriesDb, container_id: u8, size: u64) -> Mmr64B32 { let mut compact = Mmr64B32::new_empty(); for i in 0..size { let (_h, hash) = store.get(container_id, i).unwrap().unwrap(); @@ -315,7 +345,7 @@ mod tests { #[test] fn generate_and_verify_proof_single_leaf() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); let h = hash(0x01); store.append(4, 100, h).unwrap(); @@ -327,7 +357,7 @@ mod tests { #[test] fn generate_proofs_for_all_leaves() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); for i in 0u8..8 { store.append(5, 1000 + i as u32, hash(i)).unwrap(); } @@ -344,7 +374,7 @@ mod tests { #[test] fn proof_at_earlier_size_is_valid() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); for i in 0u8..4 { store.append(6, 100 + i as u32, hash(i)).unwrap(); @@ -362,7 +392,7 @@ mod tests { #[test] fn proof_ssz_roundtrip_verifies() { let db = test_db(); - let store = ExportEntriesDb::open(&db).unwrap(); + let store = SledExportEntriesDb::open(&db).unwrap(); for i in 0u8..5 { store.append(9, 200 + i as u32, hash(i)).unwrap(); } @@ -374,4 +404,26 @@ mod tests { let compact = rebuild_compact_mmr(&store, 9, 5); assert!(compact.verify(&decoded, &hash(3))); } + + /// Exercises the async [`ExportEntriesDb`] trait surface, proving the + /// methods delegate to their synchronous counterparts. + #[test] + fn async_trait_delegates_to_sync() { + let db = test_db(); + let store = SledExportEntriesDb::open(&db).unwrap(); + + Runtime::new().unwrap().block_on(async { + assert_eq!(store.append_entry(1, 10, hash(0xa1)).await.unwrap(), 0); + assert_eq!(store.entry_count(1).await.unwrap(), 1); + assert_eq!( + store.find_entry_index(1, hash(0xa1)).await.unwrap(), + Some((0, 10)) + ); + assert_eq!(store.get_entry(1, 0).await.unwrap(), Some((10, hash(0xa1)))); + + let proof = store.generate_entry_proof(1, 0, 1).await.unwrap(); + let compact = rebuild_compact_mmr(&store, 1, 1); + assert!(compact.verify(&proof, &hash(0xa1))); + }); + } } diff --git a/crates/extensions/moho/storage/src/sled/mod.rs b/crates/extensions/moho/storage/src/sled/mod.rs new file mode 100644 index 00000000..138a41b1 --- /dev/null +++ b/crates/extensions/moho/storage/src/sled/mod.rs @@ -0,0 +1,85 @@ +//! [Sled](https://docs.rs/sled)-backed implementation of [`super::MohoStateDb`]. +//! +//! State is stored in a single sled tree. Keys use big-endian height encoding so +//! that sled's lexicographic ordering matches block-height ordering, which is +//! required for the range scans `prune` relies on. + +use strata_identifiers::{Buf32, L1BlockCommitment, L1BlockId}; + +mod export_entries; +mod moho_state; + +pub use self::{export_entries::SledExportEntriesDb, moho_state::SledMohoStateDb}; + +// ── Key encoding ────────────────────────────────────────────────────── +// +// We use a custom big-endian encoding for block commitment keys instead of +// borsh/bincode because those serialize integers as little-endian. Big-endian +// encoding ensures that sled's lexicographic key ordering matches block-height +// ordering, which is required for range scans. + +/// Size of an encoded [`L1BlockCommitment`]: 4-byte BE height + 32-byte block id. +const ENCODED_L1_COMMITMENT_SIZE: usize = 4 + 32; + +/// Encodes an [`L1BlockCommitment`] as 36 bytes: `[height_be(4)][blkid(32)]`. +pub(crate) fn encode_block_commitment( + commitment: &L1BlockCommitment, +) -> [u8; ENCODED_L1_COMMITMENT_SIZE] { + let mut buf = [0u8; ENCODED_L1_COMMITMENT_SIZE]; + buf[0..4].copy_from_slice(&commitment.height().to_be_bytes()); + buf[4..36].copy_from_slice(commitment.blkid().as_ref()); + buf +} + +/// Decodes a 36-byte buffer back into an [`L1BlockCommitment`]. +pub(crate) fn decode_block_commitment(buf: &[u8]) -> L1BlockCommitment { + let height = u32::from_be_bytes(buf[0..4].try_into().expect("key is at least 4 bytes")); + let blkid: [u8; 32] = buf[4..36].try_into().expect("key is at least 36 bytes"); + L1BlockCommitment::new(height, L1BlockId::from(Buf32::from(blkid))) +} + +/// Alias: encodes a Moho key (same as a single block commitment). +pub(crate) fn encode_moho_key(l1ref: &L1BlockCommitment) -> [u8; ENCODED_L1_COMMITMENT_SIZE] { + encode_block_commitment(l1ref) +} + +/// Alias: decodes a Moho key (same as a single block commitment). +pub(crate) fn decode_moho_key(key: &[u8]) -> L1BlockCommitment { + decode_block_commitment(key) +} + +#[cfg(test)] +pub(crate) mod test_util { + use proptest::prelude::*; + use strata_identifiers::{Buf32, L1BlockCommitment, L1BlockId}; + + /// Generates an arbitrary L1BlockCommitment. + /// Heights must be < 500_000_000 (bitcoin LOCK_TIME_THRESHOLD). + pub(crate) fn arb_l1_block_commitment() -> impl Strategy { + (0u32..500_000_000u32, any::<[u8; 32]>()) + .prop_map(|(h, blkid)| L1BlockCommitment::new(h, L1BlockId::from(Buf32::from(blkid)))) + } +} + +#[cfg(test)] +mod tests { + use proptest::prelude::*; + + use super::test_util::arb_l1_block_commitment; + + proptest! { + #[test] + fn block_commitment_key_roundtrip(commitment in arb_l1_block_commitment()) { + let encoded = super::encode_block_commitment(&commitment); + let decoded = super::decode_block_commitment(&encoded); + prop_assert_eq!(commitment, decoded); + } + + #[test] + fn moho_key_roundtrip(commitment in arb_l1_block_commitment()) { + let encoded = super::encode_moho_key(&commitment); + let decoded = super::decode_moho_key(&encoded); + prop_assert_eq!(commitment, decoded); + } + } +} diff --git a/crates/proof/db/src/sled/moho_state.rs b/crates/extensions/moho/storage/src/sled/moho_state.rs similarity index 74% rename from crates/proof/db/src/sled/moho_state.rs rename to crates/extensions/moho/storage/src/sled/moho_state.rs index d6de7e63..3d7ef539 100644 --- a/crates/proof/db/src/sled/moho_state.rs +++ b/crates/extensions/moho/storage/src/sled/moho_state.rs @@ -4,13 +4,13 @@ use moho_types::MohoState; use ssz::{Decode, Encode}; use strata_identifiers::L1BlockCommitment; -use super::encode_moho_key; +use super::{decode_moho_key, encode_moho_key}; use crate::MohoStateDb; /// Sled-backed store for [`MohoState`] snapshots keyed by [`L1BlockCommitment`]. /// -/// Values are SSZ-encoded; keys use the same big-endian height encoding as the -/// proof trees so lexicographic range scans match block-height ordering. +/// Values are SSZ-encoded; keys use big-endian height encoding so lexicographic +/// range scans match block-height ordering. #[derive(Debug, Clone)] pub struct SledMohoStateDb { moho_states: sled::Tree, @@ -19,19 +19,20 @@ pub struct SledMohoStateDb { impl SledMohoStateDb { /// Opens the Moho-state tree on an already-open sled database. /// - /// Callers open the [`sled::Db`] themselves so multiple handles — e.g. - /// [`super::SledProofDb`] — can share the same on-disk directory; sled - /// does not allow opening the same path twice in a process. + /// Callers open the [`sled::Db`] themselves so multiple handles can share + /// the same on-disk directory; sled does not allow opening the same path + /// twice in a process. pub fn open(db: &sled::Db) -> Result { Ok(Self { moho_states: db.open_tree("moho_states")?, }) } - /// Synchronous variant of [`MohoStateDb::store_moho_state`]. The ASM - /// worker runs on a sync thread (via `ServiceBuilder::launch_sync`) and - /// its genesis-seed path is invoked from an async bootstrap task, where - /// `Handle::block_on` would panic. Calling this directly avoids that. + /// Synchronous variant of [`MohoStateDb::store_moho_state`]. The Moho worker + /// interacts with storage through synchronous traits (`MohoStateStore`), and + /// it runs as an async service where a nested `Handle::block_on` would panic, + /// so the worker calls these sync methods directly rather than the async + /// trait below. pub fn store(&self, l1ref: L1BlockCommitment, state: MohoState) -> Result<(), sled::Error> { self.moho_states .insert(encode_moho_key(&l1ref), state.as_ssz_bytes())?; @@ -46,6 +47,21 @@ impl SledMohoStateDb { .map(|v| MohoState::from_ssz_bytes(&v).expect("stored state should be valid SSZ"))) } + /// Returns the highest-height stored Moho state and the block it is anchored + /// to, or `None` when the store is empty. + /// + /// Keys are big-endian `[height‖blkid]`, so the last entry is the + /// highest-height one (ties broken by block id). The Moho worker uses this to + /// resume from its latest committed state across restarts. + pub fn get_latest(&self) -> Result, sled::Error> { + let Some((key, value)) = self.moho_states.last()? else { + return Ok(None); + }; + let commitment = decode_moho_key(&key); + let state = MohoState::from_ssz_bytes(&value).expect("stored state should be valid SSZ"); + Ok(Some((commitment, state))) + } + /// Synchronous variant of [`MohoStateDb::prune`]. See [`Self::store`]. pub fn prune_before(&self, before_height: u32) -> Result<(), sled::Error> { let upper: &[u8] = &before_height.to_be_bytes(); @@ -112,6 +128,36 @@ mod tests { }) } + fn moho_state(inner: u8) -> MohoState { + MohoState::new( + InnerStateCommitment::from([inner; 32]), + PredicateKey::always_accept(), + ExportState::new(vec![]).unwrap(), + ) + } + + #[test] + fn get_latest_on_empty_returns_none() { + let (db, _dir) = temp_moho_db(); + assert!(db.get_latest().unwrap().is_none()); + } + + #[test] + fn get_latest_returns_highest_height() { + let (db, _dir) = temp_moho_db(); + let low = L1BlockCommitment::new(7, L1BlockId::from(Buf32::from([0x11; 32]))); + let high = L1BlockCommitment::new(42, L1BlockId::from(Buf32::from([0x22; 32]))); + + // Store out of height order to prove ordering comes from the key, not + // insertion order. + db.store(high, moho_state(0xbb)).unwrap(); + db.store(low, moho_state(0xaa)).unwrap(); + + let (blk, state) = db.get_latest().unwrap().unwrap(); + assert_eq!(blk, high); + assert_eq!(state, moho_state(0xbb)); + } + proptest! { #![proptest_config(ProptestConfig::with_cases(50))] diff --git a/crates/extensions/moho/worker/Cargo.toml b/crates/extensions/moho/worker/Cargo.toml new file mode 100644 index 00000000..26c990a9 --- /dev/null +++ b/crates/extensions/moho/worker/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "strata-asm-moho-worker" +version = "0.1.0" +edition = "2024" + +[lints] +workspace = true + +[dependencies] +strata-asm-common.workspace = true +strata-asm-logs.workspace = true +strata-asm-proof-impl.workspace = true +strata-asm-worker.workspace = true +strata-identifiers.workspace = true +strata-predicate.workspace = true +strata-service.workspace = true +strata-tasks.workspace = true + +moho-runtime-interface.workspace = true +moho-types = { workspace = true, features = ["serde"] } + +anyhow.workspace = true +serde.workspace = true +thiserror.workspace = true +tracing.workspace = true + +[dev-dependencies] +strata-asm-params = { workspace = true, features = ["arbitrary"] } +strata-asm-spec.workspace = true +strata-test-utils-arb.workspace = true diff --git a/crates/extensions/moho/worker/src/builder.rs b/crates/extensions/moho/worker/src/builder.rs new file mode 100644 index 00000000..1ea3078f --- /dev/null +++ b/crates/extensions/moho/worker/src/builder.rs @@ -0,0 +1,107 @@ +//! Builder for constructing and launching the Moho worker service. + +use strata_asm_worker::Subscription; +use strata_identifiers::L1BlockCommitment; +use strata_predicate::PredicateKey; +use strata_service::{ServiceBuilder, StreamInput}; +use strata_tasks::TaskExecutor; + +use crate::{ + MohoWorkerContext, MohoWorkerHandle, constants, errors::MohoWorkerError, + service::MohoWorkerService, state::MohoWorkerServiceState, +}; + +/// Builder for launching a Moho worker driven by the ASM worker's per-block +/// subscription. +/// +/// Wire it with the storage context, the subscription handed out by +/// [`AsmWorkerHandle::subscribe_blocks`](strata_asm_worker::AsmWorkerHandle::subscribe_blocks), +/// the genesis block, and the ASM predicate that seeds the genesis Moho state. +/// +/// Subscribe *before* the ASM worker begins committing blocks: the subscription +/// has no replay, so the worker must be wired in while the stream still starts +/// at the genesis successor. +#[derive(Debug)] +pub struct MohoWorkerBuilder { + context: Option, + subscription: Option>, + genesis_block: Option, + asm_predicate: Option, +} + +impl MohoWorkerBuilder { + /// Create a new builder instance. + pub fn new() -> Self { + Self { + context: None, + subscription: None, + genesis_block: None, + asm_predicate: None, + } + } + + /// Set the storage context (implements [`MohoWorkerContext`]). + pub fn with_context(mut self, context: W) -> Self { + self.context = Some(context); + self + } + + /// Set the ASM commit subscription driving the worker. + pub fn with_subscription(mut self, subscription: Subscription) -> Self { + self.subscription = Some(subscription); + self + } + + /// Set the genesis block whose ASM anchor state seeds the genesis Moho state. + pub fn with_genesis_block(mut self, genesis_block: L1BlockCommitment) -> Self { + self.genesis_block = Some(genesis_block); + self + } + + /// Set the ASM predicate carried by the genesis Moho state. + pub fn with_asm_predicate(mut self, asm_predicate: PredicateKey) -> Self { + self.asm_predicate = Some(asm_predicate); + self + } + + /// Launch the Moho worker service and return a handle to it. + /// + /// Validates dependencies, seeds or resumes the service state, adapts the + /// subscription into a stream input, and spawns the async worker. + pub async fn launch(self, executor: &TaskExecutor) -> anyhow::Result + where + W: MohoWorkerContext + Send + Sync + 'static, + { + let context = self + .context + .ok_or(MohoWorkerError::MissingDependency("context"))?; + let subscription = self + .subscription + .ok_or(MohoWorkerError::MissingDependency("subscription"))?; + let genesis_block = self + .genesis_block + .ok_or(MohoWorkerError::MissingDependency("genesis_block"))?; + let asm_predicate = self + .asm_predicate + .ok_or(MohoWorkerError::MissingDependency("asm_predicate"))?; + + // Seed or resume synchronously before launch, mirroring the ASM worker: + // the genesis Moho state must exist before the first commit is folded. + let state = MohoWorkerServiceState::new(context, genesis_block, asm_predicate)?; + + let input = StreamInput::new(subscription); + let monitor = ServiceBuilder::, _>::new() + .with_state(state) + .with_input(input) + .launch_async(constants::SERVICE_NAME, executor) + .await?; + + Ok(MohoWorkerHandle::new(monitor)) + } +} + +impl Default for MohoWorkerBuilder { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/extensions/moho/worker/src/compute.rs b/crates/extensions/moho/worker/src/compute.rs new file mode 100644 index 00000000..c8091653 --- /dev/null +++ b/crates/extensions/moho/worker/src/compute.rs @@ -0,0 +1,56 @@ +//! Derivation of [`MohoState`] from committed ASM anchor states. +//! +//! The Moho state is a thin projection of the ASM anchor state: the inner +//! commitment is the tree hash of the anchor state, while the predicate and +//! export state are advanced by replaying the STF logs the ASM worker recorded +//! for the block. Neither requires re-running the STF — everything needed lives +//! in the committed [`AnchorState`] and its [`AsmLogEntry`]s. + +use moho_runtime_interface::MohoProgram; +use moho_types::{ExportState, MohoState}; +use strata_asm_common::{AnchorState, AsmLogEntry}; +use strata_asm_logs::NewExportEntry; +use strata_asm_proof_impl::moho_program::program::{ + AsmStfProgram, advance_export_state_with_logs, extract_next_predicate_from_logs, +}; +use strata_predicate::PredicateKey; + +/// Seeds the genesis [`MohoState`]: there is no prior state to chain forward +/// from, so we pair the genesis anchor commitment with the configured +/// `asm_predicate` and an empty export state. +pub(crate) fn construct_genesis_moho_state( + asm_predicate: PredicateKey, + genesis: &AnchorState, +) -> MohoState { + let inner = AsmStfProgram::compute_state_commitment(genesis); + let export_state = ExportState::new(vec![]).expect("empty export state is always valid"); + MohoState::new(inner, asm_predicate, export_state) +} + +/// Chains the [`MohoState`] forward from its parent: the STF logs drive the +/// predicate and export-state updates, and the inner commitment is recomputed +/// from the new anchor state. +pub(crate) fn construct_next_moho_state( + prev: &MohoState, + anchor_state: &AnchorState, + logs: &[AsmLogEntry], +) -> MohoState { + let next_predicate = + extract_next_predicate_from_logs(logs).unwrap_or_else(|| prev.next_predicate().clone()); + let next_export_state = advance_export_state_with_logs(prev.export_state().clone(), logs); + let inner = AsmStfProgram::compute_state_commitment(anchor_state); + MohoState::new(inner, next_predicate, next_export_state) +} + +/// Extracts the `(container_id, entry)` leaves a block's [`NewExportEntry`] logs +/// append to the `ExportState` MMR, in log order. +/// +/// These are the same leaves [`advance_export_state_with_logs`] folds into the +/// state's compact per-container MMR; the worker persists them so the RPC can +/// rebuild inclusion proofs the compact MMR no longer carries. +pub(crate) fn export_entries_from_logs(logs: &[AsmLogEntry]) -> Vec<(u8, [u8; 32])> { + logs.iter() + .filter_map(|log| log.try_into_log::().ok()) + .map(|entry| (entry.container_id(), *entry.entry_data())) + .collect() +} diff --git a/crates/extensions/moho/worker/src/constants.rs b/crates/extensions/moho/worker/src/constants.rs new file mode 100644 index 00000000..3a8ef894 --- /dev/null +++ b/crates/extensions/moho/worker/src/constants.rs @@ -0,0 +1,4 @@ +//! Constants for the Moho worker. + +/// Service identifier for the Moho worker. +pub(crate) const SERVICE_NAME: &str = "moho_worker"; diff --git a/crates/extensions/moho/worker/src/errors.rs b/crates/extensions/moho/worker/src/errors.rs new file mode 100644 index 00000000..f393364b --- /dev/null +++ b/crates/extensions/moho/worker/src/errors.rs @@ -0,0 +1,38 @@ +use strata_identifiers::L1BlockCommitment; +use thiserror::Error; + +/// Return type for Moho worker operations. +pub type MohoWorkerResult = Result; + +#[derive(Debug, Error)] +pub enum MohoWorkerError { + /// The ASM anchor state the Moho state derives from was not found. The ASM + /// worker commits the anchor state before emitting its block notification, + /// so a miss here means the ASM and Moho stores are out of sync. + #[error("missing ASM anchor state for block {0:?}")] + MissingAsmState(L1BlockCommitment), + + /// The Moho state for a block was not found in the store. Hit when + /// resolving the parent of an incoming commit: the fold chains forward from + /// the parent's committed Moho state, so the parent must already be present. + /// With commits arriving in order from the ASM worker, a miss means the + /// parent's commit was never folded — a gap the worker cannot bridge alone. + // TODO(STR-3124): backfill the gap by replaying the intervening anchor + // states instead of erroring out, once the worker resumes from its own + // store on restart. + #[error("missing Moho state for block {0:?}")] + MissingMohoState(L1BlockCommitment), + + /// The parent of an L1 block commitment could not be resolved — e.g. the L1 + /// block or its header was unavailable from the provider. + #[error("could not resolve parent of L1 block {0:?}")] + MissingParentBlock(L1BlockCommitment), + + /// The underlying Moho-state store failed. Carries the backend's display so + /// the operator sees the real cause without us bucketing it. + #[error("moho state store: {0}")] + Storage(String), + + #[error("missing required dependency: {0}")] + MissingDependency(&'static str), +} diff --git a/crates/extensions/moho/worker/src/handle.rs b/crates/extensions/moho/worker/src/handle.rs new file mode 100644 index 00000000..66da5bb8 --- /dev/null +++ b/crates/extensions/moho/worker/src/handle.rs @@ -0,0 +1,25 @@ +//! Handle for interacting with the Moho worker service. + +use strata_service::ServiceMonitor; + +use crate::MohoWorkerStatus; + +/// Handle for observing the Moho worker service. +/// +/// The worker is purely subscription-driven — it takes no commands — so the +/// handle only exposes status monitoring. +#[derive(Debug)] +pub struct MohoWorkerHandle { + monitor: ServiceMonitor, +} + +impl MohoWorkerHandle { + pub(crate) fn new(monitor: ServiceMonitor) -> Self { + Self { monitor } + } + + /// Allows other services to listen to status updates. + pub fn monitor(&self) -> &ServiceMonitor { + &self.monitor + } +} diff --git a/crates/extensions/moho/worker/src/lib.rs b/crates/extensions/moho/worker/src/lib.rs new file mode 100644 index 00000000..1e833698 --- /dev/null +++ b/crates/extensions/moho/worker/src/lib.rs @@ -0,0 +1,38 @@ +//! # strata-asm-moho-worker +//! +//! A subscription-driven worker that materializes per-block +//! [`MohoState`](moho_types::MohoState) from the Strata ASM. +//! +//! The worker subscribes to the ASM worker's per-block commit stream +//! ([`Subscription`](strata_asm_worker::Subscription)) and, +//! for each committed block, derives the Moho state from the ASM anchor state +//! the ASM worker already persisted, chained onto the Moho state of the block's +//! parent, then stores it — together with the per-container export-entry leaves +//! the state's `ExportState` MMR commits to. It runs no chain view of its own: +//! it folds each commit onto its resolved parent, so it follows L1 reorgs rather +//! than assuming the commits arrive in unbroken height order. +//! +//! Storage is supplied by the caller through [`MohoWorkerContext`] — read access +//! to ASM anchor states ([`AsmStateProvider`]), L1 block ancestry +//! ([`L1ProviderContext`]), persistence for the derived Moho states +//! ([`MohoStateStore`]), and persistence for the export-entry leaves +//! ([`ExportEntryStore`]) — mirroring how `strata-asm-worker` takes a +//! [`WorkerContext`](strata_asm_worker::WorkerContext). + +mod builder; +mod compute; +mod constants; +mod errors; +mod handle; +mod service; +mod state; +mod traits; + +pub use builder::MohoWorkerBuilder; +pub use errors::{MohoWorkerError, MohoWorkerResult}; +pub use handle::MohoWorkerHandle; +pub use service::{MohoWorkerService, MohoWorkerStatus}; +pub use state::MohoWorkerServiceState; +pub use traits::{ + AsmStateProvider, ExportEntryStore, L1ProviderContext, MohoStateStore, MohoWorkerContext, +}; diff --git a/crates/extensions/moho/worker/src/service.rs b/crates/extensions/moho/worker/src/service.rs new file mode 100644 index 00000000..cbf9a5d8 --- /dev/null +++ b/crates/extensions/moho/worker/src/service.rs @@ -0,0 +1,111 @@ +//! Service-framework integration for the Moho worker. +//! +//! The worker is an [`AsyncService`] driven by the ASM worker's per-block +//! subscription (a [`Subscription`](strata_asm_worker::Subscription) +//! adapted into a [`StreamInput`](strata_service::StreamInput)). Each emitted +//! commitment is folded into a new [`MohoState`](moho_types::MohoState) and +//! persisted. + +use std::marker::PhantomData; + +use moho_types::MohoState; +use serde::{Deserialize, Serialize}; +use strata_identifiers::L1BlockCommitment; +use strata_service::{AsyncService, Response, Service}; +use tracing::info; + +use crate::{MohoWorkerContext, MohoWorkerResult, MohoWorkerServiceState, compute}; + +/// Moho worker service implementation using the service framework. +#[derive(Debug)] +pub struct MohoWorkerService { + _phantom: PhantomData, +} + +impl Service for MohoWorkerService +where + W: MohoWorkerContext + Send + Sync + 'static, +{ + type State = MohoWorkerServiceState; + type Msg = L1BlockCommitment; + type Status = MohoWorkerStatus; + + fn get_status(state: &Self::State) -> Self::Status { + MohoWorkerStatus { + is_initialized: true, + cur_block: Some(state.cur_block()), + cur_state: Some(state.cur_moho().clone()), + } + } +} + +impl AsyncService for MohoWorkerService +where + W: MohoWorkerContext + Send + Sync + 'static, +{ + async fn process_input( + state: &mut Self::State, + input: L1BlockCommitment, + ) -> anyhow::Result { + // The store is synchronous (sled), so the fold runs to completion + // without yielding. A processing error exits the worker — the commit + // stream cannot be skipped without leaving a gap. + process_block(state, input)?; + Ok(Response::Continue) + } +} + +/// Folds a single ASM commit into a new [`MohoState`] and persists it, along +/// with the export-entry leaves its `ExportState` MMR commits to. +/// +/// Resolves the commit's parent and chains the Moho state forward onto this +/// block's anchor state and logs. The parent's Moho state comes from the +/// in-memory [`cur_moho`](MohoWorkerServiceState::cur_moho) when the commit +/// builds on the block already held — the in-order common case; otherwise (an L1 +/// reorg) it is re-anchored from the parent's committed state in the store. +/// Resolving the real parent rather than assuming height contiguity is what lets +/// the worker follow reorgs. +pub(crate) fn process_block( + state: &mut MohoWorkerServiceState, + block: L1BlockCommitment, +) -> MohoWorkerResult<()> { + let parent = state.context.get_parent_block(&block)?; + + let parent_moho = if state.cur_block() == parent { + state.cur_moho().clone() + } else { + state.context.get_moho_state(&parent)? + }; + + let anchor_state = state.context.get_anchor_state(&block)?; + let logs = state.context.get_anchor_logs(&block)?; + let moho = compute::construct_next_moho_state(&parent_moho, &anchor_state, &logs); + + // Persist the export-entry leaves before the Moho state. The worker tracks + // progress via the Moho store (`get_latest_moho_state`), so `store_moho_state` + // is this block's commit point: a crash before it leaves progress unadvanced + // and the block is reprocessed on restart, re-appending the same + // (idempotent) leaves. Writing them after the commit point would risk a gap + // between the leaves and the `ExportState` MMR that commits to them. + // TODO(STR-3723): unlike manifest MMR we need to handle the reorg differently since there might + // be multiple ExportEntry in a single block. + for (container_id, entry) in compute::export_entries_from_logs(&logs) { + state + .context + .append_export_entry(container_id, block.height(), entry)?; + } + state.context.store_moho_state(&block, &moho)?; + + state.update_moho_state(moho, block); + + info!(%block, %parent, "committed Moho state"); + Ok(()) +} + +/// Status information for the Moho worker service. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MohoWorkerStatus { + pub is_initialized: bool, + pub cur_block: Option, + pub cur_state: Option, +} diff --git a/crates/extensions/moho/worker/src/state.rs b/crates/extensions/moho/worker/src/state.rs new file mode 100644 index 00000000..d368a506 --- /dev/null +++ b/crates/extensions/moho/worker/src/state.rs @@ -0,0 +1,372 @@ +//! Service state for the Moho worker. + +use moho_types::MohoState; +use strata_identifiers::L1BlockCommitment; +use strata_predicate::PredicateKey; +use strata_service::ServiceState; +use tracing::info; + +use crate::{MohoWorkerContext, MohoWorkerResult, compute, constants}; + +/// In-memory state for the Moho worker. +/// +/// Holds the most recently folded [`MohoState`] and the block it is anchored to. +/// Each ASM commit is folded onto its parent's Moho state: in the common +/// in-order case the parent is the block already held here, so the fold reads +/// straight from memory; on an L1 reorg the incoming commit builds on a +/// different block, so the orchestration re-anchors from the parent's committed +/// state in the store. It keeps no chain view of its own. +/// +/// Mirrors `strata-asm-worker`'s `AsmWorkerServiceState`, which likewise holds +/// the current `AsmState` in memory and re-anchors on reorg. The fold +/// orchestration lives in the service layer's `process_block`; this type just +/// holds the data and the small `update_moho_state` mutation that advances it. +#[derive(Debug)] +pub struct MohoWorkerServiceState { + /// Context for reading ASM anchor states, resolving parents, and persisting + /// Moho states. + pub(crate) context: W, + + /// The most recently folded (or genesis-seeded) Moho state. The fold chains + /// directly onto this when the next commit builds on `cur_block`. + cur_moho: MohoState, + + /// The L1 block `cur_moho` is anchored to. + cur_block: L1BlockCommitment, +} + +impl MohoWorkerServiceState { + /// Creates the service state, resuming from the latest stored Moho state or + /// seeding the genesis entry when the store is empty. + /// + /// Genesis is seeded from the ASM anchor state already committed for + /// `genesis_block`; `asm_predicate` becomes the genesis Moho predicate. + pub(crate) fn new( + context: W, + genesis_block: L1BlockCommitment, + asm_predicate: PredicateKey, + ) -> MohoWorkerResult { + let (cur_block, cur_moho) = match context.get_latest_moho_state()? { + Some((blk, moho)) => { + info!(%blk, "resuming Moho worker from stored state"); + (blk, moho) + } + None => { + let genesis_anchor = context.get_anchor_state(&genesis_block)?; + let moho = compute::construct_genesis_moho_state(asm_predicate, &genesis_anchor); + context.store_moho_state(&genesis_block, &moho)?; + info!(%genesis_block, "seeded genesis Moho state"); + (genesis_block, moho) + } + }; + + Ok(Self { + context, + cur_moho, + cur_block, + }) + } + + /// The block the worker has most recently committed a Moho state for. + pub fn cur_block(&self) -> L1BlockCommitment { + self.cur_block + } + + /// The most recently folded (or genesis-seeded) Moho state. + pub fn cur_moho(&self) -> &MohoState { + &self.cur_moho + } + + /// Advances the in-memory state to `moho` at `blk` after a successful fold. + /// Mirrors `strata-asm-worker`'s `update_anchor_state`. + pub(crate) fn update_moho_state(&mut self, moho: MohoState, blk: L1BlockCommitment) { + self.cur_moho = moho; + self.cur_block = blk; + } +} + +impl ServiceState for MohoWorkerServiceState { + fn name(&self) -> &str { + constants::SERVICE_NAME + } +} + +#[cfg(test)] +mod tests { + use std::{cell::RefCell, collections::HashMap}; + + use moho_runtime_interface::MohoProgram; + use strata_asm_common::{AnchorState, AsmLogEntry}; + use strata_asm_params::AsmParams; + use strata_asm_proof_impl::moho_program::program::AsmStfProgram; + use strata_asm_spec::construct_genesis_state; + use strata_identifiers::{Buf32, L1BlockCommitment, L1BlockId}; + use strata_predicate::PredicateKey; + use strata_test_utils_arb::ArbitraryGenerator; + + use super::*; + use crate::{ + AsmStateProvider, ExportEntryStore, L1ProviderContext, MohoStateStore, MohoWorkerError, + service::process_block, + }; + + /// In-memory context backing the four concern traits. + #[derive(Debug, Default)] + struct MockContext { + anchors: RefCell>, + logs: RefCell>>, + parents: RefCell>, + moho: RefCell>, + latest: RefCell>, + export_entries: RefCell>, + } + + impl MockContext { + fn insert_anchor(&self, blk: L1BlockCommitment, state: AnchorState) { + self.anchors.borrow_mut().insert(blk, state); + } + + /// Registers `parent` as the parent of `blk` for parent resolution. + fn link_parent(&self, blk: L1BlockCommitment, parent: L1BlockCommitment) { + self.parents.borrow_mut().insert(blk, parent); + } + } + + impl AsmStateProvider for MockContext { + fn get_anchor_state(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult { + self.anchors + .borrow() + .get(blockid) + .cloned() + .ok_or(MohoWorkerError::MissingAsmState(*blockid)) + } + + fn get_anchor_logs( + &self, + blockid: &L1BlockCommitment, + ) -> MohoWorkerResult> { + Ok(self.logs.borrow().get(blockid).cloned().unwrap_or_default()) + } + } + + impl L1ProviderContext for MockContext { + fn get_parent_block( + &self, + block: &L1BlockCommitment, + ) -> MohoWorkerResult { + self.parents + .borrow() + .get(block) + .copied() + .ok_or(MohoWorkerError::MissingParentBlock(*block)) + } + } + + impl MohoStateStore for MockContext { + fn get_latest_moho_state( + &self, + ) -> MohoWorkerResult> { + Ok(self.latest.borrow().clone()) + } + + fn get_moho_state(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult { + self.moho + .borrow() + .get(blockid) + .cloned() + .ok_or(MohoWorkerError::MissingMohoState(*blockid)) + } + + fn store_moho_state( + &self, + blockid: &L1BlockCommitment, + state: &MohoState, + ) -> MohoWorkerResult<()> { + self.moho.borrow_mut().insert(*blockid, state.clone()); + let mut latest = self.latest.borrow_mut(); + if latest + .as_ref() + .is_none_or(|(b, _)| blockid.height() >= b.height()) + { + *latest = Some((*blockid, state.clone())); + } + Ok(()) + } + } + + impl ExportEntryStore for MockContext { + fn append_export_entry( + &self, + container_id: u8, + height: u32, + entry: [u8; 32], + ) -> MohoWorkerResult<()> { + self.export_entries + .borrow_mut() + .push((container_id, height, entry)); + Ok(()) + } + } + + /// Builds a genesis anchor state and its commitment from arbitrary params. + fn genesis_anchor() -> (L1BlockCommitment, AnchorState) { + let params: AsmParams = ArbitraryGenerator::new().generate(); + let anchor = construct_genesis_state(¶ms); + let commitment = anchor.chain_view.pow_state.last_verified_block; + (commitment, anchor) + } + + /// Reuses `anchor` as the next block's anchor state. The fold does not + /// validate the anchor against the block, so reusing it is fine for + /// exercising the chaining logic. + fn child(anchor: &AnchorState) -> AnchorState { + anchor.clone() + } + + /// A commitment one height above `prev`, with a caller-chosen id so that + /// sibling blocks at the same height — a reorg — stay distinguishable. + fn commitment_after_with_id(prev: L1BlockCommitment, id: u8) -> L1BlockCommitment { + L1BlockCommitment::new(prev.height() + 1, L1BlockId::from(Buf32::from([id; 32]))) + } + + fn commitment_after(prev: L1BlockCommitment) -> L1BlockCommitment { + commitment_after_with_id(prev, 0) + } + + #[test] + fn seeds_genesis_when_store_empty() { + let (genesis_blk, anchor) = genesis_anchor(); + let ctx = MockContext::default(); + ctx.insert_anchor(genesis_blk, anchor.clone()); + + let state = + MohoWorkerServiceState::new(ctx, genesis_blk, PredicateKey::always_accept()).unwrap(); + + assert_eq!(state.cur_block(), genesis_blk); + // Genesis moho was persisted and its inner commitment matches the anchor. + let stored = state + .context + .moho + .borrow() + .get(&genesis_blk) + .cloned() + .unwrap(); + assert_eq!( + stored.inner_state(), + AsmStfProgram::compute_state_commitment(&anchor) + ); + } + + #[test] + fn resumes_from_latest_without_reseeding_genesis() { + let (genesis_blk, anchor) = genesis_anchor(); + let ctx = MockContext::default(); + ctx.insert_anchor(genesis_blk, anchor.clone()); + + // Pre-populate a "later" stored moho state to resume from. + let later_blk = commitment_after(genesis_blk); + let later_moho = + compute::construct_genesis_moho_state(PredicateKey::always_accept(), &anchor); + ctx.store_moho_state(&later_blk, &later_moho).unwrap(); + + let state = + MohoWorkerServiceState::new(ctx, genesis_blk, PredicateKey::always_accept()).unwrap(); + + assert_eq!(state.cur_block(), later_blk); + } + + #[test] + fn folds_contiguous_commits_forward() { + let (genesis_blk, anchor) = genesis_anchor(); + let ctx = MockContext::default(); + ctx.insert_anchor(genesis_blk, anchor.clone()); + + let blk1 = commitment_after(genesis_blk); + let blk2 = commitment_after(blk1); + ctx.insert_anchor(blk1, child(&anchor)); + ctx.insert_anchor(blk2, child(&anchor)); + ctx.link_parent(blk1, genesis_blk); + ctx.link_parent(blk2, blk1); + + let mut state = + MohoWorkerServiceState::new(ctx, genesis_blk, PredicateKey::always_accept()).unwrap(); + + process_block(&mut state, blk1).unwrap(); + process_block(&mut state, blk2).unwrap(); + + assert_eq!(state.cur_block(), blk2); + assert!(state.context.moho.borrow().contains_key(&blk1)); + assert!(state.context.moho.borrow().contains_key(&blk2)); + } + + #[test] + fn folds_reorged_sibling_from_shared_parent() { + // Two siblings at the same height both build on genesis (a reorg). Each + // must fold from genesis's Moho state; the old height-successor logic + // would have dropped the second as a "stale" same-height commit. + let (genesis_blk, anchor) = genesis_anchor(); + let ctx = MockContext::default(); + ctx.insert_anchor(genesis_blk, anchor.clone()); + + let blk_a = commitment_after_with_id(genesis_blk, 0xaa); + let blk_b = commitment_after_with_id(genesis_blk, 0xbb); + ctx.insert_anchor(blk_a, child(&anchor)); + ctx.insert_anchor(blk_b, child(&anchor)); + ctx.link_parent(blk_a, genesis_blk); + ctx.link_parent(blk_b, genesis_blk); + + let mut state = + MohoWorkerServiceState::new(ctx, genesis_blk, PredicateKey::always_accept()).unwrap(); + + process_block(&mut state, blk_a).unwrap(); + // blk_b's parent (genesis) is no longer the in-memory cur_block (blk_a), + // so this exercises the store re-anchor path, not the fast path. + process_block(&mut state, blk_b).unwrap(); + + let moho = state.context.moho.borrow(); + // The second sibling was folded, not ignored. + assert!(moho.contains_key(&blk_a)); + assert!(moho.contains_key(&blk_b)); + // Both fold from the shared genesis state onto the same anchor, so their + // inner commitments match. + let inner = AsmStfProgram::compute_state_commitment(&anchor); + assert_eq!(moho.get(&blk_a).unwrap().inner_state(), inner); + assert_eq!(moho.get(&blk_b).unwrap().inner_state(), inner); + } + + #[test] + fn errors_when_parent_moho_missing() { + let (genesis_blk, anchor) = genesis_anchor(); + let ctx = MockContext::default(); + ctx.insert_anchor(genesis_blk, anchor.clone()); + + // `orphan`'s parent was never committed, so its Moho state is absent. + let missing_parent = commitment_after(genesis_blk); + let orphan = commitment_after(missing_parent); + ctx.insert_anchor(orphan, child(&anchor)); + ctx.link_parent(orphan, missing_parent); + + let mut state = + MohoWorkerServiceState::new(ctx, genesis_blk, PredicateKey::always_accept()).unwrap(); + + let err = process_block(&mut state, orphan).unwrap_err(); + assert!(matches!(err, MohoWorkerError::MissingMohoState(_))); + } + + #[test] + fn errors_when_parent_unresolvable() { + let (genesis_blk, anchor) = genesis_anchor(); + let ctx = MockContext::default(); + ctx.insert_anchor(genesis_blk, anchor.clone()); + + // No parent link registered, so the provider cannot resolve the parent. + let blk = commitment_after(genesis_blk); + ctx.insert_anchor(blk, child(&anchor)); + + let mut state = + MohoWorkerServiceState::new(ctx, genesis_blk, PredicateKey::always_accept()).unwrap(); + + let err = process_block(&mut state, blk).unwrap_err(); + assert!(matches!(err, MohoWorkerError::MissingParentBlock(_))); + } +} diff --git a/crates/extensions/moho/worker/src/traits.rs b/crates/extensions/moho/worker/src/traits.rs new file mode 100644 index 00000000..69dcdbf1 --- /dev/null +++ b/crates/extensions/moho/worker/src/traits.rs @@ -0,0 +1,110 @@ +//! Storage traits the Moho worker interfaces through. +//! +//! The worker derives each [`MohoState`] from the ASM anchor state the ASM +//! worker already committed, chaining it onto the Moho state of the block's +//! parent, then persists it. Those concerns are split into separate traits so +//! an implementor can back them with whatever subsystem it likes: +//! +//! - [`AsmStateProvider`] — reads the [`AnchorState`] and [`AsmLogEntry`]s the Moho state is +//! computed from. +//! - [`L1ProviderContext`] — resolves the parent of an L1 block commitment, so the fold can chain +//! onto the parent's Moho state across reorgs. +//! - [`MohoStateStore`] — persists and loads the derived [`MohoState`]. +//! - [`ExportEntryStore`] — persists the per-container export-entry leaves the state's +//! `ExportState` MMR commits to, so inclusion proofs can be rebuilt later. +//! +//! [`MohoWorkerContext`] is the umbrella with a blanket impl, mirroring +//! `strata-asm-worker`'s [`WorkerContext`](strata_asm_worker::WorkerContext): +//! implement the concern traits and get the context for free. + +use moho_types::MohoState; +use strata_asm_common::{AnchorState, AsmLogEntry}; +use strata_identifiers::L1BlockCommitment; + +use crate::MohoWorkerResult; + +/// Reads the ASM anchor states and logs the Moho worker derives from. +pub trait AsmStateProvider { + /// Fetches the [`AnchorState`] committed by the ASM worker for `blockid`. + /// + /// Errors with [`MissingAsmState`](crate::MohoWorkerError::MissingAsmState) + /// when no anchor state exists for the block. + fn get_anchor_state(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult; + + /// Fetches the [`AsmLogEntry`]s the ASM worker emitted for `blockid`. + /// + /// Committed alongside the anchor state, so this errors with + /// [`MissingAsmState`](crate::MohoWorkerError::MissingAsmState) when the + /// block's ASM commit is absent. An empty vec means the block had no logs. + fn get_anchor_logs(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult>; +} + +/// Resolves L1 block ancestry so the fold can chain onto the parent's state. +pub trait L1ProviderContext { + /// Fetches the parent of `block` — the commitment whose Moho state the fold + /// for `block` chains forward from. + /// + /// Resolving the real parent (rather than assuming the commit is the + /// height-successor of the last one processed) is what lets the worker + /// follow L1 reorgs. Errors with + /// [`MissingParentBlock`](crate::MohoWorkerError::MissingParentBlock) when + /// the parent cannot be resolved. + fn get_parent_block(&self, block: &L1BlockCommitment) -> MohoWorkerResult; +} + +/// Persists and loads the derived per-block [`MohoState`]. +pub trait MohoStateStore { + /// Fetches the most recently committed [`MohoState`] and the block it is + /// anchored to, or `None` if the store is empty. Used to resume across + /// restarts. + fn get_latest_moho_state(&self) -> MohoWorkerResult>; + + /// Fetches the [`MohoState`] committed for `blockid`. + /// + /// Errors with [`MissingMohoState`](crate::MohoWorkerError::MissingMohoState) + /// when no Moho state exists for the block. + fn get_moho_state(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult; + + /// Persists the [`MohoState`] derived for `blockid`. + fn store_moho_state( + &self, + blockid: &L1BlockCommitment, + state: &MohoState, + ) -> MohoWorkerResult<()>; +} + +/// Persists the per-container export-entry leaves the derived state commits to. +/// +/// [`MohoState`] keeps only each container's compact `ExportState` MMR (its +/// peaks), so the original leaves cannot be recovered from it. The worker +/// mirrors them here as it folds each block — from the same `NewExportEntry` +/// logs that advance the MMR — so the RPC can rebuild inclusion proofs. +pub trait ExportEntryStore { + /// Appends one export-entry leaf for `container_id` inserted at `height`. + /// + /// Must be idempotent in `(container_id, entry)`: the worker reprocesses a + /// block whose fold did not reach its commit point, so the same leaf can be + /// appended more than once and must not be duplicated. + fn append_export_entry( + &self, + container_id: u8, + height: u32, + entry: [u8; 32], + ) -> MohoWorkerResult<()>; +} + +/// Context the Moho worker interacts with the outside world through. +/// +/// Umbrella over [`AsmStateProvider`], [`L1ProviderContext`], [`MohoStateStore`] +/// and [`ExportEntryStore`]. The blanket impl means any type implementing all +/// four automatically implements `MohoWorkerContext`, so implementors never name +/// it directly. +pub trait MohoWorkerContext: + AsmStateProvider + L1ProviderContext + MohoStateStore + ExportEntryStore +{ +} + +impl MohoWorkerContext for T where + T: AsmStateProvider + L1ProviderContext + MohoStateStore + ExportEntryStore +{ +} diff --git a/crates/proof/db/Cargo.toml b/crates/proof/db/Cargo.toml index 87decc6d..4162ecc0 100644 --- a/crates/proof/db/Cargo.toml +++ b/crates/proof/db/Cargo.toml @@ -4,18 +4,15 @@ version = "0.1.0" edition = "2024" [dependencies] -moho-types.workspace = true strata-asm-proof-types.workspace = true strata-identifiers.workspace = true borsh.workspace = true sled.workspace = true -ssz.workspace = true zkaleido.workspace = true [dev-dependencies] proptest.workspace = true -strata-predicate.workspace = true tempfile.workspace = true tokio.workspace = true diff --git a/crates/proof/db/src/lib.rs b/crates/proof/db/src/lib.rs index c7906d60..628db58f 100644 --- a/crates/proof/db/src/lib.rs +++ b/crates/proof/db/src/lib.rs @@ -13,27 +13,20 @@ //! - [`RemoteProofStatusDb`] — tracks the execution status of in-flight remote proof jobs until //! their results are retrieved and stored locally. //! -//! A fourth trait, [`MohoStateDb`], persists per-block [`moho_types::MohoState`] -//! snapshots derived by the worker. It is deliberately kept separate from -//! [`ProofDb`] because the underlying data is materialised state, not a proof -//! artifact. -//! -//! Sled-backed implementations are provided: [`SledProofDb`] for proofs and -//! [`SledMohoStateDb`] for Moho-state snapshots. To back both with a single -//! sled directory, open the `sled::Db` yourself and pass it to -//! `SledProofDb::from_db` and `SledMohoStateDb::from_db` — sled does not -//! allow the same path to be opened twice in a process. +//! A sled-backed implementation, [`SledProofDb`], is provided. Per-block +//! `MohoState` snapshots are persisted separately by `strata-asm-moho-storage`; +//! both can share one sled directory by opening the `sled::Db` yourself and +//! passing it to each — sled does not allow the same path to be opened twice in +//! a process. -mod moho_state; mod proof_db; mod remote_mapping; mod remote_status; mod sled; pub use self::{ - moho_state::MohoStateDb, proof_db::ProofDb, remote_mapping::RemoteProofMappingDb, remote_status::RemoteProofStatusDb, - sled::{RemoteProofMappingError, RemoteProofStatusError, SledMohoStateDb, SledProofDb}, + sled::{RemoteProofMappingError, RemoteProofStatusError, SledProofDb}, }; diff --git a/crates/proof/db/src/sled/mod.rs b/crates/proof/db/src/sled/mod.rs index 099c4e2d..18ed4b62 100644 --- a/crates/proof/db/src/sled/mod.rs +++ b/crates/proof/db/src/sled/mod.rs @@ -8,15 +8,11 @@ use strata_asm_proof_types::L1Range; use strata_identifiers::{Buf32, L1BlockCommitment, L1BlockId}; -mod moho_state; mod proof_db; mod remote_mapping; mod remote_status; -pub use self::{ - moho_state::SledMohoStateDb, remote_mapping::RemoteProofMappingError, - remote_status::RemoteProofStatusError, -}; +pub use self::{remote_mapping::RemoteProofMappingError, remote_status::RemoteProofStatusError}; /// Sled-backed proof database. /// @@ -41,9 +37,9 @@ pub struct SledProofDb { impl SledProofDb { /// Opens the proof trees on an already-open sled database. /// - /// Callers open the [`sled::Db`] themselves so multiple handles — e.g. - /// [`SledMohoStateDb`] — can share the same on-disk directory; sled does - /// not allow opening the same path twice in a process. + /// Callers open the [`sled::Db`] themselves so multiple handles — e.g. the + /// `strata-asm-moho-storage` state store — can share the same on-disk + /// directory; sled does not allow opening the same path twice in a process. pub fn open(db: &sled::Db) -> Result { Ok(Self { asm_proofs: db.open_tree("asm_proofs")?, diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 41304de7..a3c607b6 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -18,5 +18,4 @@ borsh.workspace = true sled.workspace = true [dev-dependencies] -ssz.workspace = true tempfile.workspace = true diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index a8b510fa..a3a95a71 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -3,15 +3,16 @@ //! Replaces alpen's `strata-state`, `strata-storage`, and `strata-db-store-sled` //! with a self-contained implementation that has zero alpen dependencies. //! -//! Three storage backends: +//! Two storage backends: //! - [`AsmStateDb`] — anchor states + aux data, keyed by L1 block commitment //! - [`MmrDb`] — manifest hash MMR (append, prove, query) -//! - [`ExportEntriesDb`] — per-container export entries, indexed for proof generation +//! +//! Per-container export entries moved to `strata-asm-moho-storage`, persisted +//! by the Moho worker alongside the `MohoState` whose `ExportState` MMR they +//! mirror. -mod export_entries; mod mmr; mod state; -pub use export_entries::ExportEntriesDb; pub use mmr::MmrDb; pub use state::AsmStateDb;