diff --git a/Cargo.lock b/Cargo.lock index 11a5b991..b23da6ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7727,23 +7727,6 @@ dependencies = [ "strata-predicate", ] -[[package]] -name = "strata-asm-proof-db" -version = "0.1.0" -dependencies = [ - "borsh", - "moho-types", - "proptest", - "sled", - "ssz", - "strata-asm-proof-types", - "strata-identifiers", - "strata-predicate", - "tempfile", - "tokio", - "zkaleido", -] - [[package]] name = "strata-asm-proof-impl" version = "0.1.0" @@ -7771,16 +7754,6 @@ dependencies = [ "zkaleido-native-adapter", ] -[[package]] -name = "strata-asm-proof-types" -version = "0.1.0" -dependencies = [ - "borsh", - "serde", - "strata-identifiers", - "zkaleido", -] - [[package]] name = "strata-asm-proto-admin" version = "0.1.0" @@ -8025,16 +7998,81 @@ dependencies = [ "zkaleido-sp1-host", ] +[[package]] +name = "strata-asm-prover-storage" +version = "0.1.0" +dependencies = [ + "borsh", + "moho-types", + "proptest", + "sled", + "ssz", + "strata-asm-prover-types", + "strata-identifiers", + "strata-predicate", + "tempfile", + "tokio", + "zkaleido", +] + +[[package]] +name = "strata-asm-prover-types" +version = "0.1.0" +dependencies = [ + "borsh", + "serde", + "strata-identifiers", + "zkaleido", +] + +[[package]] +name = "strata-asm-prover-worker" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bincode", + "bitcoin", + "hex", + "k256", + "moho-recursive-proof", + "moho-runtime-impl", + "moho-types", + "serde", + "sp1-sdk", + "sp1-verifier", + "ssz", + "strata-asm-common", + "strata-asm-proof-impl", + "strata-asm-prover-storage", + "strata-asm-prover-types", + "strata-asm-worker", + "strata-btc-types", + "strata-btc-verification", + "strata-identifiers", + "strata-merkle", + "strata-predicate", + "strata-tasks", + "thiserror 2.0.18", + "tokio", + "tracing", + "tree_hash", + "zkaleido", + "zkaleido-native-adapter", + "zkaleido-sp1-groth16-verifier", + "zkaleido-sp1-host", +] + [[package]] name = "strata-asm-rpc" version = "0.1.0" dependencies = [ "bitcoin", "jsonrpsee", - "strata-asm-proof-types", "strata-asm-proto-bridge-v1", "strata-asm-proto-bridge-v1-types", "strata-asm-proto-checkpoint-types", + "strata-asm-prover-types", "strata-asm-worker", ] @@ -8045,42 +8083,35 @@ dependencies = [ "anyhow", "asm-storage", "async-trait", - "bincode", "bitcoin", "bitcoincore-zmq", "bitcoind-async-client", "clap", "futures", - "hex", "jsonrpsee", - "k256", - "moho-recursive-proof", - "moho-runtime-impl", "moho-runtime-interface", "moho-types", "serde", "serde_json", "sled", - "sp1-sdk", - "sp1-verifier", "ssz", "strata-asm-common", "strata-asm-logs", "strata-asm-params", - "strata-asm-proof-db", "strata-asm-proof-impl", - "strata-asm-proof-types", "strata-asm-proto-bridge-v1", "strata-asm-proto-bridge-v1-txs", "strata-asm-proto-bridge-v1-types", "strata-asm-proto-checkpoint", "strata-asm-proto-checkpoint-txs", "strata-asm-proto-checkpoint-types", + "strata-asm-prover-storage", + "strata-asm-prover-types", + "strata-asm-prover-worker", "strata-asm-rpc", "strata-asm-spec", "strata-asm-worker", "strata-btc-types", - "strata-btc-verification", "strata-identifiers", "strata-logging", "strata-merkle", @@ -8091,11 +8122,7 @@ dependencies = [ "tokio", "toml 1.1.2+spec-1.1.0", "tracing", - "tree_hash", "zkaleido", - "zkaleido-native-adapter", - "zkaleido-sp1-groth16-verifier", - "zkaleido-sp1-host", ] [[package]] @@ -8155,6 +8182,7 @@ dependencies = [ "bitcoind-async-client", "borsh", "corepc-node", + "futures", "serde", "strata-asm-common", "strata-asm-params", diff --git a/Cargo.toml b/Cargo.toml index fbbe00c0..5862f8c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,10 @@ members = [ "crates/logs", "crates/manifest-types", "crates/params", - "crates/proof/db", + "crates/extensions/prover/storage", + "crates/extensions/prover/types", + "crates/extensions/prover/worker", "crates/proof/statements", - "crates/proof/types", "crates/rpc", "crates/spec", "crates/spec-debug", @@ -62,9 +63,7 @@ strata-asm-common = { path = "crates/common" } strata-asm-logs = { path = "crates/logs" } strata-asm-manifest-types = { path = "crates/manifest-types" } strata-asm-params = { path = "crates/params" } -strata-asm-proof-db = { path = "crates/proof/db" } strata-asm-proof-impl = { path = "crates/proof/statements" } -strata-asm-proof-types = { path = "crates/proof/types" } strata-asm-proto-admin = { path = "crates/subprotocols/admin/subprotocol" } strata-asm-proto-admin-txs = { path = "crates/subprotocols/admin/txs" } strata-asm-proto-bridge-v1 = { path = "crates/subprotocols/bridge-v1/subprotocol" } @@ -77,6 +76,9 @@ strata-asm-proto-checkpoint-txs = { path = "crates/subprotocols/checkpoint/txs" strata-asm-proto-checkpoint-types = { path = "crates/subprotocols/checkpoint/types" } strata-asm-proto-debug-v1 = { path = "crates/subprotocols/debug-v1/subprotocol" } strata-asm-proto-txs-test-utils = { path = "crates/subprotocols/txs-test-utils" } +strata-asm-prover-storage = { path = "crates/extensions/prover/storage" } +strata-asm-prover-types = { path = "crates/extensions/prover/types" } +strata-asm-prover-worker = { path = "crates/extensions/prover/worker" } strata-asm-rpc = { path = "crates/rpc" } strata-asm-spec = { path = "crates/spec" } strata-asm-spec-debug = { path = "crates/spec-debug" } diff --git a/bin/asm-runner/Cargo.toml b/bin/asm-runner/Cargo.toml index dda2c6d0..8c50d973 100644 --- a/bin/asm-runner/Cargo.toml +++ b/bin/asm-runner/Cargo.toml @@ -7,25 +7,23 @@ edition = "2024" workspace = true [dependencies] -moho-recursive-proof.workspace = true -moho-runtime-impl.workspace = true moho-runtime-interface.workspace = true moho-types.workspace = true strata-asm-rpc.workspace = true -strata-btc-verification.workspace = true strata-asm-common.workspace = true strata-asm-logs.workspace = true strata-asm-params.workspace = true -strata-asm-proof-db.workspace = true strata-asm-proof-impl.workspace = true -strata-asm-proof-types.workspace = true strata-asm-proto-bridge-v1.workspace = true strata-asm-proto-bridge-v1-txs.workspace = true strata-asm-proto-bridge-v1-types.workspace = true strata-asm-proto-checkpoint.workspace = true strata-asm-proto-checkpoint-txs.workspace = true strata-asm-proto-checkpoint-types.workspace = true +strata-asm-prover-storage.workspace = true +strata-asm-prover-types.workspace = true +strata-asm-prover-worker.workspace = true strata-asm-spec.workspace = true strata-asm-worker.workspace = true strata-btc-types.workspace = true @@ -37,43 +35,28 @@ strata-identifiers.workspace = true strata-merkle.workspace = true strata-predicate.workspace = true strata-tasks.workspace = true -tree_hash.workspace = true anyhow.workspace = true async-trait.workspace = true -bincode = { workspace = true, optional = true } bitcoin.workspace = true bitcoincore-zmq = { version = "1.5.2", features = ["async"] } bitcoind-async-client.workspace = true clap.workspace = true futures.workspace = true -hex.workspace = true jsonrpsee = { workspace = true, features = ["server", "macros"] } -k256 = { workspace = true, features = ["schnorr"] } serde.workspace = true serde_json.workspace = true sled.workspace = true -sp1-sdk = { workspace = true, optional = true } -sp1-verifier = { workspace = true, optional = true } ssz.workspace = true thiserror.workspace = true tokio.workspace = true toml.workspace = true tracing.workspace = true zkaleido.workspace = true -zkaleido-native-adapter.workspace = true -zkaleido-sp1-groth16-verifier = { workspace = true, optional = true } -zkaleido-sp1-host = { workspace = true, optional = true } [dev-dependencies] tempfile.workspace = true [features] default = [] -sp1 = [ - "dep:zkaleido-sp1-host", - "dep:zkaleido-sp1-groth16-verifier", - "dep:sp1-verifier", - "dep:sp1-sdk", - "dep:bincode", -] +sp1 = ["strata-asm-prover-worker/sp1"] diff --git a/bin/asm-runner/src/block_watcher.rs b/bin/asm-runner/src/block_watcher.rs index 5d9d9c78..827f51fc 100644 --- a/bin/asm-runner/src/block_watcher.rs +++ b/bin/asm-runner/src/block_watcher.rs @@ -15,12 +15,11 @@ use bitcoin::Block; use bitcoincore_zmq::{Message, SocketMessage, subscribe_async_wait_handshake}; use bitcoind_async_client::{Client, traits::Reader}; use futures::StreamExt; -use strata_asm_proof_types::{L1Range, ProofId}; use strata_asm_worker::AsmWorkerHandle; use strata_btc_types::BlockHashExt; use strata_identifiers::L1BlockCommitment; use strata_tasks::ShutdownGuard; -use tokio::{sync::mpsc, time::timeout}; +use tokio::time::timeout; use tracing::{debug, error, info, warn}; use crate::config::BitcoinConfig; @@ -37,7 +36,6 @@ pub(crate) async fn drive_asm_from_bitcoin( bitcoin_client: Arc, asm_worker: Arc, start_height: u64, - proof_tx: Option>, shutdown: ShutdownGuard, ) -> Result<()> { info!(%start_height, "starting ASM block watcher"); @@ -105,7 +103,7 @@ pub(crate) async fn drive_asm_from_bitcoin( for height in cursor..received_height { match fetch_block_at_height(&bitcoin_client, height).await { Ok(fetched) => { - if let Err(err) = submit_block(&asm_worker, &proof_tx, fetched).await { + if let Err(err) = submit_block(&asm_worker, fetched).await { error!(%height, ?err, "failed to submit backfill block"); // Stop backfilling on failure so we don't hand the // worker a gap. The next ZMQ event will retry. @@ -120,7 +118,7 @@ pub(crate) async fn drive_asm_from_bitcoin( } } - if let Err(err) = submit_block(&asm_worker, &proof_tx, block).await { + if let Err(err) = submit_block(&asm_worker, block).await { error!(%received_height, ?err, "failed to submit block from ZMQ"); } cursor = received_height + 1; @@ -140,12 +138,12 @@ async fn fetch_block_at_height(client: &Client, height: u64) -> Result { Ok(block) } -/// Submit a block to the ASM worker and, optionally, enqueue a proof request. -async fn submit_block( - asm_worker: &AsmWorkerHandle, - proof_tx: &Option>, - block: Block, -) -> Result<()> { +/// Submit a block to the ASM worker. +/// +/// Proof requests are not issued here: the prover worker subscribes to the ASM +/// worker's commit stream and derives them from each *committed* block, so they +/// fire only after the block is durably stored. +async fn submit_block(asm_worker: &AsmWorkerHandle, block: Block) -> Result<()> { let height = block.bip34_block_height().unwrap_or(0); let hash = block.block_hash(); let block_id = hash.to_l1_block_id(); @@ -158,16 +156,5 @@ async fn submit_block( debug!(%height, %hash, "submitted block to ASM worker"); - if let Some(tx) = proof_tx { - let asm_proof_id = ProofId::Asm(L1Range::single(commitment)); - if let Err(err) = tx.send(asm_proof_id) { - warn!(%height, %hash, ?err, "failed to enqueue ASM proof request"); - } - let moho_proof_id = ProofId::Moho(commitment); - if let Err(err) = tx.send(moho_proof_id) { - warn!(%height, %hash, ?err, "failed to enqueue Moho proof request"); - } - } - Ok(()) } diff --git a/bin/asm-runner/src/bootstrap.rs b/bin/asm-runner/src/bootstrap.rs index 43c8c5e4..1aa92674 100644 --- a/bin/asm-runner/src/bootstrap.rs +++ b/bin/asm-runner/src/bootstrap.rs @@ -3,20 +3,20 @@ use std::sync::Arc; use anyhow::Result; use bitcoind_async_client::{Auth, Client}; use strata_asm_params::AsmParams; -use strata_asm_proof_db::{SledMohoStateDb, SledProofDb}; +use strata_asm_prover_storage::{SledMohoStateDb, SledProofDb}; +use strata_asm_prover_worker::{InputBuilder, ProofBackend, ProverWorkerBuilder}; use strata_asm_spec::StrataAsmSpec; use strata_asm_worker::AsmWorkerBuilder; use strata_tasks::TaskExecutor; use tokio::{ runtime::{Builder as RuntimeBuilder, Handle}, - sync::mpsc, task::{self, LocalSet}, }; use crate::{ block_watcher::drive_asm_from_bitcoin, config::{AsmRpcConfig, BitcoinConfig}, - prover::{InputBuilder, ProofBackend, ProofOrchestrator}, + prover_context::AsmProverContext, rpc_server::{AsmProofRpcDeps, run_rpc_server}, storage::create_storage, worker_context::{AsmWorkerContext, MohoStorage}, @@ -83,10 +83,7 @@ pub(crate) async fn bootstrap( let asm_worker = Arc::new(asm_worker); // 7. Finish orchestrator wiring if it was configured. - let (proof_tx, proof_rpc_deps) = if let Some((orch_config, proof_db, moho_state_db, backend)) = - orch_prep - { - let (tx, rx) = mpsc::unbounded_channel(); + let proof_rpc_deps = if let Some((orch_config, proof_db, moho_state_db, backend)) = orch_prep { let rpc_deps = AsmProofRpcDeps { proof_db: proof_db.clone(), moho_state_db: moho_state_db.clone(), @@ -100,23 +97,28 @@ pub(crate) async fn bootstrap( moho_predicate, } = backend; - let input_builder = InputBuilder::new( + // The prover context wires the proof store, moho-state store, ASM + // anchor-state store, and Bitcoin client into the worker's traits. + let prover_ctx = AsmProverContext::new( + proof_db, + moho_state_db, state_db.clone(), bitcoin_client.clone(), - proof_db.clone(), - moho_state_db, - params.anchor.block, - asm_predicate, - moho_predicate, - ); - let mut orchestrator = ProofOrchestrator::new( - proof_db, - asm_host, - moho_host, - orch_config, - input_builder, - rx, ); + let input_builder = InputBuilder::new(params.anchor.block, asm_predicate, moho_predicate); + + // Subscribe before the block watcher (spawned below) starts feeding the + // worker. The stream has no replay buffer, but the worker only commits + // blocks the watcher hands it, so subscribing here misses nothing. + let block_subscription = asm_worker.subscribe_blocks(); + + let mut orchestrator = ProverWorkerBuilder::new() + .with_context(prover_ctx) + .with_hosts(asm_host, moho_host) + .with_config(orch_config) + .with_input_builder(input_builder) + .with_block_subscription(block_subscription) + .build()?; // ZkVmRemoteProver is !Send (#[async_trait(?Send)]), so the orchestrator // future cannot be spawned on a multi-threaded runtime directly. We run it @@ -133,9 +135,9 @@ pub(crate) async fn bootstrap( }, ); - (Some(tx), Some(rpc_deps)) + Some(rpc_deps) } else { - (None, None) + None }; // 8. Spawn block watcher as a critical task. @@ -148,7 +150,6 @@ pub(crate) async fn bootstrap( bitcoin_client_for_driver, asm_worker_for_driver, start_height as u64, - proof_tx, shutdown, ) }); diff --git a/bin/asm-runner/src/config.rs b/bin/asm-runner/src/config.rs index 15fa2863..0aa9c94f 100644 --- a/bin/asm-runner/src/config.rs +++ b/bin/asm-runner/src/config.rs @@ -3,8 +3,9 @@ use std::{path::PathBuf, time::Duration}; use serde::{Deserialize, Serialize}; +use strata_asm_prover_worker::OrchestratorConfig; -use crate::{prover::config::OrchestratorConfig, retry::RetryConfig}; +use crate::retry::RetryConfig; /// Main configuration structure #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/bin/asm-runner/src/main.rs b/bin/asm-runner/src/main.rs index f88ddfa8..eaa3194b 100644 --- a/bin/asm-runner/src/main.rs +++ b/bin/asm-runner/src/main.rs @@ -6,7 +6,7 @@ mod block_watcher; mod bootstrap; mod config; -mod prover; +mod prover_context; mod retry; mod rpc_server; mod storage; @@ -21,7 +21,6 @@ use strata_logging::{LoggingInitConfig, finalize, init_logging_from_config}; use strata_tasks::TaskManager; use tokio::runtime::{Builder, Handle}; use tracing::{error, info}; -use zkaleido_native_adapter as _; use crate::{ bootstrap::bootstrap, diff --git a/bin/asm-runner/src/prover/backend.rs b/bin/asm-runner/src/prover/backend.rs deleted file mode 100644 index 97b289c3..00000000 --- a/bin/asm-runner/src/prover/backend.rs +++ /dev/null @@ -1,209 +0,0 @@ -//! ZK proof backend setup for the runner. -//! -//! Bundles the feature-gated selection of the ZK proof backend in one place: -//! host construction (SP1 or native), and derivation of the [`PredicateKey`] -//! that authorizes proofs from each host. The result is exposed as a single -//! [`ProofBackend`] value that the runner builds once at startup and threads -//! into the proof orchestrator and the input builder. - -use std::path::Path; - -use anyhow::{Result, bail}; -use k256::schnorr::SigningKey; -use strata_predicate::{PredicateKey, PredicateTypeId}; -use zkaleido::{ZkVm, ZkVmHost}; -#[cfg(feature = "sp1")] -use { - anyhow::Context, - sp1_sdk::{HashableKey, SP1VerifyingKey}, - sp1_verifier::{GROTH16_VK_BYTES, VK_ROOT_BYTES}, - zkaleido_sp1_groth16_verifier::SP1Groth16Verifier, - zkaleido_sp1_host::SP1Host, -}; - -use crate::prover::config::BackendConfig; - -/// Concrete host type used by the proof orchestrator. -/// -/// Resolves to [`SP1Host`] when the `sp1` feature is enabled, otherwise to -/// the in-process [`zkaleido_native_adapter::NativeHost`]. -#[cfg(feature = "sp1")] -pub(crate) type ProofHost = SP1Host; - -#[cfg(not(feature = "sp1"))] -pub(crate) type ProofHost = zkaleido_native_adapter::NativeHost; - -/// ZK proof backend used by the runner. -/// -/// Bundles the `(asm, moho)` host pair together with the [`PredicateKey`] that -/// each one's proofs verify against. Constructed once at startup via -/// [`ProofBackend::new`] and consumed by the proof orchestrator (hosts) and -/// the input builder (predicates). -#[derive(Debug)] -pub(crate) struct ProofBackend { - pub(crate) asm_host: ProofHost, - pub(crate) moho_host: ProofHost, - pub(crate) asm_predicate: PredicateKey, - pub(crate) moho_predicate: PredicateKey, -} - -impl ProofBackend { - /// Builds the ZK proof backend. - /// - /// Constructs both proof hosts and resolves the [`PredicateKey`] each - /// host's proofs verify against. - /// - /// # Errors - /// - /// - Returns an error if the requested [`BackendConfig`] variant does not match the binary's - /// build features (e.g. `Sp1` requested without the `sp1` feature). - /// - Returns an error if either host cannot be constructed (e.g. a guest ELF cannot be read in - /// `sp1` builds) or if either host's verifying key cannot be turned into a [`PredicateKey`]. - pub(crate) async fn new(cfg: &BackendConfig) -> Result { - let (asm_host, moho_host) = build_proof_hosts(cfg).await?; - let asm_predicate = resolve_predicate(&asm_host)?; - let moho_predicate = resolve_predicate(&moho_host)?; - Ok(Self { - asm_host, - moho_host, - asm_predicate, - moho_predicate, - }) - } -} - -/// Builds the `(asm, moho)` host pair used by the proof orchestrator. -/// -/// Dispatches on the [`BackendConfig`] variant. If the variant does not -/// match the binary's build features, surfaces a clear startup error rather -/// than failing later in the proving path. -async fn build_proof_hosts(cfg: &BackendConfig) -> Result<(ProofHost, ProofHost)> { - match cfg { - BackendConfig::Sp1 { - asm_elf_path, - moho_elf_path, - } => build_sp1_hosts(asm_elf_path, moho_elf_path).await, - BackendConfig::Native { - asm_schnorr_signing_key, - moho_schnorr_signing_key, - } => build_native_hosts(asm_schnorr_signing_key, moho_schnorr_signing_key).await, - } -} - -#[cfg(feature = "sp1")] -async fn build_sp1_hosts( - asm_elf_path: &Path, - moho_elf_path: &Path, -) -> Result<(ProofHost, ProofHost)> { - use std::fs; - - let asm_elf = fs::read(asm_elf_path) - .with_context(|| format!("failed to read ASM guest ELF at {}", asm_elf_path.display()))?; - let moho_elf = fs::read(moho_elf_path).with_context(|| { - format!( - "failed to read Moho guest ELF at {}", - moho_elf_path.display() - ) - })?; - - Ok(( - SP1Host::init(&asm_elf).await, - SP1Host::init(&moho_elf).await, - )) -} - -#[cfg(not(feature = "sp1"))] -async fn build_sp1_hosts( - _asm_elf_path: &Path, - _moho_elf_path: &Path, -) -> Result<(ProofHost, ProofHost)> { - bail!("sp1 backend requested but binary was built without the `sp1` feature"); -} - -#[cfg(feature = "sp1")] -async fn build_native_hosts( - _asm_signing_key: &SigningKey, - _moho_signing_key: &SigningKey, -) -> Result<(ProofHost, ProofHost)> { - bail!("native backend requested but binary was built with the `sp1` feature"); -} - -#[cfg(not(feature = "sp1"))] -async fn build_native_hosts( - asm_signing_key: &SigningKey, - moho_signing_key: &SigningKey, -) -> Result<(ProofHost, ProofHost)> { - // Bypass the `*::native_host()` convenience constructors: they call - // `NativeHost::new_with_random_key`, which would make each host's - // verifying key — and therefore its derived `PredicateKey` — different - // on every restart. The orchestrator needs stable predicate identities - // across runs, so we construct `NativeHost` directly with the keys - // supplied by config. - use moho_recursive_proof::process_recursive_moho_proof; - use strata_asm_proof_impl::statements::process_asm_stf; - use zkaleido_native_adapter::NativeHost; - - Ok(( - NativeHost::new(asm_signing_key.clone(), process_asm_stf), - NativeHost::new(moho_signing_key.clone(), process_recursive_moho_proof), - )) -} - -/// Resolves the [`PredicateKey`] for proofs produced by `host`. -/// -/// The returned key carries both the predicate type (matching the host's -/// [`ZkVm`] backend) and the encoded verifying-key material required to -/// validate proofs from that host. -/// -/// # Errors -/// -/// - For SP1 hosts, returns an error if the host's verifying key cannot be deserialized into an -/// `SP1VerifyingKey` or if the SP1 Groth16 verifier cannot be loaded for the resulting program -/// hash. -/// - For Risc0 hosts, returns an error because predicate resolution is not yet implemented for that -/// backend. -/// - When built without the `sp1` feature, an SP1 host returns an error because the SP1 -/// verifying-key handling is gated behind that feature. -fn resolve_predicate(host: &impl ZkVmHost) -> Result { - match host.zkvm() { - // Native execution does not produce a real cryptographic proof; the - // predicate simply carries the verifying-key bytes verbatim under the - // BIP-340 Schnorr type as a placeholder identifier. - ZkVm::Native => Ok(PredicateKey::new( - PredicateTypeId::Bip340Schnorr, - host.vk().as_bytes().to_vec(), - )), - - // SP1 proofs are wrapped in a Groth16 proof, so the on-chain - // predicate must identify the SP1 Groth16 verifying key (not the SP1 - // program vk itself). The conversion is: - // 1. Decode the SP1 verifying key from the host's raw bytes. - // 2. Hash it to obtain the program commitment expected by the Groth16 verifier. - // 3. Load the matching Groth16 verifier and serialize its vk into the predicate key. - #[cfg(feature = "sp1")] - ZkVm::SP1 => { - let vk = host.vk(); - let sp1_vk: SP1VerifyingKey = bincode::deserialize(vk.as_bytes()) - .context("failed to deserialize SP1 verifying key")?; - - let verifier = SP1Groth16Verifier::load( - &GROTH16_VK_BYTES, - sp1_vk.bytes32_raw(), - *VK_ROOT_BYTES, - true, - ) - .context("failed to load SP1 Groth16 verifier")?; - - Ok(PredicateKey::new( - PredicateTypeId::Sp1Groth16, - verifier.to_uncompressed_bytes(), - )) - } - #[cfg(not(feature = "sp1"))] - ZkVm::SP1 => bail!("SP1 predicate key resolution requires the `sp1` feature"), - - // Risc0 support is not yet wired up; surface a clear error rather - // than panicking so callers can fail gracefully. - ZkVm::Risc0 => bail!("predicate key resolution is not implemented for Risc0"), - } -} diff --git a/bin/asm-runner/src/prover/mod.rs b/bin/asm-runner/src/prover/mod.rs deleted file mode 100644 index c2cbce6e..00000000 --- a/bin/asm-runner/src/prover/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! Proof orchestration for the ASM runner. -//! -//! Manages the lifecycle of ASM step proofs and Moho recursive proofs by -//! scheduling jobs on a remote prover service and reconciling results. - -mod backend; -pub(crate) mod config; -mod input; -mod orchestrator; -mod proof_store; -mod queue; - -pub(crate) use self::{ - backend::ProofBackend, input::InputBuilder, orchestrator::ProofOrchestrator, -}; diff --git a/bin/asm-runner/src/prover_context.rs b/bin/asm-runner/src/prover_context.rs new file mode 100644 index 00000000..82132f63 --- /dev/null +++ b/bin/asm-runner/src/prover_context.rs @@ -0,0 +1,221 @@ +//! Concrete [`ProverContext`] implementation for the ASM runner. +//! +//! [`AsmProverContext`] is the prover-side analogue of +//! [`AsmWorkerContext`](crate::worker_context::AsmWorkerContext): it wires the +//! sled-backed proof store, the Moho-state store, the ASM anchor-state store, +//! and the Bitcoin client into the concern traits the prover worker drives +//! against. The proof-store traits are delegated to [`SledProofDb`]; Moho-state +//! reads to [`SledMohoStateDb`]; anchor/aux reads to [`AsmStateDb`]; and L1 +//! block reads to the Bitcoin [`Client`]. +//! +//! [`ProverContext`]: strata_asm_prover_worker::ProverContext + +use std::sync::Arc; + +use anyhow::Context; +use asm_storage::AsmStateDb; +use bitcoin::{Block, block::Header}; +use bitcoind_async_client::{Client, traits::Reader}; +use moho_types::MohoState; +use strata_asm_common::{AnchorState, AuxData}; +use strata_asm_prover_storage::{ + MohoStateDb, ProofDb, RemoteProofMappingDb, RemoteProofMappingError, RemoteProofStatusDb, + RemoteProofStatusError, SledMohoStateDb, SledProofDb, +}; +use strata_asm_prover_types::{AsmProof, L1Range, MohoProof, ProofId, RemoteProofId}; +use strata_asm_prover_worker::{AnchorStateReader, AuxDataReader, L1BlockProvider}; +use strata_asm_worker::AsmState; +use strata_btc_types::L1BlockIdBitcoinExt; +use strata_identifiers::{L1BlockCommitment, L1BlockId}; +use zkaleido::RemoteProofStatus; + +/// Concrete prover context for the ASM runner. +/// +/// Implements every concern trait the prover worker needs, so it satisfies the +/// `ProverContext` umbrella via its blanket impl. The bitcoin reads are async +/// and hit the client directly — the orchestrator drives this from a +/// single-threaded runtime, so blocking on the client would deadlock. +pub(crate) struct AsmProverContext { + proof_db: SledProofDb, + moho_state_db: SledMohoStateDb, + state_db: Arc, + bitcoin_client: Arc, +} + +impl AsmProverContext { + pub(crate) fn new( + proof_db: SledProofDb, + moho_state_db: SledMohoStateDb, + state_db: Arc, + bitcoin_client: Arc, + ) -> Self { + Self { + proof_db, + moho_state_db, + state_db, + bitcoin_client, + } + } +} + +// ---- Proof persistence: delegate to the sled proof store ------------------ + +impl ProofDb for AsmProverContext { + type Error = sled::Error; + + async fn store_asm_proof(&self, range: L1Range, proof: AsmProof) -> Result<(), Self::Error> { + self.proof_db.store_asm_proof(range, proof).await + } + + async fn get_asm_proof(&self, range: L1Range) -> Result, Self::Error> { + self.proof_db.get_asm_proof(range).await + } + + async fn store_moho_proof( + &self, + l1ref: L1BlockCommitment, + proof: MohoProof, + ) -> Result<(), Self::Error> { + self.proof_db.store_moho_proof(l1ref, proof).await + } + + async fn get_moho_proof( + &self, + l1ref: L1BlockCommitment, + ) -> Result, Self::Error> { + self.proof_db.get_moho_proof(l1ref).await + } + + async fn get_latest_moho_proof( + &self, + ) -> Result, Self::Error> { + self.proof_db.get_latest_moho_proof().await + } + + async fn prune(&self, before_height: u32) -> Result<(), Self::Error> { + self.proof_db.prune(before_height).await + } +} + +impl RemoteProofMappingDb for AsmProverContext { + type Error = RemoteProofMappingError; + + async fn get_remote_proof_id(&self, id: ProofId) -> Result, Self::Error> { + self.proof_db.get_remote_proof_id(id).await + } + + async fn get_proof_id( + &self, + remote_id: &RemoteProofId, + ) -> Result, Self::Error> { + self.proof_db.get_proof_id(remote_id).await + } + + async fn put_remote_proof_id( + &self, + id: ProofId, + remote_id: RemoteProofId, + ) -> Result<(), Self::Error> { + self.proof_db.put_remote_proof_id(id, remote_id).await + } +} + +impl RemoteProofStatusDb for AsmProverContext { + type Error = RemoteProofStatusError; + + async fn put_status( + &self, + remote_id: &RemoteProofId, + status: RemoteProofStatus, + ) -> Result<(), Self::Error> { + self.proof_db.put_status(remote_id, status).await + } + + async fn update_status( + &self, + remote_id: &RemoteProofId, + status: RemoteProofStatus, + ) -> Result<(), Self::Error> { + self.proof_db.update_status(remote_id, status).await + } + + async fn get_status( + &self, + remote_id: &RemoteProofId, + ) -> Result, Self::Error> { + self.proof_db.get_status(remote_id).await + } + + async fn get_all_in_progress( + &self, + ) -> Result, Self::Error> { + self.proof_db.get_all_in_progress().await + } + + async fn remove(&self, remote_id: &RemoteProofId) -> Result<(), Self::Error> { + self.proof_db.remove(remote_id).await + } +} + +// ---- Moho-state reads: delegate to the sled moho-state store -------------- + +impl MohoStateDb for AsmProverContext { + type Error = sled::Error; + + async fn store_moho_state( + &self, + l1ref: L1BlockCommitment, + state: MohoState, + ) -> Result<(), Self::Error> { + self.moho_state_db.store_moho_state(l1ref, state).await + } + + async fn get_moho_state( + &self, + l1ref: L1BlockCommitment, + ) -> Result, Self::Error> { + self.moho_state_db.get_moho_state(l1ref).await + } + + async fn prune(&self, before_height: u32) -> Result<(), Self::Error> { + self.moho_state_db.prune(before_height).await + } +} + +// ---- Chain/state reads for the input builder ------------------------------ + +impl AnchorStateReader for AsmProverContext { + fn get_anchor_state(&self, blockid: &L1BlockCommitment) -> anyhow::Result { + let asm_state: AsmState = self + .state_db + .get(blockid)? + .context("anchor state not found")?; + Ok(asm_state.state().clone()) + } +} + +impl AuxDataReader for AsmProverContext { + fn get_aux_data(&self, blockid: &L1BlockCommitment) -> anyhow::Result { + self.state_db + .get_aux_data(blockid)? + .context("aux data not found for block") + } +} + +impl L1BlockProvider for AsmProverContext { + async fn get_l1_block(&self, blockid: &L1BlockId) -> anyhow::Result { + let hash = blockid.to_block_hash(); + self.bitcoin_client + .get_block(&hash) + .await + .context("failed to fetch Bitcoin block") + } + + async fn get_l1_block_header(&self, blockid: &L1BlockId) -> anyhow::Result
{ + let hash = blockid.to_block_hash(); + self.bitcoin_client + .get_block_header(&hash) + .await + .context("failed to fetch Bitcoin block header") + } +} diff --git a/bin/asm-runner/src/rpc_server.rs b/bin/asm-runner/src/rpc_server.rs index d5103f12..e5b04357 100644 --- a/bin/asm-runner/src/rpc_server.rs +++ b/bin/asm-runner/src/rpc_server.rs @@ -13,14 +13,14 @@ use jsonrpsee::{ types::{ErrorObject, ErrorObjectOwned}, }; use ssz::{Decode, Encode}; -use strata_asm_proof_db::{ProofDb, SledMohoStateDb, SledProofDb}; -use strata_asm_proof_types::{AsmProof, L1Range, MohoProof}; use strata_asm_proto_bridge_v1::{AssignmentEntry, BridgeV1State, DepositEntry}; use strata_asm_proto_bridge_v1_txs::BRIDGE_V1_SUBPROTOCOL_ID; use strata_asm_proto_bridge_v1_types::SafeHarbour; use strata_asm_proto_checkpoint::CheckpointState; use strata_asm_proto_checkpoint_txs::CHECKPOINT_SUBPROTOCOL_ID; use strata_asm_proto_checkpoint_types::CheckpointTip; +use strata_asm_prover_storage::{ProofDb, SledMohoStateDb, SledProofDb}; +use strata_asm_prover_types::{AsmProof, L1Range, MohoProof}; use strata_asm_rpc::traits::{AsmControlApiServer, AsmProofApiServer, AsmStateApiServer}; use strata_asm_worker::{AsmState, AsmWorkerHandle, AsmWorkerStatus}; use strata_btc_types::BlockHashExt; diff --git a/bin/asm-runner/src/worker_context.rs b/bin/asm-runner/src/worker_context.rs index dac16030..be2c7d80 100644 --- a/bin/asm-runner/src/worker_context.rs +++ b/bin/asm-runner/src/worker_context.rs @@ -24,10 +24,10 @@ use moho_runtime_interface::MohoProgram; use moho_types::{ExportState, MohoState}; use strata_asm_common::{AnchorState, AsmManifest, AsmManifestHash, AuxData}; use strata_asm_logs::NewExportEntry; -use strata_asm_proof_db::SledMohoStateDb; use strata_asm_proof_impl::moho_program::program::{ AsmStfProgram, advance_export_state_with_logs, extract_next_predicate_from_logs, }; +use strata_asm_prover_storage::SledMohoStateDb; use strata_asm_worker::{ AnchorStateStore, AsmState, AuxDataStore, L1DataProvider, ManifestMmrStore, WorkerError, WorkerResult, diff --git a/crates/proof/db/Cargo.toml b/crates/extensions/prover/storage/Cargo.toml similarity index 83% rename from crates/proof/db/Cargo.toml rename to crates/extensions/prover/storage/Cargo.toml index 87decc6d..68a1c260 100644 --- a/crates/proof/db/Cargo.toml +++ b/crates/extensions/prover/storage/Cargo.toml @@ -1,11 +1,11 @@ [package] -name = "strata-asm-proof-db" +name = "strata-asm-prover-storage" version = "0.1.0" edition = "2024" [dependencies] moho-types.workspace = true -strata-asm-proof-types.workspace = true +strata-asm-prover-types.workspace = true strata-identifiers.workspace = true borsh.workspace = true diff --git a/crates/proof/db/src/lib.rs b/crates/extensions/prover/storage/src/lib.rs similarity index 90% rename from crates/proof/db/src/lib.rs rename to crates/extensions/prover/storage/src/lib.rs index c7906d60..bf5f348a 100644 --- a/crates/proof/db/src/lib.rs +++ b/crates/extensions/prover/storage/src/lib.rs @@ -7,8 +7,8 @@ //! - [`ProofDb`] — stores and retrieves finalised ASM step proofs and Moho recursive proofs, keyed //! by their L1 block range or commitment. //! - [`RemoteProofMappingDb`] — maintains a bidirectional mapping between local -//! [`ProofId`](strata_asm_proof_types::ProofId)s and opaque -//! [`RemoteProofId`](strata_asm_proof_types::RemoteProofId)s assigned by the remote prover +//! [`ProofId`](strata_asm_prover_types::ProofId)s and opaque +//! [`RemoteProofId`](strata_asm_prover_types::RemoteProofId)s assigned by the remote prover //! service. //! - [`RemoteProofStatusDb`] — tracks the execution status of in-flight remote proof jobs until //! their results are retrieved and stored locally. diff --git a/crates/proof/db/src/moho_state.rs b/crates/extensions/prover/storage/src/moho_state.rs similarity index 100% rename from crates/proof/db/src/moho_state.rs rename to crates/extensions/prover/storage/src/moho_state.rs diff --git a/crates/proof/db/src/proof_db.rs b/crates/extensions/prover/storage/src/proof_db.rs similarity index 97% rename from crates/proof/db/src/proof_db.rs rename to crates/extensions/prover/storage/src/proof_db.rs index f7097623..13ea9cd3 100644 --- a/crates/proof/db/src/proof_db.rs +++ b/crates/extensions/prover/storage/src/proof_db.rs @@ -5,7 +5,7 @@ use std::fmt::Debug; -use strata_asm_proof_types::{AsmProof, L1Range, MohoProof}; +use strata_asm_prover_types::{AsmProof, L1Range, MohoProof}; use strata_identifiers::L1BlockCommitment; /// Persistence interface for proof storage. diff --git a/crates/proof/db/src/remote_mapping.rs b/crates/extensions/prover/storage/src/remote_mapping.rs similarity index 97% rename from crates/proof/db/src/remote_mapping.rs rename to crates/extensions/prover/storage/src/remote_mapping.rs index 13fc2caf..985501bc 100644 --- a/crates/proof/db/src/remote_mapping.rs +++ b/crates/extensions/prover/storage/src/remote_mapping.rs @@ -6,7 +6,7 @@ use std::fmt::Debug; -use strata_asm_proof_types::{ProofId, RemoteProofId}; +use strata_asm_prover_types::{ProofId, RemoteProofId}; /// Persistent bidirectional mapping between local [`ProofId`]s and /// [`RemoteProofId`]s assigned by the remote prover service. diff --git a/crates/proof/db/src/remote_status.rs b/crates/extensions/prover/storage/src/remote_status.rs similarity index 97% rename from crates/proof/db/src/remote_status.rs rename to crates/extensions/prover/storage/src/remote_status.rs index 42d6a53c..845b0834 100644 --- a/crates/proof/db/src/remote_status.rs +++ b/crates/extensions/prover/storage/src/remote_status.rs @@ -6,7 +6,7 @@ use std::fmt::Debug; -use strata_asm_proof_types::RemoteProofId; +use strata_asm_prover_types::RemoteProofId; use zkaleido::RemoteProofStatus; /// Persistent store for the execution status of remote proof jobs. diff --git a/crates/proof/db/src/sled/mod.rs b/crates/extensions/prover/storage/src/sled/mod.rs similarity index 98% rename from crates/proof/db/src/sled/mod.rs rename to crates/extensions/prover/storage/src/sled/mod.rs index 099c4e2d..8bf7e9a0 100644 --- a/crates/proof/db/src/sled/mod.rs +++ b/crates/extensions/prover/storage/src/sled/mod.rs @@ -5,7 +5,7 @@ //! concern. Keys use big-endian height encoding so that sled's lexicographic //! ordering matches block-height ordering. -use strata_asm_proof_types::L1Range; +use strata_asm_prover_types::L1Range; use strata_identifiers::{Buf32, L1BlockCommitment, L1BlockId}; mod moho_state; @@ -121,7 +121,7 @@ pub(crate) fn decode_moho_key(key: &[u8]) -> L1BlockCommitment { #[cfg(test)] pub(crate) mod test_util { use proptest::{collection::vec, prelude::*}; - use strata_asm_proof_types::{AsmProof, L1Range, MohoProof}; + use strata_asm_prover_types::{AsmProof, L1Range, MohoProof}; use strata_identifiers::{Buf32, L1BlockCommitment, L1BlockId}; use zkaleido::{ ProgramId, Proof, ProofMetadata, ProofReceipt, ProofReceiptWithMetadata, ProofType, diff --git a/crates/proof/db/src/sled/moho_state.rs b/crates/extensions/prover/storage/src/sled/moho_state.rs similarity index 100% rename from crates/proof/db/src/sled/moho_state.rs rename to crates/extensions/prover/storage/src/sled/moho_state.rs diff --git a/crates/proof/db/src/sled/proof_db.rs b/crates/extensions/prover/storage/src/sled/proof_db.rs similarity index 99% rename from crates/proof/db/src/sled/proof_db.rs rename to crates/extensions/prover/storage/src/sled/proof_db.rs index ffbc3c03..c7012850 100644 --- a/crates/proof/db/src/sled/proof_db.rs +++ b/crates/extensions/prover/storage/src/sled/proof_db.rs @@ -1,7 +1,7 @@ //! [`ProofDb`] implementation for [`SledProofDb`]. use borsh::BorshDeserialize; -use strata_asm_proof_types::{AsmProof, L1Range, MohoProof}; +use strata_asm_prover_types::{AsmProof, L1Range, MohoProof}; use strata_identifiers::L1BlockCommitment; use super::{SledProofDb, decode_moho_key, encode_asm_key, encode_moho_key}; diff --git a/crates/proof/db/src/sled/remote_mapping.rs b/crates/extensions/prover/storage/src/sled/remote_mapping.rs similarity index 99% rename from crates/proof/db/src/sled/remote_mapping.rs rename to crates/extensions/prover/storage/src/sled/remote_mapping.rs index bd302549..2a9b7c37 100644 --- a/crates/proof/db/src/sled/remote_mapping.rs +++ b/crates/extensions/prover/storage/src/sled/remote_mapping.rs @@ -3,7 +3,7 @@ use std::{error::Error, fmt}; use borsh::BorshDeserialize; -use strata_asm_proof_types::{ProofId, RemoteProofId}; +use strata_asm_prover_types::{ProofId, RemoteProofId}; use super::SledProofDb; use crate::RemoteProofMappingDb; @@ -112,7 +112,7 @@ mod tests { use std::collections::HashSet; use proptest::{collection::vec, prelude::*}; - use strata_asm_proof_types::ProofId; + use strata_asm_prover_types::ProofId; use tokio::runtime::Runtime; use super::*; diff --git a/crates/proof/db/src/sled/remote_status.rs b/crates/extensions/prover/storage/src/sled/remote_status.rs similarity index 99% rename from crates/proof/db/src/sled/remote_status.rs rename to crates/extensions/prover/storage/src/sled/remote_status.rs index 2bae51a6..8b14ea66 100644 --- a/crates/proof/db/src/sled/remote_status.rs +++ b/crates/extensions/prover/storage/src/sled/remote_status.rs @@ -3,7 +3,7 @@ use std::{error::Error, fmt}; use borsh::BorshDeserialize; -use strata_asm_proof_types::RemoteProofId; +use strata_asm_prover_types::RemoteProofId; use zkaleido::RemoteProofStatus; use super::SledProofDb; diff --git a/crates/proof/types/Cargo.toml b/crates/extensions/prover/types/Cargo.toml similarity index 85% rename from crates/proof/types/Cargo.toml rename to crates/extensions/prover/types/Cargo.toml index 93d376ae..03f56e9b 100644 --- a/crates/proof/types/Cargo.toml +++ b/crates/extensions/prover/types/Cargo.toml @@ -1,6 +1,6 @@ [package] edition = "2021" -name = "strata-asm-proof-types" +name = "strata-asm-prover-types" version = "0.1.0" [lints] diff --git a/crates/proof/types/src/lib.rs b/crates/extensions/prover/types/src/lib.rs similarity index 100% rename from crates/proof/types/src/lib.rs rename to crates/extensions/prover/types/src/lib.rs diff --git a/crates/extensions/prover/worker/Cargo.toml b/crates/extensions/prover/worker/Cargo.toml new file mode 100644 index 00000000..49978572 --- /dev/null +++ b/crates/extensions/prover/worker/Cargo.toml @@ -0,0 +1,56 @@ +[package] +name = "strata-asm-prover-worker" +version = "0.1.0" +edition = "2024" + +[lints] +workspace = true + +[dependencies] +moho-recursive-proof.workspace = true +moho-runtime-impl.workspace = true +moho-types.workspace = true +strata-asm-common.workspace = true +strata-asm-proof-impl.workspace = true +strata-asm-prover-storage.workspace = true +strata-asm-prover-types.workspace = true +strata-asm-worker.workspace = true +strata-btc-types.workspace = true +strata-btc-verification.workspace = true +strata-identifiers.workspace = true +strata-merkle = { workspace = true, features = ["ssz"] } +strata-predicate.workspace = true +strata-tasks.workspace = true + +anyhow.workspace = true +async-trait.workspace = true +bitcoin.workspace = true +hex.workspace = true +k256 = { workspace = true, features = ["schnorr"] } +serde.workspace = true +ssz.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +tree_hash.workspace = true +zkaleido.workspace = true +zkaleido-native-adapter.workspace = true + +bincode = { workspace = true, optional = true } +sp1-sdk = { workspace = true, optional = true } +sp1-verifier = { workspace = true, optional = true } +zkaleido-sp1-groth16-verifier = { workspace = true, optional = true } +zkaleido-sp1-host = { workspace = true, optional = true } + +[dev-dependencies] +tokio.workspace = true + +[features] +default = [] +sp1 = [ + "dep:zkaleido-sp1-host", + "dep:zkaleido-sp1-groth16-verifier", + "dep:sp1-verifier", + "dep:sp1-sdk", + "dep:bincode", +] diff --git a/crates/extensions/prover/worker/src/backend/mod.rs b/crates/extensions/prover/worker/src/backend/mod.rs new file mode 100644 index 00000000..80f459a3 --- /dev/null +++ b/crates/extensions/prover/worker/src/backend/mod.rs @@ -0,0 +1,103 @@ +//! ZK proof backend setup for the prover worker. +//! +//! Bundles the feature-gated selection of the ZK proof backend in one place: +//! host construction (SP1 or native, in [`sp1`] / [`native`]) and derivation of +//! the [`PredicateKey`] that authorizes proofs from each host. The result is +//! exposed as a single [`ProofBackend`] value that the runner builds once at +//! startup and threads into the proof orchestrator and the input builder. + +mod native; +mod sp1; + +use anyhow::{Result, bail}; +use strata_predicate::PredicateKey; +use zkaleido::{ZkVm, ZkVmHost}; +#[cfg(feature = "sp1")] +use zkaleido_sp1_host::SP1Host; + +use crate::config::BackendConfig; + +/// Concrete host type used by the proof orchestrator. +/// +/// Resolves to [`SP1Host`] when the `sp1` feature is enabled, otherwise to +/// the in-process [`zkaleido_native_adapter::NativeHost`]. +#[cfg(feature = "sp1")] +pub type ProofHost = SP1Host; + +#[cfg(not(feature = "sp1"))] +pub type ProofHost = zkaleido_native_adapter::NativeHost; + +/// ZK proof backend used by the runner. +/// +/// Bundles the `(asm, moho)` host pair together with the [`PredicateKey`] that +/// each one's proofs verify against. Constructed once at startup via +/// [`ProofBackend::new`] and consumed by the proof orchestrator (hosts) and +/// the input builder (predicates). +#[derive(Debug)] +pub struct ProofBackend { + pub asm_host: ProofHost, + pub moho_host: ProofHost, + pub asm_predicate: PredicateKey, + pub moho_predicate: PredicateKey, +} + +impl ProofBackend { + /// Builds the ZK proof backend. + /// + /// Constructs both proof hosts and resolves the [`PredicateKey`] each + /// host's proofs verify against. + /// + /// # Errors + /// + /// - Returns an error if the requested [`BackendConfig`] variant does not match the binary's + /// build features (e.g. `Sp1` requested without the `sp1` feature). + /// - Returns an error if either host cannot be constructed (e.g. a guest ELF cannot be read in + /// `sp1` builds) or if either host's verifying key cannot be turned into a [`PredicateKey`]. + pub async fn new(cfg: &BackendConfig) -> Result { + let (asm_host, moho_host) = build_proof_hosts(cfg).await?; + let asm_predicate = resolve_predicate(&asm_host)?; + let moho_predicate = resolve_predicate(&moho_host)?; + Ok(Self { + asm_host, + moho_host, + asm_predicate, + moho_predicate, + }) + } +} + +/// Builds the `(asm, moho)` host pair used by the proof orchestrator. +/// +/// Dispatches on the [`BackendConfig`] variant. If the variant does not +/// match the binary's build features, the corresponding builder surfaces a +/// clear startup error rather than failing later in the proving path. +async fn build_proof_hosts(cfg: &BackendConfig) -> Result<(ProofHost, ProofHost)> { + match cfg { + BackendConfig::Sp1 { + asm_elf_path, + moho_elf_path, + } => sp1::build_sp1_hosts(asm_elf_path, moho_elf_path).await, + BackendConfig::Native { + asm_schnorr_signing_key, + moho_schnorr_signing_key, + } => native::build_native_hosts(asm_schnorr_signing_key, moho_schnorr_signing_key).await, + } +} + +/// Resolves the [`PredicateKey`] for proofs produced by `host`, dispatching on +/// its [`ZkVm`] backend. +/// +/// # Errors +/// +/// - For SP1 hosts, returns an error if the verifying key cannot be decoded or the Groth16 verifier +/// cannot be loaded (and, when built without the `sp1` feature, that the feature is required). +/// - For Risc0 hosts, returns an error because predicate resolution is not yet implemented. +fn resolve_predicate(host: &impl ZkVmHost) -> Result { + match host.zkvm() { + ZkVm::Native => native::resolve_native_predicate(host), + ZkVm::SP1 => sp1::resolve_sp1_predicate(host), + // Risc0 support is not yet wired up; surface a clear error rather + // than panicking so callers can fail gracefully. + ZkVm::Risc0 => bail!("predicate key resolution is not implemented for Risc0"), + } +} diff --git a/crates/extensions/prover/worker/src/backend/native.rs b/crates/extensions/prover/worker/src/backend/native.rs new file mode 100644 index 00000000..35292e73 --- /dev/null +++ b/crates/extensions/prover/worker/src/backend/native.rs @@ -0,0 +1,49 @@ +//! Native (in-process) proof host construction and predicate resolution. + +use anyhow::Result; +use k256::schnorr::SigningKey; +use strata_predicate::{PredicateKey, PredicateTypeId}; +use zkaleido::ZkVmHost; + +use super::ProofHost; + +/// Resolves the [`PredicateKey`] for a native host. +/// +/// Native execution does not produce a real cryptographic proof; the predicate +/// simply carries the verifying-key bytes verbatim under the BIP-340 Schnorr +/// type as a placeholder identifier. +pub(super) fn resolve_native_predicate(host: &impl ZkVmHost) -> Result { + Ok(PredicateKey::new( + PredicateTypeId::Bip340Schnorr, + host.vk().as_bytes().to_vec(), + )) +} + +#[cfg(feature = "sp1")] +pub(super) async fn build_native_hosts( + _asm_signing_key: &SigningKey, + _moho_signing_key: &SigningKey, +) -> Result<(ProofHost, ProofHost)> { + anyhow::bail!("native backend requested but binary was built with the `sp1` feature"); +} + +#[cfg(not(feature = "sp1"))] +pub(super) async fn build_native_hosts( + asm_signing_key: &SigningKey, + moho_signing_key: &SigningKey, +) -> Result<(ProofHost, ProofHost)> { + // Bypass the `*::native_host()` convenience constructors: they call + // `NativeHost::new_with_random_key`, which would make each host's + // verifying key — and therefore its derived `PredicateKey` — different + // on every restart. The orchestrator needs stable predicate identities + // across runs, so we construct `NativeHost` directly with the keys + // supplied by config. + use moho_recursive_proof::process_recursive_moho_proof; + use strata_asm_proof_impl::statements::process_asm_stf; + use zkaleido_native_adapter::NativeHost; + + Ok(( + NativeHost::new(asm_signing_key.clone(), process_asm_stf), + NativeHost::new(moho_signing_key.clone(), process_recursive_moho_proof), + )) +} diff --git a/crates/extensions/prover/worker/src/backend/sp1.rs b/crates/extensions/prover/worker/src/backend/sp1.rs new file mode 100644 index 00000000..27b3f70f --- /dev/null +++ b/crates/extensions/prover/worker/src/backend/sp1.rs @@ -0,0 +1,81 @@ +//! SP1 proof host construction and predicate resolution. + +use std::path::Path; + +use anyhow::Result; +use strata_predicate::PredicateKey; +use zkaleido::ZkVmHost; +#[cfg(feature = "sp1")] +use { + anyhow::Context, + sp1_sdk::{HashableKey, SP1VerifyingKey}, + sp1_verifier::{GROTH16_VK_BYTES, VK_ROOT_BYTES}, + strata_predicate::PredicateTypeId, + zkaleido_sp1_groth16_verifier::SP1Groth16Verifier, + zkaleido_sp1_host::SP1Host, +}; + +use super::ProofHost; + +#[cfg(feature = "sp1")] +pub(super) async fn build_sp1_hosts( + asm_elf_path: &Path, + moho_elf_path: &Path, +) -> Result<(ProofHost, ProofHost)> { + use std::fs; + + let asm_elf = fs::read(asm_elf_path) + .with_context(|| format!("failed to read ASM guest ELF at {}", asm_elf_path.display()))?; + let moho_elf = fs::read(moho_elf_path).with_context(|| { + format!( + "failed to read Moho guest ELF at {}", + moho_elf_path.display() + ) + })?; + + Ok(( + SP1Host::init(&asm_elf).await, + SP1Host::init(&moho_elf).await, + )) +} + +#[cfg(not(feature = "sp1"))] +pub(super) async fn build_sp1_hosts( + _asm_elf_path: &Path, + _moho_elf_path: &Path, +) -> Result<(ProofHost, ProofHost)> { + anyhow::bail!("sp1 backend requested but binary was built without the `sp1` feature"); +} + +/// Resolves the [`PredicateKey`] for an SP1 host. +/// +/// SP1 proofs are wrapped in a Groth16 proof, so the on-chain predicate must +/// identify the SP1 Groth16 verifying key (not the SP1 program vk itself). The +/// conversion is: +/// 1. Decode the SP1 verifying key from the host's raw bytes. +/// 2. Hash it to obtain the program commitment expected by the Groth16 verifier. +/// 3. Load the matching Groth16 verifier and serialize its vk into the predicate key. +#[cfg(feature = "sp1")] +pub(super) fn resolve_sp1_predicate(host: &impl ZkVmHost) -> Result { + let vk = host.vk(); + let sp1_vk: SP1VerifyingKey = + bincode::deserialize(vk.as_bytes()).context("failed to deserialize SP1 verifying key")?; + + let verifier = SP1Groth16Verifier::load( + &GROTH16_VK_BYTES, + sp1_vk.bytes32_raw(), + *VK_ROOT_BYTES, + true, + ) + .context("failed to load SP1 Groth16 verifier")?; + + Ok(PredicateKey::new( + PredicateTypeId::Sp1Groth16, + verifier.to_uncompressed_bytes(), + )) +} + +#[cfg(not(feature = "sp1"))] +pub(super) fn resolve_sp1_predicate(_host: &impl ZkVmHost) -> Result { + anyhow::bail!("SP1 predicate key resolution requires the `sp1` feature"); +} diff --git a/crates/extensions/prover/worker/src/builder.rs b/crates/extensions/prover/worker/src/builder.rs new file mode 100644 index 00000000..b384c029 --- /dev/null +++ b/crates/extensions/prover/worker/src/builder.rs @@ -0,0 +1,122 @@ +//! Builder for assembling a prover worker. + +use strata_asm_worker::Subscription; +use strata_identifiers::L1BlockCommitment; +use zkaleido::ZkVmRemoteHost; + +use crate::{ + InputBuilder, ProofOrchestrator, ProverContext, + config::OrchestratorConfig, + errors::{ProverError, ProverResult}, +}; + +/// Builder for assembling a prover worker. +/// +/// Wires the context, remote hosts, config, input builder, and the ASM worker's +/// commit subscription into a [`ProofOrchestrator`]. The orchestrator's only +/// input is that subscription: it turns each committed [`L1BlockCommitment`] +/// into the proofs the block requires. +/// +/// The orchestrator is *not* spawned here: its run loop is `!Send` (the +/// upstream `ZkVmRemoteProver` is `#[async_trait(?Send)]`), so the caller must +/// drive [`ProofOrchestrator::run`] itself — typically on a dedicated thread +/// with a single-threaded runtime and a `LocalSet`. +#[derive(Debug)] +pub struct ProverWorkerBuilder { + ctx: Option, + asm_host: Option, + moho_host: Option, + config: Option, + input_builder: Option, + subscription: Option>, +} + +impl ProverWorkerBuilder { + /// Creates a new, empty builder. + pub fn new() -> Self { + Self { + ctx: None, + asm_host: None, + moho_host: None, + config: None, + input_builder: None, + subscription: None, + } + } + + /// Sets the prover context (implements [`ProverContext`]). + pub fn with_context(mut self, ctx: C) -> Self { + self.ctx = Some(ctx); + self + } + + /// Sets the `(asm, moho)` remote host pair. + pub fn with_hosts(mut self, asm_host: H, moho_host: H) -> Self { + self.asm_host = Some(asm_host); + self.moho_host = Some(moho_host); + self + } + + /// Sets the orchestrator configuration. + pub fn with_config(mut self, config: OrchestratorConfig) -> Self { + self.config = Some(config); + self + } + + /// Sets the input builder used to assemble ZkVM inputs. + pub fn with_input_builder(mut self, input_builder: InputBuilder) -> Self { + self.input_builder = Some(input_builder); + self + } + + /// Sets the ASM worker commit subscription that drives the orchestrator. + /// + /// Subscribe *before* the worker starts processing blocks — there is no + /// replay buffer, so any block committed before this subscription exists is + /// not seen. + pub fn with_block_subscription( + mut self, + subscription: Subscription, + ) -> Self { + self.subscription = Some(subscription); + self + } +} + +impl ProverWorkerBuilder { + /// Validates the supplied dependencies and assembles the orchestrator for + /// the caller to drive. + pub fn build(self) -> ProverResult> { + let ctx = self.ctx.ok_or(ProverError::MissingDependency("context"))?; + let asm_host = self + .asm_host + .ok_or(ProverError::MissingDependency("asm_host"))?; + let moho_host = self + .moho_host + .ok_or(ProverError::MissingDependency("moho_host"))?; + let config = self + .config + .ok_or(ProverError::MissingDependency("config"))?; + let input_builder = self + .input_builder + .ok_or(ProverError::MissingDependency("input_builder"))?; + let subscription = self + .subscription + .ok_or(ProverError::MissingDependency("subscription"))?; + + Ok(ProofOrchestrator::new( + ctx, + asm_host, + moho_host, + config, + input_builder, + subscription, + )) + } +} + +impl Default for ProverWorkerBuilder { + fn default() -> Self { + Self::new() + } +} diff --git a/bin/asm-runner/src/prover/config.rs b/crates/extensions/prover/worker/src/config.rs similarity index 95% rename from bin/asm-runner/src/prover/config.rs rename to crates/extensions/prover/worker/src/config.rs index d5e06557..e847effb 100644 --- a/bin/asm-runner/src/prover/config.rs +++ b/crates/extensions/prover/worker/src/config.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; /// Configuration for the proof orchestrator. #[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct OrchestratorConfig { +pub struct OrchestratorConfig { /// Interval between orchestrator ticks. pub tick_interval: Duration, @@ -26,7 +26,7 @@ pub(crate) struct OrchestratorConfig { /// Tagged with `kind` so the same config schema is valid regardless of /// which features the binary was built with. If the selected variant does /// not match the build (e.g. `sp1` requested in a binary built without the -/// `sp1` feature), [`crate::prover::backend::ProofBackend::new`] surfaces a +/// `sp1` feature), [`ProofBackend::new`](crate::ProofBackend::new) surfaces a /// startup error. #[derive(Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] @@ -34,7 +34,7 @@ pub(crate) struct OrchestratorConfig { clippy::large_enum_variant, reason = "BackendConfig is parsed once at startup; boxing a SigningKey to save a few bytes on a singleton value is not worth the indirection" )] -pub(crate) enum BackendConfig { +pub enum BackendConfig { /// SP1 backend. Loads the ASM and Moho guest ELFs from explicit paths at startup. Sp1 { asm_elf_path: PathBuf, diff --git a/crates/extensions/prover/worker/src/errors.rs b/crates/extensions/prover/worker/src/errors.rs new file mode 100644 index 00000000..402cccca --- /dev/null +++ b/crates/extensions/prover/worker/src/errors.rs @@ -0,0 +1,21 @@ +//! Error types for the prover worker setup surface. +//! +//! The orchestration loop itself stays `anyhow`-based (it logs and continues on +//! transient failures); these typed errors cover the builder/launch path. + +use thiserror::Error; + +/// Result alias for prover-worker setup operations. +pub type ProverResult = Result; + +/// Errors surfaced while building or launching the prover worker. +#[derive(Debug, Error)] +pub enum ProverError { + /// A required dependency was not supplied to the builder. + #[error("missing required dependency: {0}")] + MissingDependency(&'static str), + + /// Any other error, typically from backend construction. + #[error(transparent)] + Other(#[from] anyhow::Error), +} diff --git a/bin/asm-runner/src/prover/input.rs b/crates/extensions/prover/worker/src/input.rs similarity index 64% rename from bin/asm-runner/src/prover/input.rs rename to crates/extensions/prover/worker/src/input.rs index 7d6e289c..5b31283e 100644 --- a/bin/asm-runner/src/prover/input.rs +++ b/crates/extensions/prover/worker/src/input.rs @@ -1,70 +1,68 @@ //! Input preparation for proof generation. //! -//! Builds the [`RuntimeInput`] required by the ZkVM program for each proof type. - -use std::sync::Arc; +//! Builds the [`RuntimeInput`] required by the ZkVM program for each proof type, +//! reading every dependency (proofs, Moho state, anchor state, aux data, L1 +//! blocks) through the [`ProverContext`] rather than holding concrete handles. use anyhow::{Context, Result}; -use asm_storage::AsmStateDb; -use bitcoind_async_client::{Client, traits::Reader}; use moho_recursive_proof::{MohoRecursiveInput, MohoRecursiveOutput}; use moho_runtime_impl::RuntimeInput; use moho_types::{MohoState, RecursiveMohoProof, StepMohoAttestation, StepMohoProof}; use ssz::{Decode, Encode}; -use strata_asm_proof_db::{MohoStateDb, ProofDb, SledMohoStateDb, SledProofDb}; use strata_asm_proof_impl::moho_program::input::AsmStepInput; -use strata_asm_proof_types::L1Range; -use strata_btc_types::{BlockHashExt, L1BlockIdBitcoinExt}; -use strata_btc_verification::{self, TxidInclusionProof}; +use strata_asm_prover_types::L1Range; +use strata_btc_types::BlockHashExt; +use strata_btc_verification::TxidInclusionProof; use strata_identifiers::L1BlockCommitment; use strata_merkle::{BinaryMerkleTree, MerkleProofB32, Sha256NoPrefixHasher}; use strata_predicate::PredicateKey; use tree_hash::{Sha256Hasher as TreeSha256Hasher, TreeHash}; +use crate::ProverContext; + /// Builds [`RuntimeInput`] for proof generation, dispatching by proof type. -pub(crate) struct InputBuilder { - state_db: Arc, - bitcoin_client: Arc, - proof_db: SledProofDb, - moho_state_db: SledMohoStateDb, +/// +/// Holds only the values that are fixed for the lifetime of the prover (the +/// genesis commitment and the two predicate keys); all per-block data is read +/// from the [`ProverContext`] passed to each method. +#[derive(Debug)] +pub struct InputBuilder { genesis: L1BlockCommitment, asm_predicate: PredicateKey, moho_predicate: PredicateKey, } -pub(crate) struct MohoPrerequisite { +/// Prerequisites required to build a Moho recursive proof input for a block: +/// the inner ASM step proof and (unless genesis) the previous Moho proof. +#[derive(Debug)] +pub struct MohoPrerequisite { prev_moho_proof: Option, incremental_step_proof: StepMohoProof, } impl InputBuilder { - pub(crate) fn new( - state_db: Arc, - bitcoin_client: Arc, - proof_db: SledProofDb, - moho_state_db: SledMohoStateDb, + /// Creates a new input builder. + pub fn new( genesis: L1BlockCommitment, asm_predicate: PredicateKey, moho_predicate: PredicateKey, ) -> Self { Self { - state_db, - bitcoin_client, - proof_db, - moho_state_db, genesis, asm_predicate, moho_predicate, } } - async fn get_parent_commitment(&self, l1_ref: L1BlockCommitment) -> Result { - let block_hash = l1_ref.blkid().to_block_hash(); - let header = self - .bitcoin_client - .get_block_header(&block_hash) + async fn get_parent_commitment( + &self, + ctx: &C, + l1_ref: L1BlockCommitment, + ) -> Result { + let header = ctx + .get_l1_block_header(l1_ref.blkid()) .await - .context("failed to fetch Bitcoin block")?; + .context("failed to fetch Bitcoin block header")?; let parent_hash = header.prev_blockhash; let parent_height = l1_ref @@ -77,46 +75,55 @@ impl InputBuilder { } /// Fetches the persisted [`MohoState`] for the given L1 block. The worker - /// materializes this alongside each anchor state — see `AsmWorkerContext::store_anchor_state`. - async fn get_moho_state(&self, l1_ref: L1BlockCommitment) -> Result { - self.moho_state_db - .get_moho_state(l1_ref) + /// materializes this alongside each anchor state — see the runner's + /// `AsmWorkerContext::store_anchor_state`. + async fn get_moho_state( + &self, + ctx: &C, + l1_ref: L1BlockCommitment, + ) -> Result { + ctx.get_moho_state(l1_ref) .await .context("failed to fetch moho state")? .context("moho state not found for block") } - pub(crate) async fn check_moho_prerequisite( + /// Checks whether the prerequisites for a Moho recursive proof at `block` + /// are available, returning them if so. + pub async fn check_moho_prerequisite( &self, + ctx: &C, block: L1BlockCommitment, ) -> Result { // 1. ASM step proof is required. - let asm_proof = self - .proof_db + let asm_proof = ctx .get_asm_proof(L1Range::single(block)) - .await? + .await + .context("failed to fetch ASM step proof")? .context("ASM step proof not available yet for this block")?; let asm_receipt = asm_proof.0.receipt(); let asm_attestation = StepMohoAttestation::from_ssz_bytes(asm_receipt.public_values().as_bytes()) - .context("invalid ASM attestation in stored proof")?; + .map_err(|e| anyhow::anyhow!("invalid ASM attestation in stored proof: {e:?}"))?; let asm_step_proof = StepMohoProof::new(asm_attestation, asm_receipt.proof().as_bytes().to_vec()); // 2. Previous moho proof: required unless this is the genesis block. - let parent = self.get_parent_commitment(block).await?; + let parent = self.get_parent_commitment(ctx, block).await?; let prev_moho_proof = if parent == self.genesis { None } else { - let proof = self - .proof_db + let proof = ctx .get_moho_proof(parent) - .await? + .await + .context("failed to fetch previous moho proof")? .context("previous moho recursive proof not available yet")?; let receipt = proof.0.receipt(); let output = MohoRecursiveOutput::from_ssz_bytes(receipt.public_values().as_bytes()) - .context("invalid moho recursive output in stored proof")?; + .map_err(|e| { + anyhow::anyhow!("invalid moho recursive output in stored proof: {e:?}") + })?; Some(RecursiveMohoProof::new( output.attestation().clone(), receipt.proof().as_bytes().to_vec(), @@ -133,23 +140,23 @@ impl InputBuilder { /// /// This fetches the Bitcoin block and auxiliary data, reconstructs the /// pre-state, and assembles the input the ZkVM program expects. - pub(crate) async fn build_asm_runtime_input(&self, range: &L1Range) -> Result { + pub async fn build_asm_runtime_input( + &self, + ctx: &C, + range: &L1Range, + ) -> Result { let commitment = range.start(); // 1. Fetch the Bitcoin block. - let block_hash = commitment.blkid().to_block_hash(); - let block = self - .bitcoin_client - .get_block(&block_hash) + let block = ctx + .get_l1_block(commitment.blkid()) .await .context("failed to fetch Bitcoin block")?; // 2. Fetch the auxiliary data stored during STF execution. - let aux_data = self - .state_db + let aux_data = ctx .get_aux_data(&commitment) - .context("failed to fetch aux data")? - .context("aux data not found for block")?; + .context("failed to fetch aux data")?; let coinbase_inclusion_proof = match block.witness_root() { Some(_) => Some(TxidInclusionProof::generate(&block.txdata, 0)), @@ -160,17 +167,14 @@ impl InputBuilder { let step_input = AsmStepInput::new(block.clone(), aux_data, coinbase_inclusion_proof); // 4. Fetch the pre-state (anchor state for the parent block). - let parent_commitment = self.get_parent_commitment(commitment).await?; + let parent_commitment = self.get_parent_commitment(ctx, commitment).await?; - let asm_state = self - .state_db - .get(&parent_commitment) - .context("failed to fetch parent anchor state")? - .context("parent anchor state not found")?; - let anchor_state = asm_state.state(); + let anchor_state = ctx + .get_anchor_state(&parent_commitment) + .context("failed to fetch parent anchor state")?; // 5. Compute the Moho pre-state from the anchor state. - let moho_pre_state = self.get_moho_state(parent_commitment).await?; + let moho_pre_state = self.get_moho_state(ctx, parent_commitment).await?; // 6. Build RuntimeInput. let runtime_input = RuntimeInput::new( @@ -182,8 +186,10 @@ impl InputBuilder { Ok(runtime_input) } - pub(crate) async fn build_moho_runtime_input( + /// Builds the [`MohoRecursiveInput`] for a Moho recursive proof at `l1_ref`. + pub async fn build_moho_runtime_input( &self, + ctx: &C, prerequisite: MohoPrerequisite, l1_ref: L1BlockCommitment, ) -> Result { @@ -198,8 +204,8 @@ impl InputBuilder { // the ASM predicate. let step_predicate = self.asm_predicate.clone(); - let parent = self.get_parent_commitment(l1_ref).await?; - let parent_state = self.get_moho_state(parent).await?; + let parent = self.get_parent_commitment(ctx, l1_ref).await?; + let parent_state = self.get_moho_state(ctx, parent).await?; let leaves = vec![ <_ as TreeHash>::tree_hash_root::(&parent_state.inner_state) diff --git a/crates/extensions/prover/worker/src/lib.rs b/crates/extensions/prover/worker/src/lib.rs new file mode 100644 index 00000000..4ded7bb9 --- /dev/null +++ b/crates/extensions/prover/worker/src/lib.rs @@ -0,0 +1,35 @@ +//! # strata-asm-prover-worker +//! +//! Orchestrates remote ASM step proofs and Moho recursive proofs. +//! +//! The worker defines a [`ProverContext`] umbrella trait abstracting its +//! storage and chain-data dependencies, and a [`ProofOrchestrator`] that drives +//! proof scheduling and reconciliation generically over it — mirroring how the +//! ASM worker (`strata-asm-worker`) is built. The orchestrator is fed by the ASM +//! worker's commit subscription: each committed block expands into the ASM step +//! proof and Moho recursive proof it requires. Concrete sled-backed storage +//! lives in the sibling `strata-asm-prover-storage` crate; the binary supplies +//! the `ProverContext` impl that wires storage and the Bitcoin client together. + +mod backend; +mod builder; +mod config; +mod errors; +mod input; +mod orchestrator; +mod proof_store; +mod queue; +mod traits; + +pub use backend::{ProofBackend, ProofHost}; +pub use builder::ProverWorkerBuilder; +pub use config::{BackendConfig, OrchestratorConfig}; +pub use errors::{ProverError, ProverResult}; +pub use input::{InputBuilder, MohoPrerequisite}; +pub use orchestrator::ProofOrchestrator; +pub use traits::{AnchorStateReader, AuxDataReader, L1BlockProvider, ProverContext}; +// In `sp1` builds the native host path is compiled out, leaving the +// `zkaleido-native-adapter` dependency otherwise unused; this keeps the +// `unused_crate_dependencies` lint satisfied. +#[cfg(feature = "sp1")] +use zkaleido_native_adapter as _; diff --git a/bin/asm-runner/src/prover/orchestrator.rs b/crates/extensions/prover/worker/src/orchestrator.rs similarity index 81% rename from bin/asm-runner/src/prover/orchestrator.rs rename to crates/extensions/prover/worker/src/orchestrator.rs index d6fbcc20..beb9cdc3 100644 --- a/bin/asm-runner/src/prover/orchestrator.rs +++ b/crates/extensions/prover/worker/src/orchestrator.rs @@ -7,43 +7,53 @@ use anyhow::{Context, Result}; use async_trait::async_trait; use moho_recursive_proof::MohoRecursiveProgram; -use strata_asm_proof_db::{RemoteProofMappingDb, RemoteProofStatusDb, SledProofDb}; use strata_asm_proof_impl::program::AsmStfProofProgram; -use strata_asm_proof_types::{ProofId, RemoteProofId}; +use strata_asm_prover_types::{L1Range, ProofId, RemoteProofId}; +use strata_asm_worker::Subscription; +use strata_identifiers::L1BlockCommitment; use strata_tasks::ShutdownGuard; -use tokio::{sync::mpsc, time}; +use tokio::{sync::mpsc::error::TryRecvError, time}; use tracing::{debug, error, info, warn}; use zkaleido::{RemoteProofStatus, ZkVmRemoteHost, ZkVmRemoteProgram}; -use super::{ - config::OrchestratorConfig, input::InputBuilder, proof_store, queue::PendingProofQueue, +use crate::{ + ProverContext, config::OrchestratorConfig, input::InputBuilder, proof_store, + queue::PendingProofQueue, }; /// Orchestrates remote proof generation for ASM and Moho proofs. -pub(crate) struct ProofOrchestrator { - db: SledProofDb, +/// +/// The orchestrator's only input is the ASM worker's commit stream: each +/// [`L1BlockCommitment`] arrives *after* the block has been durably stored, and +/// the orchestrator expands it into the ASM step proof and the Moho recursive +/// proof that block requires (see [`drain_committed_blocks`]). +/// +/// [`drain_committed_blocks`]: ProofOrchestrator::drain_committed_blocks +#[derive(Debug)] +pub struct ProofOrchestrator { + ctx: C, queue: PendingProofQueue, - rx: mpsc::UnboundedReceiver, - asm: Host, - moho: Host, + subscription: Subscription, + asm: H, + moho: H, config: OrchestratorConfig, input_builder: InputBuilder, } -impl ProofOrchestrator { +impl ProofOrchestrator { /// Creates a new orchestrator. - pub(crate) fn new( - db: SledProofDb, - asm: R, - moho: R, + pub fn new( + ctx: C, + asm: H, + moho: H, config: OrchestratorConfig, input_builder: InputBuilder, - rx: mpsc::UnboundedReceiver, + subscription: Subscription, ) -> Self { Self { - db, + ctx, queue: PendingProofQueue::new(), - rx, + subscription, asm, moho, config, @@ -51,10 +61,15 @@ impl ProofOrchestrator { } } - /// Runs the orchestrator loop until shutdown is requested or the channel is closed. - pub(crate) async fn run(&mut self, shutdown: ShutdownGuard) -> Result<()> { + /// Runs the orchestrator loop until shutdown is requested or the ASM + /// worker's commit stream closes. + pub async fn run(&mut self, shutdown: ShutdownGuard) -> Result<()> { info!("proof orchestrator started"); loop { + // Pull every block committed since the last tick and expand each into + // its proof requests before doing the periodic reconcile/schedule work. + let closed = self.drain_committed_blocks(); + if let Err(e) = self.tick().await { error!(?e, "orchestrator tick failed"); } @@ -64,9 +79,9 @@ impl ProofOrchestrator { return Ok(()); } - // Exit once the sender side has been dropped (shutdown) and there is - // nothing left to process. - if self.rx.is_closed() && self.queue.is_empty() { + // The ASM worker dropped the commit stream (shutdown) and there is + // nothing left to prove. + if closed && self.queue.is_empty() { info!("proof orchestrator shutting down"); return Ok(()); } @@ -81,18 +96,29 @@ impl ProofOrchestrator { } } - /// Drains incoming proof requests from the channel into the pending queue. - fn drain_incoming(&mut self) { - while let Ok(id) = self.rx.try_recv() { - debug!(?id, "received proof request"); - self.queue.enqueue(id); + /// Drains committed-block notifications from the ASM worker into the pending + /// queue, expanding each block into the ASM step proof and the Moho + /// recursive proof it requires. + /// + /// Returns `true` once the subscription has closed — the ASM worker shut + /// down and the backlog is drained — so the caller can exit after the queue + /// empties. + fn drain_committed_blocks(&mut self) -> bool { + loop { + match self.subscription.try_recv() { + Ok(block) => { + debug!(%block, "ASM worker committed block, enqueuing proofs"); + self.queue.enqueue(ProofId::Asm(L1Range::single(block))); + self.queue.enqueue(ProofId::Moho(block)); + } + Err(TryRecvError::Empty) => return false, + Err(TryRecvError::Disconnected) => return true, + } } } /// Executes one orchestration cycle. async fn tick(&mut self) -> Result<()> { - self.drain_incoming(); - if !self.queue.is_empty() { debug!(pending = self.queue.len(), "orchestrator tick"); } @@ -107,7 +133,7 @@ impl ProofOrchestrator { /// Polls all in-progress remote proofs and stores any that have completed. async fn reconcile_active_proofs(&mut self) -> Result<()> { let in_progress = self - .db + .ctx .get_all_in_progress() .await .context("failed to query in-progress proofs")?; @@ -126,11 +152,11 @@ impl ProofOrchestrator { remote_id: &RemoteProofId, old_status: &RemoteProofStatus, ) -> Result<()> { - let typed_id = to_typed_proof_id::(remote_id)?; + let typed_id = to_typed_proof_id::(remote_id)?; // NOTE: We use `self.asm` here but this could be any `ZkVmRemoteHost` instance. // `get_status` only requires a network client and proof ID — not the ELF or - // proving key. Since the orchestrator is generic over a single `R: ZkVmRemoteHost`, + // proving key. Since the orchestrator is generic over a single `H: ZkVmRemoteHost`, // both `asm` and `moho` share the same concrete type, so either works. let new_status = self .asm @@ -155,13 +181,13 @@ impl ProofOrchestrator { } RemoteProofStatus::Failed(reason) => { error!(?remote_id, %reason, "remote proof generation failed"); - self.db + self.ctx .remove(remote_id) .await .context("failed to remove failed proof status")?; } _ => { - self.db + self.ctx .update_status(remote_id, new_status) .await .context("failed to update proof status")?; @@ -170,15 +196,15 @@ impl ProofOrchestrator { Ok(()) } - /// Retrieves a completed proof and stores it in the proof DB. + /// Retrieves a completed proof and stores it in the proof store. async fn handle_completed( &self, remote_id: &RemoteProofId, - typed_id: &R::ProofId, + typed_id: &H::ProofId, ) -> Result<()> { // NOTE: We use `self.asm` here but this could be any `ZkVmRemoteHost` instance. // `get_proof` only requires a network client and proof ID — not the ELF or - // proving key. Since the orchestrator is generic over a single `R: ZkVmRemoteHost`, + // proving key. Since the orchestrator is generic over a single `H: ZkVmRemoteHost`, // both `asm` and `moho` share the same concrete type, so either works. let receipt = self .asm @@ -187,15 +213,15 @@ impl ProofOrchestrator { .map_err(|e| anyhow::anyhow!("failed to retrieve completed proof: {e}"))?; let proof_id = self - .db + .ctx .get_proof_id(remote_id) .await .context("failed to look up proof ID from remote ID")? .context("no mapping found for completed remote proof")?; - proof_store::store_completed_proof(&self.db, proof_id, receipt).await?; + proof_store::store_completed_proof(&self.ctx, proof_id, receipt).await?; - self.db + self.ctx .remove(remote_id) .await .context("failed to remove completed proof status")?; @@ -213,7 +239,7 @@ impl ProofOrchestrator { /// unit-tested with a fake submitter. async fn schedule_proofs(&mut self) -> Result<()> { let in_flight = self - .db + .ctx .get_all_in_progress() .await .context("failed to query in-progress proofs")? @@ -225,7 +251,7 @@ impl ProofOrchestrator { } let mut submitter = OrchestratorSubmitter { - db: &self.db, + ctx: &self.ctx, asm: &self.asm, moho: &self.moho, input_builder: &self.input_builder, @@ -256,7 +282,7 @@ enum SubmitOutcome { /// return non-`Send` futures to accommodate backends whose clients hold /// non-`Send` state across `.await`. Awaiting them here transitively makes /// `try_submit` non-`Send`. This is fine because the orchestrator is driven -/// from a `LocalSet` (see `bootstrap.rs`). +/// from a `LocalSet` (see the runner's `bootstrap`). #[async_trait(?Send)] trait ProofSubmitter { async fn try_submit(&mut self, proof_id: ProofId) -> Result; @@ -300,22 +326,22 @@ async fn schedule_with( } } -/// [`ProofSubmitter`] backed by the orchestrator's DB, hosts, and input +/// [`ProofSubmitter`] backed by the orchestrator's context, hosts, and input /// builder. Constructed inline by [`ProofOrchestrator::schedule_proofs`] for /// the duration of one scheduling cycle. -struct OrchestratorSubmitter<'a, R: ZkVmRemoteHost> { - db: &'a SledProofDb, - asm: &'a R, - moho: &'a R, +struct OrchestratorSubmitter<'a, C: ProverContext, H: ZkVmRemoteHost> { + ctx: &'a C, + asm: &'a H, + moho: &'a H, input_builder: &'a InputBuilder, } #[async_trait(?Send)] -impl ProofSubmitter for OrchestratorSubmitter<'_, R> { +impl ProofSubmitter for OrchestratorSubmitter<'_, C, H> { async fn try_submit(&mut self, proof_id: ProofId) -> Result { // Skip if already submitted. if self - .db + .ctx .get_remote_proof_id(proof_id) .await .context("failed to check remote proof mapping")? @@ -326,7 +352,7 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { } // Skip if proof already exists locally. - if proof_store::proof_exists(self.db, &proof_id).await? { + if proof_store::proof_exists(self.ctx, &proof_id).await? { debug!(?proof_id, "proof already exists, skipping"); return Ok(SubmitOutcome::Skipped); } @@ -334,13 +360,20 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { // Build input and submit to remote prover, dispatching by proof type. let typed_id = match &proof_id { ProofId::Asm(range) => { - let runtime_input = self.input_builder.build_asm_runtime_input(range).await?; + let runtime_input = self + .input_builder + .build_asm_runtime_input(self.ctx, range) + .await?; AsmStfProofProgram::start_proving(&runtime_input, self.asm) .await .map_err(|e| anyhow::anyhow!("failed to submit proof to remote prover: {e}"))? } ProofId::Moho(block) => { - let prerequisite = match self.input_builder.check_moho_prerequisite(*block).await { + let prerequisite = match self + .input_builder + .check_moho_prerequisite(self.ctx, *block) + .await + { Ok(prereq) => prereq, Err(e) => { debug!(?proof_id, %e, "moho prerequisite not ready, deferring"); @@ -349,7 +382,7 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { }; let input = self .input_builder - .build_moho_runtime_input(prerequisite, *block) + .build_moho_runtime_input(self.ctx, prerequisite, *block) .await?; MohoRecursiveProgram::start_proving(&input, self.moho) .await @@ -361,12 +394,12 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { info!(?proof_id, %typed_id, "proof submitted to remote prover"); // Store mapping and initial status. - self.db + self.ctx .put_remote_proof_id(proof_id, remote_id.clone()) .await .context("failed to store proof mapping")?; - self.db + self.ctx .put_status(&remote_id, RemoteProofStatus::Requested) .await .context("failed to store initial proof status")?; @@ -376,8 +409,8 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { } /// Converts a persisted [`RemoteProofId`] back into the host's typed proof ID. -fn to_typed_proof_id(remote_id: &RemoteProofId) -> Result { - R::ProofId::try_from(remote_id.0.clone()) +fn to_typed_proof_id(remote_id: &RemoteProofId) -> Result { + H::ProofId::try_from(remote_id.0.clone()) .map_err(|_| anyhow::anyhow!("failed to decode remote proof ID")) } @@ -385,7 +418,7 @@ fn to_typed_proof_id(remote_id: &RemoteProofId) -> Result Result { +use crate::ProverContext; + +/// Returns `true` if the proof already exists in the local proof store. +pub(crate) async fn proof_exists(ctx: &C, proof_id: &ProofId) -> Result { match proof_id { ProofId::Asm(range) => { - let exists = db + let exists = ctx .get_asm_proof(*range) .await .context("failed to check ASM proof")? @@ -21,7 +23,7 @@ pub(crate) async fn proof_exists(db: &SledProofDb, proof_id: &ProofId) -> Result Ok(exists) } ProofId::Moho(commitment) => { - let exists = db + let exists = ctx .get_moho_proof(*commitment) .await .context("failed to check Moho proof")? @@ -31,22 +33,22 @@ pub(crate) async fn proof_exists(db: &SledProofDb, proof_id: &ProofId) -> Result } } -/// Stores a completed proof receipt in the appropriate DB table. -pub(crate) async fn store_completed_proof( - db: &SledProofDb, +/// Stores a completed proof receipt in the appropriate proof-store table. +pub(crate) async fn store_completed_proof( + ctx: &C, proof_id: ProofId, receipt: ProofReceiptWithMetadata, ) -> Result<()> { match proof_id { ProofId::Asm(range) => { info!(?range, "storing completed ASM proof"); - db.store_asm_proof(range, AsmProof(receipt)) + ctx.store_asm_proof(range, AsmProof(receipt)) .await .context("failed to store ASM proof")?; } ProofId::Moho(commitment) => { info!(?commitment, "storing completed Moho proof"); - db.store_moho_proof(commitment, MohoProof(receipt)) + ctx.store_moho_proof(commitment, MohoProof(receipt)) .await .context("failed to store Moho proof")?; } diff --git a/bin/asm-runner/src/prover/queue.rs b/crates/extensions/prover/worker/src/queue.rs similarity index 97% rename from bin/asm-runner/src/prover/queue.rs rename to crates/extensions/prover/worker/src/queue.rs index 4a0b20d8..6507fb17 100644 --- a/bin/asm-runner/src/prover/queue.rs +++ b/crates/extensions/prover/worker/src/queue.rs @@ -7,7 +7,7 @@ use std::collections::BTreeSet; -use strata_asm_proof_types::ProofId; +use strata_asm_prover_types::ProofId; /// In-memory queue of proofs awaiting generation. /// @@ -50,7 +50,7 @@ impl PendingProofQueue { #[cfg(test)] mod tests { - use strata_asm_proof_types::L1Range; + use strata_asm_prover_types::L1Range; use strata_identifiers::{L1BlockCommitment, L1BlockId}; use super::*; diff --git a/crates/extensions/prover/worker/src/traits.rs b/crates/extensions/prover/worker/src/traits.rs new file mode 100644 index 00000000..ff0f65e0 --- /dev/null +++ b/crates/extensions/prover/worker/src/traits.rs @@ -0,0 +1,88 @@ +//! Traits the prover worker uses to interface with the underlying system. +//! +//! The orchestrator's dependencies split into concerns, each backed by a +//! distinct subsystem in production: +//! +//! - Proof persistence and remote-job tracking — reused verbatim from `strata-asm-prover-storage` +//! ([`ProofDb`], [`RemoteProofMappingDb`], [`RemoteProofStatusDb`], [`MohoStateDb`]). +//! - [`AnchorStateReader`] — reads persisted ASM anchor states. +//! - [`AuxDataReader`] — reads per-block auxiliary data captured during STF execution. +//! - [`L1BlockProvider`] — reads L1 blocks/headers from the Bitcoin source. +//! +//! [`ProverContext`] is the umbrella that combines all of them. It has a blanket +//! impl, so an implementor just implements the concern traits and gets +//! `ProverContext` for free — mirroring +//! [`WorkerContext`](https://docs.rs/strata-asm-worker) in the ASM worker. + +use std::error::Error as StdError; + +use bitcoin::{Block, block::Header}; +use strata_asm_common::{AnchorState, AuxData}; +use strata_asm_prover_storage::{MohoStateDb, ProofDb, RemoteProofMappingDb, RemoteProofStatusDb}; +use strata_identifiers::{L1BlockCommitment, L1BlockId}; + +/// Reads the persisted ASM anchor state for a given L1 block. +pub trait AnchorStateReader { + /// Fetches the [`AnchorState`] for the block at `blockid`. + /// + /// Errors if the state is missing — the orchestrator only requests proofs + /// for blocks the ASM worker has already processed. + fn get_anchor_state(&self, blockid: &L1BlockCommitment) -> anyhow::Result; +} + +/// Reads per-block auxiliary data captured during STF execution. +pub trait AuxDataReader { + /// Fetches the [`AuxData`] stored for the block at `blockid`. + fn get_aux_data(&self, blockid: &L1BlockCommitment) -> anyhow::Result; +} + +/// Fetches L1 blocks and headers from the backing Bitcoin source. +/// +/// Async because the backing client is async, and the orchestrator drives this +/// from a single-threaded runtime where blocking on the client would deadlock. +pub trait L1BlockProvider { + /// Fetches the full Bitcoin [`Block`] for `blockid`. + fn get_l1_block(&self, blockid: &L1BlockId) -> impl Future>; + + /// Fetches just the [`Header`] for `blockid`. + /// + /// Used to resolve a block's parent commitment (`prev_blockhash`) without + /// pulling the full transaction data. + fn get_l1_block_header( + &self, + blockid: &L1BlockId, + ) -> impl Future>; +} + +/// Umbrella context the [`ProofOrchestrator`](crate::ProofOrchestrator) runs +/// against. +/// +/// Combines proof persistence and remote-job tracking (reused from +/// `strata-asm-prover-storage`), Moho-state reads, and the chain/state reads the +/// [`InputBuilder`](crate::InputBuilder) needs. The blanket impl means any type +/// that implements all of the concern traits automatically implements +/// `ProverContext`, so implementors never name it directly. +/// +/// The associated `Error` bounds let the orchestrator surface storage failures +/// through `anyhow::Context`; every concrete backend already satisfies them. +pub trait ProverContext: + ProofDb + + RemoteProofMappingDb + + RemoteProofStatusDb + + MohoStateDb + + AnchorStateReader + + AuxDataReader + + L1BlockProvider +{ +} + +impl ProverContext for T where + T: ProofDb + + RemoteProofMappingDb + + RemoteProofStatusDb + + MohoStateDb + + AnchorStateReader + + AuxDataReader + + L1BlockProvider +{ +} diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 50152021..f6b7cd68 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -7,10 +7,10 @@ version = "0.1.0" workspace = true [dependencies] -strata-asm-proof-types.workspace = true strata-asm-proto-bridge-v1.workspace = true strata-asm-proto-bridge-v1-types.workspace = true strata-asm-proto-checkpoint-types.workspace = true +strata-asm-prover-types.workspace = true strata-asm-worker.workspace = true bitcoin.workspace = true diff --git a/crates/rpc/src/traits.rs b/crates/rpc/src/traits.rs index 86ee654a..8c9b7126 100644 --- a/crates/rpc/src/traits.rs +++ b/crates/rpc/src/traits.rs @@ -2,10 +2,10 @@ use bitcoin::BlockHash; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -use strata_asm_proof_types::{AsmProof, MohoProof}; use strata_asm_proto_bridge_v1::{AssignmentEntry, DepositEntry}; use strata_asm_proto_bridge_v1_types::SafeHarbour; use strata_asm_proto_checkpoint_types::CheckpointTip; +use strata_asm_prover_types::{AsmProof, MohoProof}; use strata_asm_worker::{AsmState, AsmWorkerStatus}; /// Control-plane ASM RPCs: liveness and overall worker status. diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 86b11a2b..175a99eb 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -19,8 +19,10 @@ strata-tasks.workspace = true anyhow.workspace = true bitcoin.workspace = true borsh.workspace = true +futures.workspace = true serde.workspace = true thiserror.workspace = true +tokio.workspace = true tracing.workspace = true [dev-dependencies] @@ -31,4 +33,3 @@ strata-asm-params = { workspace = true, features = ["arbitrary"] } bitcoind-async-client.workspace = true corepc-node.workspace = true -tokio.workspace = true diff --git a/crates/worker/src/builder.rs b/crates/worker/src/builder.rs index 55b2356a..43b24148 100644 --- a/crates/worker/src/builder.rs +++ b/crates/worker/src/builder.rs @@ -4,7 +4,7 @@ use strata_tasks::TaskExecutor; use crate::{ constants, errors::WorkerError, handle::AsmWorkerHandle, service::AsmWorkerService, - state::AsmWorkerServiceState, traits::WorkerContext, + state::AsmWorkerServiceState, subscription::AsmSubscribers, traits::WorkerContext, }; /// Builder for constructing and launching an ASM worker service. @@ -74,8 +74,13 @@ impl AsmWorkerBuilder { .ok_or(WorkerError::MissingDependency("params"))?; let spec = self.spec.ok_or(WorkerError::MissingDependency("spec"))?; + // Shared between the service state (which emits) and the handle (which + // hands out subscriptions), so a `subscribe_blocks()` on the handle + // registers into the same list the service fans out to. + let subscribers = AsmSubscribers::default(); + // Create the service state. - let service_state = AsmWorkerServiceState::new(context, spec, params)?; + let service_state = AsmWorkerServiceState::new(context, spec, params, subscribers.clone())?; // Create the service builder and get command handle. let mut service_builder = @@ -88,7 +93,7 @@ impl AsmWorkerBuilder { let service_monitor = service_builder.launch_sync(constants::SERVICE_NAME, executor)?; // Create and return the handle. - let handle = AsmWorkerHandle::new(command_handle, service_monitor); + let handle = AsmWorkerHandle::new(command_handle, service_monitor, subscribers); Ok(handle) } diff --git a/crates/worker/src/handle.rs b/crates/worker/src/handle.rs index 1786b000..5e65da15 100644 --- a/crates/worker/src/handle.rs +++ b/crates/worker/src/handle.rs @@ -3,27 +3,46 @@ use strata_identifiers::L1BlockCommitment; use strata_service::{CommandHandle, ServiceError, ServiceMonitor}; -use crate::{AsmWorkerStatus, WorkerError, message::AsmWorkerMessage}; +use crate::{ + AsmWorkerStatus, Subscription, WorkerError, message::AsmWorkerMessage, + subscription::AsmSubscribers, +}; /// Handle for interacting with the ASM worker service. #[derive(Debug)] pub struct AsmWorkerHandle { command_handle: CommandHandle, monitor: ServiceMonitor, + subscribers: AsmSubscribers, } impl AsmWorkerHandle { /// Create a new ASM worker handle from a service command handle. - pub fn new( + /// + /// `subscribers` is the same registry the service state emits into, so + /// handles created here can hand out [`Subscription`]s wired to the worker. + pub(crate) fn new( command_handle: CommandHandle, monitor: ServiceMonitor, + subscribers: AsmSubscribers, ) -> Self { Self { command_handle, monitor, + subscribers, } } + /// Subscribes to per-block notifications. + /// + /// Returns a [`Subscription`] that yields each [`L1BlockCommitment`] the + /// worker commits, starting from the next commit after this call. There is + /// no replay: register before the worker begins processing the blocks you + /// care about (the bootstrap order enforces this). + pub fn subscribe_blocks(&self) -> Subscription { + self.subscribers.subscribe() + } + /// Sends an L1 block to the ASM service and waits for processing to complete. pub fn submit_block(&self, block: L1BlockCommitment) -> anyhow::Result<()> { self.command_handle diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index a597c4c6..e24cea39 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -12,6 +12,7 @@ mod handle; mod message; mod service; mod state; +mod subscription; mod traits; pub use asm_state::AsmState; @@ -22,4 +23,5 @@ pub use handle::AsmWorkerHandle; pub use message::{AsmWorkerMessage, SubprotocolMessage}; pub use service::{AsmWorkerService, AsmWorkerStatus}; pub use state::AsmWorkerServiceState; +pub use subscription::Subscription; pub use traits::{AnchorStateStore, AuxDataStore, L1DataProvider, ManifestMmrStore, WorkerContext}; diff --git a/crates/worker/src/service.rs b/crates/worker/src/service.rs index 06767010..a98ad32b 100644 --- a/crates/worker/src/service.rs +++ b/crates/worker/src/service.rs @@ -152,6 +152,11 @@ where state.context.store_anchor_state(block_id, &new_state)?; state.update_anchor_state(new_state, *block_id); + // Notify subscribers only after the anchor is durably committed, so any + // consumer that reads `AsmStateDb` for this commitment is guaranteed a + // hit. Non-blocking: an unbounded fan-out, never awaited. + state.subscribers.emit(*block_id); + info!(%block_id, %height, "ASM transition complete, manifest and state stored"); } // transition_span drops here diff --git a/crates/worker/src/state.rs b/crates/worker/src/state.rs index 4c98871a..a8227705 100644 --- a/crates/worker/src/state.rs +++ b/crates/worker/src/state.rs @@ -8,6 +8,7 @@ use tracing::field::Empty; use crate::{ AsmState, WorkerContext, WorkerError, WorkerResult, aux_resolver::AuxDataResolver, constants, + subscription::AsmSubscribers, }; /// Service state for the ASM worker. @@ -33,6 +34,11 @@ pub struct AsmWorkerServiceState { /// sentinels for heights `0..=genesis_height`, so this is the height just /// below the first real manifest. pub(crate) genesis_height: u64, + + /// Registry of ASM-commit subscribers. After each successful anchor commit + /// the service fans the new commitment out to these; see + /// [`crate::AsmWorkerHandle::subscribe_blocks`]. + pub(crate) subscribers: AsmSubscribers, } impl AsmWorkerServiceState @@ -42,7 +48,15 @@ where S::Params: Send + Sync + 'static, { /// Creates a new service state, loading the latest anchor or creating genesis. - pub fn new(context: W, spec: S, params: S::Params) -> WorkerResult { + /// + /// Construction goes through [`crate::AsmWorkerBuilder`], which owns the + /// shared [`AsmSubscribers`] registry — hence `pub(crate)`. + pub(crate) fn new( + context: W, + spec: S, + params: S::Params, + subscribers: AsmSubscribers, + ) -> WorkerResult { let genesis_height = spec.genesis_l1_height(¶ms); // Align the manifest MMR with L1 heights before processing any block: @@ -70,6 +84,7 @@ where anchor, blkid, genesis_height, + subscribers, }) } @@ -195,8 +210,13 @@ mod tests { // 3. Set worker context and initialize service state let context = MockWorkerContext::new(); - let service_state = AsmWorkerServiceState::new(context.clone(), StrataAsmSpec, asm_params) - .expect("Failed to create service state"); + let service_state = AsmWorkerServiceState::new( + context.clone(), + StrataAsmSpec, + asm_params, + AsmSubscribers::default(), + ) + .expect("Failed to create service state"); println!("Service initialized with genesis at height 101"); diff --git a/crates/worker/src/subscription.rs b/crates/worker/src/subscription.rs new file mode 100644 index 00000000..048e360e --- /dev/null +++ b/crates/worker/src/subscription.rs @@ -0,0 +1,187 @@ +//! Per-block notification stream for the ASM worker. +//! +//! After every successful anchor-state commit, the worker fans the new +//! [`L1BlockCommitment`] out to all live subscribers over unbounded channels. +//! Consumers run on their own tasks and react to whatever block sequence the +//! worker commits — including any future reorg re-emission — without sitting in +//! the worker's hot path. +//! +//! `send` on an unbounded channel is one allocation plus an atomic, so the +//! worker never awaits a consumer: it fans out and returns to the next block. +//! The trade-off is that a stuck consumer is a memory leak; [`Subscription::backlog`] +//! exposes the queue depth so consumers (or alerting) can notice. + +use std::{ + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use futures::Stream; +use strata_identifiers::L1BlockCommitment; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender, error::TryRecvError}; + +/// A live stream of items emitted by the ASM worker, one per committed block. +/// +/// Implements [`Stream`] and also exposes [`Subscription::recv`] for use in a +/// `select!` loop. There is no replay buffer: a subscription only sees events +/// emitted after it was created (see [`AsmWorkerHandle::subscribe_blocks`]). +/// +/// [`AsmWorkerHandle::subscribe_blocks`]: crate::AsmWorkerHandle::subscribe_blocks +#[derive(Debug)] +pub struct Subscription { + rx: UnboundedReceiver, +} + +impl Subscription { + fn new(rx: UnboundedReceiver) -> Self { + Self { rx } + } + + /// Number of emitted items queued but not yet consumed. + /// + /// A persistently growing backlog means the consumer is falling behind the + /// worker; the channel is unbounded, so this is the only back-pressure signal. + pub fn backlog(&self) -> usize { + self.rx.len() + } + + /// Receives the next emitted item, or `None` once the worker has shut down + /// (every sender dropped). + pub async fn recv(&mut self) -> Option { + self.rx.recv().await + } + + /// Pulls the next already-queued item without awaiting. + /// + /// Returns [`TryRecvError::Empty`] when nothing is queued right now, and + /// [`TryRecvError::Disconnected`] once the worker has shut down and the + /// backlog is drained. Lets a periodically-ticking consumer drain everything + /// buffered at the top of each tick without parking on the channel. + pub fn try_recv(&mut self) -> Result { + self.rx.try_recv() + } +} + +impl Stream for Subscription { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.rx.poll_recv(cx) + } +} + +/// Producer-side registry of ASM-commit subscribers. +/// +/// Subscribers are notified only *after* the ASM worker has successfully +/// processed and durably committed a block — not when a raw L1 block arrives. +/// +/// Cloned so the service state (which emits) and the worker handle (which +/// registers new subscribers) share one list. Each entry is the sending half of +/// a [`Subscription`]'s channel; dead receivers are pruned lazily on the next +/// [`emit`](Self::emit). Mirrors `strata-bridge`'s `btc-tracker` subscriber pattern. +#[derive(Clone, Default, Debug)] +pub(crate) struct AsmSubscribers { + inner: Arc>>>, +} + +impl AsmSubscribers { + /// Registers a new subscriber and returns its [`Subscription`]. + pub(crate) fn subscribe(&self) -> Subscription { + let (tx, rx) = mpsc::unbounded_channel(); + self.inner + .lock() + .expect("subscribers lock poisoned") + .push(tx); + Subscription::new(rx) + } + + /// Fans a commitment out to every live subscriber, pruning any whose + /// receiver has been dropped. Never blocks: each `send` is an unbounded + /// enqueue. + pub(crate) fn emit(&self, block: L1BlockCommitment) { + self.inner + .lock() + .expect("subscribers lock poisoned") + .retain(|tx| tx.send(block).is_ok()); + } +} + +#[cfg(test)] +mod tests { + use strata_identifiers::L1BlockId; + + use super::*; + + fn commitment(height: u32) -> L1BlockCommitment { + L1BlockCommitment::new(height, L1BlockId::default()) + } + + #[tokio::test] + async fn emit_delivers_to_subscriber_in_order() { + let subs = AsmSubscribers::default(); + let mut sub = subs.subscribe(); + + subs.emit(commitment(1)); + subs.emit(commitment(2)); + + assert_eq!(sub.backlog(), 2); + assert_eq!(sub.recv().await, Some(commitment(1))); + assert_eq!(sub.recv().await, Some(commitment(2))); + assert_eq!(sub.backlog(), 0); + } + + #[tokio::test] + async fn fans_out_to_multiple_subscribers() { + let subs = AsmSubscribers::default(); + let mut a = subs.subscribe(); + let mut b = subs.subscribe(); + + subs.emit(commitment(7)); + + assert_eq!(a.recv().await, Some(commitment(7))); + assert_eq!(b.recv().await, Some(commitment(7))); + } + + #[tokio::test] + async fn dropped_subscriber_is_pruned_on_next_emit() { + let subs = AsmSubscribers::default(); + let live = subs.subscribe(); + let dead = subs.subscribe(); + + assert_eq!(subs.inner.lock().unwrap().len(), 2); + + drop(dead); + // The emit that follows the drop prunes the dead slot. + subs.emit(commitment(1)); + + assert_eq!(subs.inner.lock().unwrap().len(), 1); + drop(live); + } + + #[tokio::test] + async fn recv_returns_none_once_all_senders_dropped() { + let subs = AsmSubscribers::default(); + let mut sub = subs.subscribe(); + drop(subs); + assert_eq!(sub.recv().await, None); + } + + #[tokio::test] + async fn try_recv_drains_then_reports_empty_and_disconnected() { + let subs = AsmSubscribers::default(); + let mut sub = subs.subscribe(); + + subs.emit(commitment(1)); + subs.emit(commitment(2)); + + assert_eq!(sub.try_recv(), Ok(commitment(1))); + assert_eq!(sub.try_recv(), Ok(commitment(2))); + // Backlog drained but the producer is still live. + assert_eq!(sub.try_recv(), Err(TryRecvError::Empty)); + + // Once every sender is gone, a drained subscription reports disconnect. + drop(subs); + assert_eq!(sub.try_recv(), Err(TryRecvError::Disconnected)); + } +}