From 62febabf9baaf01c2d49cb1afd037173157c3dad Mon Sep 17 00:00:00 2001 From: Prajwol Gyawali Date: Tue, 2 Jun 2026 15:32:49 +0545 Subject: [PATCH 1/6] feat(worker): add per-block subscription stream Consumers of committed blocks (Moho-state derivation, and later proof requests) were wired into the ASM worker's hot path. Expose AsmWorkerHandle::subscribe_blocks(): after each anchor write the service fans the new L1BlockCommitment out to subscribers over unbounded channels and prunes dropped receivers. send on an unbounded channel is non-blocking, so the worker never awaits a consumer and stays off the critical path. Subscribers fire only after a successful ASM commit, not on raw block arrival -- hence AsmSubscribers. --- Cargo.lock | 1 + crates/worker/Cargo.toml | 3 +- crates/worker/src/builder.rs | 11 ++- crates/worker/src/handle.rs | 23 ++++- crates/worker/src/lib.rs | 2 + crates/worker/src/service.rs | 5 + crates/worker/src/state.rs | 26 ++++- crates/worker/src/subscription.rs | 159 ++++++++++++++++++++++++++++++ 8 files changed, 221 insertions(+), 9 deletions(-) create mode 100644 crates/worker/src/subscription.rs diff --git a/Cargo.lock b/Cargo.lock index 11a5b991..dd4522fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8155,6 +8155,7 @@ dependencies = [ "bitcoind-async-client", "borsh", "corepc-node", + "futures", "serde", "strata-asm-common", "strata-asm-params", diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 86b11a2b..175a99eb 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -19,8 +19,10 @@ strata-tasks.workspace = true anyhow.workspace = true bitcoin.workspace = true borsh.workspace = true +futures.workspace = true serde.workspace = true thiserror.workspace = true +tokio.workspace = true tracing.workspace = true [dev-dependencies] @@ -31,4 +33,3 @@ strata-asm-params = { workspace = true, features = ["arbitrary"] } bitcoind-async-client.workspace = true corepc-node.workspace = true -tokio.workspace = true diff --git a/crates/worker/src/builder.rs b/crates/worker/src/builder.rs index 55b2356a..43b24148 100644 --- a/crates/worker/src/builder.rs +++ b/crates/worker/src/builder.rs @@ -4,7 +4,7 @@ use strata_tasks::TaskExecutor; use crate::{ constants, errors::WorkerError, handle::AsmWorkerHandle, service::AsmWorkerService, - state::AsmWorkerServiceState, traits::WorkerContext, + state::AsmWorkerServiceState, subscription::AsmSubscribers, traits::WorkerContext, }; /// Builder for constructing and launching an ASM worker service. @@ -74,8 +74,13 @@ impl AsmWorkerBuilder { .ok_or(WorkerError::MissingDependency("params"))?; let spec = self.spec.ok_or(WorkerError::MissingDependency("spec"))?; + // Shared between the service state (which emits) and the handle (which + // hands out subscriptions), so a `subscribe_blocks()` on the handle + // registers into the same list the service fans out to. + let subscribers = AsmSubscribers::default(); + // Create the service state. - let service_state = AsmWorkerServiceState::new(context, spec, params)?; + let service_state = AsmWorkerServiceState::new(context, spec, params, subscribers.clone())?; // Create the service builder and get command handle. let mut service_builder = @@ -88,7 +93,7 @@ impl AsmWorkerBuilder { let service_monitor = service_builder.launch_sync(constants::SERVICE_NAME, executor)?; // Create and return the handle. - let handle = AsmWorkerHandle::new(command_handle, service_monitor); + let handle = AsmWorkerHandle::new(command_handle, service_monitor, subscribers); Ok(handle) } diff --git a/crates/worker/src/handle.rs b/crates/worker/src/handle.rs index 1786b000..5e65da15 100644 --- a/crates/worker/src/handle.rs +++ b/crates/worker/src/handle.rs @@ -3,27 +3,46 @@ use strata_identifiers::L1BlockCommitment; use strata_service::{CommandHandle, ServiceError, ServiceMonitor}; -use crate::{AsmWorkerStatus, WorkerError, message::AsmWorkerMessage}; +use crate::{ + AsmWorkerStatus, Subscription, WorkerError, message::AsmWorkerMessage, + subscription::AsmSubscribers, +}; /// Handle for interacting with the ASM worker service. #[derive(Debug)] pub struct AsmWorkerHandle { command_handle: CommandHandle, monitor: ServiceMonitor, + subscribers: AsmSubscribers, } impl AsmWorkerHandle { /// Create a new ASM worker handle from a service command handle. - pub fn new( + /// + /// `subscribers` is the same registry the service state emits into, so + /// handles created here can hand out [`Subscription`]s wired to the worker. + pub(crate) fn new( command_handle: CommandHandle, monitor: ServiceMonitor, + subscribers: AsmSubscribers, ) -> Self { Self { command_handle, monitor, + subscribers, } } + /// Subscribes to per-block notifications. + /// + /// Returns a [`Subscription`] that yields each [`L1BlockCommitment`] the + /// worker commits, starting from the next commit after this call. There is + /// no replay: register before the worker begins processing the blocks you + /// care about (the bootstrap order enforces this). + pub fn subscribe_blocks(&self) -> Subscription { + self.subscribers.subscribe() + } + /// Sends an L1 block to the ASM service and waits for processing to complete. pub fn submit_block(&self, block: L1BlockCommitment) -> anyhow::Result<()> { self.command_handle diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index a597c4c6..e24cea39 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -12,6 +12,7 @@ mod handle; mod message; mod service; mod state; +mod subscription; mod traits; pub use asm_state::AsmState; @@ -22,4 +23,5 @@ pub use handle::AsmWorkerHandle; pub use message::{AsmWorkerMessage, SubprotocolMessage}; pub use service::{AsmWorkerService, AsmWorkerStatus}; pub use state::AsmWorkerServiceState; +pub use subscription::Subscription; pub use traits::{AnchorStateStore, AuxDataStore, L1DataProvider, ManifestMmrStore, WorkerContext}; diff --git a/crates/worker/src/service.rs b/crates/worker/src/service.rs index 06767010..a98ad32b 100644 --- a/crates/worker/src/service.rs +++ b/crates/worker/src/service.rs @@ -152,6 +152,11 @@ where state.context.store_anchor_state(block_id, &new_state)?; state.update_anchor_state(new_state, *block_id); + // Notify subscribers only after the anchor is durably committed, so any + // consumer that reads `AsmStateDb` for this commitment is guaranteed a + // hit. Non-blocking: an unbounded fan-out, never awaited. + state.subscribers.emit(*block_id); + info!(%block_id, %height, "ASM transition complete, manifest and state stored"); } // transition_span drops here diff --git a/crates/worker/src/state.rs b/crates/worker/src/state.rs index 4c98871a..a8227705 100644 --- a/crates/worker/src/state.rs +++ b/crates/worker/src/state.rs @@ -8,6 +8,7 @@ use tracing::field::Empty; use crate::{ AsmState, WorkerContext, WorkerError, WorkerResult, aux_resolver::AuxDataResolver, constants, + subscription::AsmSubscribers, }; /// Service state for the ASM worker. @@ -33,6 +34,11 @@ pub struct AsmWorkerServiceState { /// sentinels for heights `0..=genesis_height`, so this is the height just /// below the first real manifest. pub(crate) genesis_height: u64, + + /// Registry of ASM-commit subscribers. After each successful anchor commit + /// the service fans the new commitment out to these; see + /// [`crate::AsmWorkerHandle::subscribe_blocks`]. + pub(crate) subscribers: AsmSubscribers, } impl AsmWorkerServiceState @@ -42,7 +48,15 @@ where S::Params: Send + Sync + 'static, { /// Creates a new service state, loading the latest anchor or creating genesis. - pub fn new(context: W, spec: S, params: S::Params) -> WorkerResult { + /// + /// Construction goes through [`crate::AsmWorkerBuilder`], which owns the + /// shared [`AsmSubscribers`] registry — hence `pub(crate)`. + pub(crate) fn new( + context: W, + spec: S, + params: S::Params, + subscribers: AsmSubscribers, + ) -> WorkerResult { let genesis_height = spec.genesis_l1_height(¶ms); // Align the manifest MMR with L1 heights before processing any block: @@ -70,6 +84,7 @@ where anchor, blkid, genesis_height, + subscribers, }) } @@ -195,8 +210,13 @@ mod tests { // 3. Set worker context and initialize service state let context = MockWorkerContext::new(); - let service_state = AsmWorkerServiceState::new(context.clone(), StrataAsmSpec, asm_params) - .expect("Failed to create service state"); + let service_state = AsmWorkerServiceState::new( + context.clone(), + StrataAsmSpec, + asm_params, + AsmSubscribers::default(), + ) + .expect("Failed to create service state"); println!("Service initialized with genesis at height 101"); diff --git a/crates/worker/src/subscription.rs b/crates/worker/src/subscription.rs new file mode 100644 index 00000000..8353a7cd --- /dev/null +++ b/crates/worker/src/subscription.rs @@ -0,0 +1,159 @@ +//! Per-block notification stream for the ASM worker. +//! +//! After every successful anchor-state commit, the worker fans the new +//! [`L1BlockCommitment`] out to all live subscribers over unbounded channels. +//! Consumers run on their own tasks and react to whatever block sequence the +//! worker commits — including any future reorg re-emission — without sitting in +//! the worker's hot path. +//! +//! `send` on an unbounded channel is one allocation plus an atomic, so the +//! worker never awaits a consumer: it fans out and returns to the next block. +//! The trade-off is that a stuck consumer is a memory leak; [`Subscription::backlog`] +//! exposes the queue depth so consumers (or alerting) can notice. + +use std::{ + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use futures::Stream; +use strata_identifiers::L1BlockCommitment; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; + +/// A live stream of items emitted by the ASM worker, one per committed block. +/// +/// Implements [`Stream`] and also exposes [`Subscription::recv`] for use in a +/// `select!` loop. There is no replay buffer: a subscription only sees events +/// emitted after it was created (see [`AsmWorkerHandle::subscribe_blocks`]). +/// +/// [`AsmWorkerHandle::subscribe_blocks`]: crate::AsmWorkerHandle::subscribe_blocks +#[derive(Debug)] +pub struct Subscription { + rx: UnboundedReceiver, +} + +impl Subscription { + fn new(rx: UnboundedReceiver) -> Self { + Self { rx } + } + + /// Number of emitted items queued but not yet consumed. + /// + /// A persistently growing backlog means the consumer is falling behind the + /// worker; the channel is unbounded, so this is the only back-pressure signal. + pub fn backlog(&self) -> usize { + self.rx.len() + } + + /// Receives the next emitted item, or `None` once the worker has shut down + /// (every sender dropped). + pub async fn recv(&mut self) -> Option { + self.rx.recv().await + } +} + +impl Stream for Subscription { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.rx.poll_recv(cx) + } +} + +/// Producer-side registry of ASM-commit subscribers. +/// +/// Subscribers are notified only *after* the ASM worker has successfully +/// processed and durably committed a block — not when a raw L1 block arrives. +/// +/// Cloned so the service state (which emits) and the worker handle (which +/// registers new subscribers) share one list. Each entry is the sending half of +/// a [`Subscription`]'s channel; dead receivers are pruned lazily on the next +/// [`emit`](Self::emit). Mirrors `strata-bridge`'s `btc-tracker` subscriber pattern. +#[derive(Clone, Default, Debug)] +pub(crate) struct AsmSubscribers { + inner: Arc>>>, +} + +impl AsmSubscribers { + /// Registers a new subscriber and returns its [`Subscription`]. + pub(crate) fn subscribe(&self) -> Subscription { + let (tx, rx) = mpsc::unbounded_channel(); + self.inner + .lock() + .expect("subscribers lock poisoned") + .push(tx); + Subscription::new(rx) + } + + /// Fans a commitment out to every live subscriber, pruning any whose + /// receiver has been dropped. Never blocks: each `send` is an unbounded + /// enqueue. + pub(crate) fn emit(&self, block: L1BlockCommitment) { + self.inner + .lock() + .expect("subscribers lock poisoned") + .retain(|tx| tx.send(block).is_ok()); + } +} + +#[cfg(test)] +mod tests { + use strata_identifiers::L1BlockId; + + use super::*; + + fn commitment(height: u32) -> L1BlockCommitment { + L1BlockCommitment::new(height, L1BlockId::default()) + } + + #[tokio::test] + async fn emit_delivers_to_subscriber_in_order() { + let subs = AsmSubscribers::default(); + let mut sub = subs.subscribe(); + + subs.emit(commitment(1)); + subs.emit(commitment(2)); + + assert_eq!(sub.backlog(), 2); + assert_eq!(sub.recv().await, Some(commitment(1))); + assert_eq!(sub.recv().await, Some(commitment(2))); + assert_eq!(sub.backlog(), 0); + } + + #[tokio::test] + async fn fans_out_to_multiple_subscribers() { + let subs = AsmSubscribers::default(); + let mut a = subs.subscribe(); + let mut b = subs.subscribe(); + + subs.emit(commitment(7)); + + assert_eq!(a.recv().await, Some(commitment(7))); + assert_eq!(b.recv().await, Some(commitment(7))); + } + + #[tokio::test] + async fn dropped_subscriber_is_pruned_on_next_emit() { + let subs = AsmSubscribers::default(); + let live = subs.subscribe(); + let dead = subs.subscribe(); + + assert_eq!(subs.inner.lock().unwrap().len(), 2); + + drop(dead); + // The emit that follows the drop prunes the dead slot. + subs.emit(commitment(1)); + + assert_eq!(subs.inner.lock().unwrap().len(), 1); + drop(live); + } + + #[tokio::test] + async fn recv_returns_none_once_all_senders_dropped() { + let subs = AsmSubscribers::default(); + let mut sub = subs.subscribe(); + drop(subs); + assert_eq!(sub.recv().await, None); + } +} From 751f6fa8ef38e1235edbfcef964f15213b7f7c9e Mon Sep 17 00:00:00 2001 From: Prajwol Gyawali Date: Sun, 7 Jun 2026 14:41:43 +0545 Subject: [PATCH 2/6] refactor(prover): relocate proof-types crate under extensions/prover Begin grouping the prover stack under a single crates/extensions/prover tree. Move the proof types crate there and rename it strata-asm-prover-types so the naming matches the forthcoming prover-worker and prover-storage crates. Pure relocation and rename; no logic changes. --- Cargo.lock | 26 +++++++++---------- Cargo.toml | 4 +-- bin/asm-runner/Cargo.toml | 2 +- bin/asm-runner/src/block_watcher.rs | 2 +- bin/asm-runner/src/prover/input.rs | 2 +- bin/asm-runner/src/prover/orchestrator.rs | 4 +-- bin/asm-runner/src/prover/proof_store.rs | 2 +- bin/asm-runner/src/prover/queue.rs | 4 +-- bin/asm-runner/src/rpc_server.rs | 2 +- .../prover}/types/Cargo.toml | 2 +- .../prover}/types/src/lib.rs | 0 crates/proof/db/Cargo.toml | 2 +- crates/proof/db/src/lib.rs | 4 +-- crates/proof/db/src/proof_db.rs | 2 +- crates/proof/db/src/remote_mapping.rs | 2 +- crates/proof/db/src/remote_status.rs | 2 +- crates/proof/db/src/sled/mod.rs | 4 +-- crates/proof/db/src/sled/proof_db.rs | 2 +- crates/proof/db/src/sled/remote_mapping.rs | 4 +-- crates/proof/db/src/sled/remote_status.rs | 2 +- crates/rpc/Cargo.toml | 2 +- crates/rpc/src/traits.rs | 2 +- 22 files changed, 39 insertions(+), 39 deletions(-) rename crates/{proof => extensions/prover}/types/Cargo.toml (85%) rename crates/{proof => extensions/prover}/types/src/lib.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index dd4522fd..3ff6b8a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7736,7 +7736,7 @@ dependencies = [ "proptest", "sled", "ssz", - "strata-asm-proof-types", + "strata-asm-prover-types", "strata-identifiers", "strata-predicate", "tempfile", @@ -7771,16 +7771,6 @@ dependencies = [ "zkaleido-native-adapter", ] -[[package]] -name = "strata-asm-proof-types" -version = "0.1.0" -dependencies = [ - "borsh", - "serde", - "strata-identifiers", - "zkaleido", -] - [[package]] name = "strata-asm-proto-admin" version = "0.1.0" @@ -8025,16 +8015,26 @@ dependencies = [ "zkaleido-sp1-host", ] +[[package]] +name = "strata-asm-prover-types" +version = "0.1.0" +dependencies = [ + "borsh", + "serde", + "strata-identifiers", + "zkaleido", +] + [[package]] name = "strata-asm-rpc" version = "0.1.0" dependencies = [ "bitcoin", "jsonrpsee", - "strata-asm-proof-types", "strata-asm-proto-bridge-v1", "strata-asm-proto-bridge-v1-types", "strata-asm-proto-checkpoint-types", + "strata-asm-prover-types", "strata-asm-worker", ] @@ -8069,13 +8069,13 @@ dependencies = [ "strata-asm-params", "strata-asm-proof-db", "strata-asm-proof-impl", - "strata-asm-proof-types", "strata-asm-proto-bridge-v1", "strata-asm-proto-bridge-v1-txs", "strata-asm-proto-bridge-v1-types", "strata-asm-proto-checkpoint", "strata-asm-proto-checkpoint-txs", "strata-asm-proto-checkpoint-types", + "strata-asm-prover-types", "strata-asm-rpc", "strata-asm-spec", "strata-asm-worker", diff --git a/Cargo.toml b/Cargo.toml index fbbe00c0..94aa4409 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ "crates/params", "crates/proof/db", "crates/proof/statements", - "crates/proof/types", + "crates/extensions/prover/types", "crates/rpc", "crates/spec", "crates/spec-debug", @@ -64,7 +64,6 @@ strata-asm-manifest-types = { path = "crates/manifest-types" } strata-asm-params = { path = "crates/params" } strata-asm-proof-db = { path = "crates/proof/db" } strata-asm-proof-impl = { path = "crates/proof/statements" } -strata-asm-proof-types = { path = "crates/proof/types" } strata-asm-proto-admin = { path = "crates/subprotocols/admin/subprotocol" } strata-asm-proto-admin-txs = { path = "crates/subprotocols/admin/txs" } strata-asm-proto-bridge-v1 = { path = "crates/subprotocols/bridge-v1/subprotocol" } @@ -77,6 +76,7 @@ strata-asm-proto-checkpoint-txs = { path = "crates/subprotocols/checkpoint/txs" strata-asm-proto-checkpoint-types = { path = "crates/subprotocols/checkpoint/types" } strata-asm-proto-debug-v1 = { path = "crates/subprotocols/debug-v1/subprotocol" } strata-asm-proto-txs-test-utils = { path = "crates/subprotocols/txs-test-utils" } +strata-asm-prover-types = { path = "crates/extensions/prover/types" } strata-asm-rpc = { path = "crates/rpc" } strata-asm-spec = { path = "crates/spec" } strata-asm-spec-debug = { path = "crates/spec-debug" } diff --git a/bin/asm-runner/Cargo.toml b/bin/asm-runner/Cargo.toml index dda2c6d0..da9e3851 100644 --- a/bin/asm-runner/Cargo.toml +++ b/bin/asm-runner/Cargo.toml @@ -19,7 +19,7 @@ strata-asm-logs.workspace = true strata-asm-params.workspace = true strata-asm-proof-db.workspace = true strata-asm-proof-impl.workspace = true -strata-asm-proof-types.workspace = true +strata-asm-prover-types.workspace = true strata-asm-proto-bridge-v1.workspace = true strata-asm-proto-bridge-v1-txs.workspace = true strata-asm-proto-bridge-v1-types.workspace = true diff --git a/bin/asm-runner/src/block_watcher.rs b/bin/asm-runner/src/block_watcher.rs index 5d9d9c78..8d716468 100644 --- a/bin/asm-runner/src/block_watcher.rs +++ b/bin/asm-runner/src/block_watcher.rs @@ -15,7 +15,7 @@ use bitcoin::Block; use bitcoincore_zmq::{Message, SocketMessage, subscribe_async_wait_handshake}; use bitcoind_async_client::{Client, traits::Reader}; use futures::StreamExt; -use strata_asm_proof_types::{L1Range, ProofId}; +use strata_asm_prover_types::{L1Range, ProofId}; use strata_asm_worker::AsmWorkerHandle; use strata_btc_types::BlockHashExt; use strata_identifiers::L1BlockCommitment; diff --git a/bin/asm-runner/src/prover/input.rs b/bin/asm-runner/src/prover/input.rs index 7d6e289c..4393463c 100644 --- a/bin/asm-runner/src/prover/input.rs +++ b/bin/asm-runner/src/prover/input.rs @@ -13,7 +13,7 @@ use moho_types::{MohoState, RecursiveMohoProof, StepMohoAttestation, StepMohoPro use ssz::{Decode, Encode}; use strata_asm_proof_db::{MohoStateDb, ProofDb, SledMohoStateDb, SledProofDb}; use strata_asm_proof_impl::moho_program::input::AsmStepInput; -use strata_asm_proof_types::L1Range; +use strata_asm_prover_types::L1Range; use strata_btc_types::{BlockHashExt, L1BlockIdBitcoinExt}; use strata_btc_verification::{self, TxidInclusionProof}; use strata_identifiers::L1BlockCommitment; diff --git a/bin/asm-runner/src/prover/orchestrator.rs b/bin/asm-runner/src/prover/orchestrator.rs index d6fbcc20..71136dda 100644 --- a/bin/asm-runner/src/prover/orchestrator.rs +++ b/bin/asm-runner/src/prover/orchestrator.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use moho_recursive_proof::MohoRecursiveProgram; use strata_asm_proof_db::{RemoteProofMappingDb, RemoteProofStatusDb, SledProofDb}; use strata_asm_proof_impl::program::AsmStfProofProgram; -use strata_asm_proof_types::{ProofId, RemoteProofId}; +use strata_asm_prover_types::{ProofId, RemoteProofId}; use strata_tasks::ShutdownGuard; use tokio::{sync::mpsc, time}; use tracing::{debug, error, info, warn}; @@ -385,7 +385,7 @@ fn to_typed_proof_id(remote_id: &RemoteProofId) -> Result L1BlockCommitment { #[cfg(test)] pub(crate) mod test_util { use proptest::{collection::vec, prelude::*}; - use strata_asm_proof_types::{AsmProof, L1Range, MohoProof}; + use strata_asm_prover_types::{AsmProof, L1Range, MohoProof}; use strata_identifiers::{Buf32, L1BlockCommitment, L1BlockId}; use zkaleido::{ ProgramId, Proof, ProofMetadata, ProofReceipt, ProofReceiptWithMetadata, ProofType, diff --git a/crates/proof/db/src/sled/proof_db.rs b/crates/proof/db/src/sled/proof_db.rs index ffbc3c03..c7012850 100644 --- a/crates/proof/db/src/sled/proof_db.rs +++ b/crates/proof/db/src/sled/proof_db.rs @@ -1,7 +1,7 @@ //! [`ProofDb`] implementation for [`SledProofDb`]. use borsh::BorshDeserialize; -use strata_asm_proof_types::{AsmProof, L1Range, MohoProof}; +use strata_asm_prover_types::{AsmProof, L1Range, MohoProof}; use strata_identifiers::L1BlockCommitment; use super::{SledProofDb, decode_moho_key, encode_asm_key, encode_moho_key}; diff --git a/crates/proof/db/src/sled/remote_mapping.rs b/crates/proof/db/src/sled/remote_mapping.rs index bd302549..2a9b7c37 100644 --- a/crates/proof/db/src/sled/remote_mapping.rs +++ b/crates/proof/db/src/sled/remote_mapping.rs @@ -3,7 +3,7 @@ use std::{error::Error, fmt}; use borsh::BorshDeserialize; -use strata_asm_proof_types::{ProofId, RemoteProofId}; +use strata_asm_prover_types::{ProofId, RemoteProofId}; use super::SledProofDb; use crate::RemoteProofMappingDb; @@ -112,7 +112,7 @@ mod tests { use std::collections::HashSet; use proptest::{collection::vec, prelude::*}; - use strata_asm_proof_types::ProofId; + use strata_asm_prover_types::ProofId; use tokio::runtime::Runtime; use super::*; diff --git a/crates/proof/db/src/sled/remote_status.rs b/crates/proof/db/src/sled/remote_status.rs index 2bae51a6..8b14ea66 100644 --- a/crates/proof/db/src/sled/remote_status.rs +++ b/crates/proof/db/src/sled/remote_status.rs @@ -3,7 +3,7 @@ use std::{error::Error, fmt}; use borsh::BorshDeserialize; -use strata_asm_proof_types::RemoteProofId; +use strata_asm_prover_types::RemoteProofId; use zkaleido::RemoteProofStatus; use super::SledProofDb; diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 50152021..55498823 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -7,7 +7,7 @@ version = "0.1.0" workspace = true [dependencies] -strata-asm-proof-types.workspace = true +strata-asm-prover-types.workspace = true strata-asm-proto-bridge-v1.workspace = true strata-asm-proto-bridge-v1-types.workspace = true strata-asm-proto-checkpoint-types.workspace = true diff --git a/crates/rpc/src/traits.rs b/crates/rpc/src/traits.rs index 86ee654a..8c9b7126 100644 --- a/crates/rpc/src/traits.rs +++ b/crates/rpc/src/traits.rs @@ -2,10 +2,10 @@ use bitcoin::BlockHash; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -use strata_asm_proof_types::{AsmProof, MohoProof}; use strata_asm_proto_bridge_v1::{AssignmentEntry, DepositEntry}; use strata_asm_proto_bridge_v1_types::SafeHarbour; use strata_asm_proto_checkpoint_types::CheckpointTip; +use strata_asm_prover_types::{AsmProof, MohoProof}; use strata_asm_worker::{AsmState, AsmWorkerStatus}; /// Control-plane ASM RPCs: liveness and overall worker status. From 958108f7f91930d9ed7f8e3073bf617d54a87539 Mon Sep 17 00:00:00 2001 From: Prajwol Gyawali Date: Sun, 7 Jun 2026 14:42:57 +0545 Subject: [PATCH 3/6] refactor(prover): relocate proof-db crate as extensions/prover/storage Move the proof database crate alongside the relocated types crate and rename it strata-asm-prover-storage, framing it as the prover's storage layer (the analogue of asm-storage for the ASM worker). Pure relocation and rename; the storage traits and sled implementations are unchanged. --- Cargo.lock | 36 +++++++++---------- Cargo.toml | 4 +-- bin/asm-runner/Cargo.toml | 2 +- bin/asm-runner/src/bootstrap.rs | 2 +- bin/asm-runner/src/prover/input.rs | 2 +- bin/asm-runner/src/prover/orchestrator.rs | 2 +- bin/asm-runner/src/prover/proof_store.rs | 2 +- bin/asm-runner/src/rpc_server.rs | 2 +- bin/asm-runner/src/worker_context.rs | 2 +- .../prover/storage}/Cargo.toml | 2 +- .../prover/storage}/src/lib.rs | 0 .../prover/storage}/src/moho_state.rs | 0 .../prover/storage}/src/proof_db.rs | 0 .../prover/storage}/src/remote_mapping.rs | 0 .../prover/storage}/src/remote_status.rs | 0 .../prover/storage}/src/sled/mod.rs | 0 .../prover/storage}/src/sled/moho_state.rs | 0 .../prover/storage}/src/sled/proof_db.rs | 0 .../storage}/src/sled/remote_mapping.rs | 0 .../prover/storage}/src/sled/remote_status.rs | 0 20 files changed, 28 insertions(+), 28 deletions(-) rename crates/{proof/db => extensions/prover/storage}/Cargo.toml (92%) rename crates/{proof/db => extensions/prover/storage}/src/lib.rs (100%) rename crates/{proof/db => extensions/prover/storage}/src/moho_state.rs (100%) rename crates/{proof/db => extensions/prover/storage}/src/proof_db.rs (100%) rename crates/{proof/db => extensions/prover/storage}/src/remote_mapping.rs (100%) rename crates/{proof/db => extensions/prover/storage}/src/remote_status.rs (100%) rename crates/{proof/db => extensions/prover/storage}/src/sled/mod.rs (100%) rename crates/{proof/db => extensions/prover/storage}/src/sled/moho_state.rs (100%) rename crates/{proof/db => extensions/prover/storage}/src/sled/proof_db.rs (100%) rename crates/{proof/db => extensions/prover/storage}/src/sled/remote_mapping.rs (100%) rename crates/{proof/db => extensions/prover/storage}/src/sled/remote_status.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 3ff6b8a2..9d79d7c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7727,23 +7727,6 @@ dependencies = [ "strata-predicate", ] -[[package]] -name = "strata-asm-proof-db" -version = "0.1.0" -dependencies = [ - "borsh", - "moho-types", - "proptest", - "sled", - "ssz", - "strata-asm-prover-types", - "strata-identifiers", - "strata-predicate", - "tempfile", - "tokio", - "zkaleido", -] - [[package]] name = "strata-asm-proof-impl" version = "0.1.0" @@ -8015,6 +7998,23 @@ dependencies = [ "zkaleido-sp1-host", ] +[[package]] +name = "strata-asm-prover-storage" +version = "0.1.0" +dependencies = [ + "borsh", + "moho-types", + "proptest", + "sled", + "ssz", + "strata-asm-prover-types", + "strata-identifiers", + "strata-predicate", + "tempfile", + "tokio", + "zkaleido", +] + [[package]] name = "strata-asm-prover-types" version = "0.1.0" @@ -8067,7 +8067,6 @@ dependencies = [ "strata-asm-common", "strata-asm-logs", "strata-asm-params", - "strata-asm-proof-db", "strata-asm-proof-impl", "strata-asm-proto-bridge-v1", "strata-asm-proto-bridge-v1-txs", @@ -8075,6 +8074,7 @@ dependencies = [ "strata-asm-proto-checkpoint", "strata-asm-proto-checkpoint-txs", "strata-asm-proto-checkpoint-types", + "strata-asm-prover-storage", "strata-asm-prover-types", "strata-asm-rpc", "strata-asm-spec", diff --git a/Cargo.toml b/Cargo.toml index 94aa4409..7504af37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,8 @@ members = [ "crates/logs", "crates/manifest-types", "crates/params", - "crates/proof/db", "crates/proof/statements", + "crates/extensions/prover/storage", "crates/extensions/prover/types", "crates/rpc", "crates/spec", @@ -62,7 +62,6 @@ strata-asm-common = { path = "crates/common" } strata-asm-logs = { path = "crates/logs" } strata-asm-manifest-types = { path = "crates/manifest-types" } strata-asm-params = { path = "crates/params" } -strata-asm-proof-db = { path = "crates/proof/db" } strata-asm-proof-impl = { path = "crates/proof/statements" } strata-asm-proto-admin = { path = "crates/subprotocols/admin/subprotocol" } strata-asm-proto-admin-txs = { path = "crates/subprotocols/admin/txs" } @@ -76,6 +75,7 @@ strata-asm-proto-checkpoint-txs = { path = "crates/subprotocols/checkpoint/txs" strata-asm-proto-checkpoint-types = { path = "crates/subprotocols/checkpoint/types" } strata-asm-proto-debug-v1 = { path = "crates/subprotocols/debug-v1/subprotocol" } strata-asm-proto-txs-test-utils = { path = "crates/subprotocols/txs-test-utils" } +strata-asm-prover-storage = { path = "crates/extensions/prover/storage" } strata-asm-prover-types = { path = "crates/extensions/prover/types" } strata-asm-rpc = { path = "crates/rpc" } strata-asm-spec = { path = "crates/spec" } diff --git a/bin/asm-runner/Cargo.toml b/bin/asm-runner/Cargo.toml index da9e3851..ca21927b 100644 --- a/bin/asm-runner/Cargo.toml +++ b/bin/asm-runner/Cargo.toml @@ -17,7 +17,7 @@ strata-btc-verification.workspace = true strata-asm-common.workspace = true strata-asm-logs.workspace = true strata-asm-params.workspace = true -strata-asm-proof-db.workspace = true +strata-asm-prover-storage.workspace = true strata-asm-proof-impl.workspace = true strata-asm-prover-types.workspace = true strata-asm-proto-bridge-v1.workspace = true diff --git a/bin/asm-runner/src/bootstrap.rs b/bin/asm-runner/src/bootstrap.rs index 43c8c5e4..6cbd20c5 100644 --- a/bin/asm-runner/src/bootstrap.rs +++ b/bin/asm-runner/src/bootstrap.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use bitcoind_async_client::{Auth, Client}; use strata_asm_params::AsmParams; -use strata_asm_proof_db::{SledMohoStateDb, SledProofDb}; +use strata_asm_prover_storage::{SledMohoStateDb, SledProofDb}; use strata_asm_spec::StrataAsmSpec; use strata_asm_worker::AsmWorkerBuilder; use strata_tasks::TaskExecutor; diff --git a/bin/asm-runner/src/prover/input.rs b/bin/asm-runner/src/prover/input.rs index 4393463c..049bb997 100644 --- a/bin/asm-runner/src/prover/input.rs +++ b/bin/asm-runner/src/prover/input.rs @@ -11,8 +11,8 @@ use moho_recursive_proof::{MohoRecursiveInput, MohoRecursiveOutput}; use moho_runtime_impl::RuntimeInput; use moho_types::{MohoState, RecursiveMohoProof, StepMohoAttestation, StepMohoProof}; use ssz::{Decode, Encode}; -use strata_asm_proof_db::{MohoStateDb, ProofDb, SledMohoStateDb, SledProofDb}; use strata_asm_proof_impl::moho_program::input::AsmStepInput; +use strata_asm_prover_storage::{MohoStateDb, ProofDb, SledMohoStateDb, SledProofDb}; use strata_asm_prover_types::L1Range; use strata_btc_types::{BlockHashExt, L1BlockIdBitcoinExt}; use strata_btc_verification::{self, TxidInclusionProof}; diff --git a/bin/asm-runner/src/prover/orchestrator.rs b/bin/asm-runner/src/prover/orchestrator.rs index 71136dda..c40789b4 100644 --- a/bin/asm-runner/src/prover/orchestrator.rs +++ b/bin/asm-runner/src/prover/orchestrator.rs @@ -7,8 +7,8 @@ use anyhow::{Context, Result}; use async_trait::async_trait; use moho_recursive_proof::MohoRecursiveProgram; -use strata_asm_proof_db::{RemoteProofMappingDb, RemoteProofStatusDb, SledProofDb}; use strata_asm_proof_impl::program::AsmStfProofProgram; +use strata_asm_prover_storage::{RemoteProofMappingDb, RemoteProofStatusDb, SledProofDb}; use strata_asm_prover_types::{ProofId, RemoteProofId}; use strata_tasks::ShutdownGuard; use tokio::{sync::mpsc, time}; diff --git a/bin/asm-runner/src/prover/proof_store.rs b/bin/asm-runner/src/prover/proof_store.rs index 4dcbe9f8..f3280a72 100644 --- a/bin/asm-runner/src/prover/proof_store.rs +++ b/bin/asm-runner/src/prover/proof_store.rs @@ -4,7 +4,7 @@ //! orchestrator focused on coordination logic. use anyhow::{Context, Result}; -use strata_asm_proof_db::{ProofDb, SledProofDb}; +use strata_asm_prover_storage::{ProofDb, SledProofDb}; use strata_asm_prover_types::{AsmProof, MohoProof, ProofId}; use tracing::info; use zkaleido::ProofReceiptWithMetadata; diff --git a/bin/asm-runner/src/rpc_server.rs b/bin/asm-runner/src/rpc_server.rs index f398cecb..e5b04357 100644 --- a/bin/asm-runner/src/rpc_server.rs +++ b/bin/asm-runner/src/rpc_server.rs @@ -13,13 +13,13 @@ use jsonrpsee::{ types::{ErrorObject, ErrorObjectOwned}, }; use ssz::{Decode, Encode}; -use strata_asm_proof_db::{ProofDb, SledMohoStateDb, SledProofDb}; use strata_asm_proto_bridge_v1::{AssignmentEntry, BridgeV1State, DepositEntry}; use strata_asm_proto_bridge_v1_txs::BRIDGE_V1_SUBPROTOCOL_ID; use strata_asm_proto_bridge_v1_types::SafeHarbour; use strata_asm_proto_checkpoint::CheckpointState; use strata_asm_proto_checkpoint_txs::CHECKPOINT_SUBPROTOCOL_ID; use strata_asm_proto_checkpoint_types::CheckpointTip; +use strata_asm_prover_storage::{ProofDb, SledMohoStateDb, SledProofDb}; use strata_asm_prover_types::{AsmProof, L1Range, MohoProof}; use strata_asm_rpc::traits::{AsmControlApiServer, AsmProofApiServer, AsmStateApiServer}; use strata_asm_worker::{AsmState, AsmWorkerHandle, AsmWorkerStatus}; diff --git a/bin/asm-runner/src/worker_context.rs b/bin/asm-runner/src/worker_context.rs index dac16030..be2c7d80 100644 --- a/bin/asm-runner/src/worker_context.rs +++ b/bin/asm-runner/src/worker_context.rs @@ -24,10 +24,10 @@ use moho_runtime_interface::MohoProgram; use moho_types::{ExportState, MohoState}; use strata_asm_common::{AnchorState, AsmManifest, AsmManifestHash, AuxData}; use strata_asm_logs::NewExportEntry; -use strata_asm_proof_db::SledMohoStateDb; use strata_asm_proof_impl::moho_program::program::{ AsmStfProgram, advance_export_state_with_logs, extract_next_predicate_from_logs, }; +use strata_asm_prover_storage::SledMohoStateDb; use strata_asm_worker::{ AnchorStateStore, AsmState, AuxDataStore, L1DataProvider, ManifestMmrStore, WorkerError, WorkerResult, diff --git a/crates/proof/db/Cargo.toml b/crates/extensions/prover/storage/Cargo.toml similarity index 92% rename from crates/proof/db/Cargo.toml rename to crates/extensions/prover/storage/Cargo.toml index 04d76472..68a1c260 100644 --- a/crates/proof/db/Cargo.toml +++ b/crates/extensions/prover/storage/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "strata-asm-proof-db" +name = "strata-asm-prover-storage" version = "0.1.0" edition = "2024" diff --git a/crates/proof/db/src/lib.rs b/crates/extensions/prover/storage/src/lib.rs similarity index 100% rename from crates/proof/db/src/lib.rs rename to crates/extensions/prover/storage/src/lib.rs diff --git a/crates/proof/db/src/moho_state.rs b/crates/extensions/prover/storage/src/moho_state.rs similarity index 100% rename from crates/proof/db/src/moho_state.rs rename to crates/extensions/prover/storage/src/moho_state.rs diff --git a/crates/proof/db/src/proof_db.rs b/crates/extensions/prover/storage/src/proof_db.rs similarity index 100% rename from crates/proof/db/src/proof_db.rs rename to crates/extensions/prover/storage/src/proof_db.rs diff --git a/crates/proof/db/src/remote_mapping.rs b/crates/extensions/prover/storage/src/remote_mapping.rs similarity index 100% rename from crates/proof/db/src/remote_mapping.rs rename to crates/extensions/prover/storage/src/remote_mapping.rs diff --git a/crates/proof/db/src/remote_status.rs b/crates/extensions/prover/storage/src/remote_status.rs similarity index 100% rename from crates/proof/db/src/remote_status.rs rename to crates/extensions/prover/storage/src/remote_status.rs diff --git a/crates/proof/db/src/sled/mod.rs b/crates/extensions/prover/storage/src/sled/mod.rs similarity index 100% rename from crates/proof/db/src/sled/mod.rs rename to crates/extensions/prover/storage/src/sled/mod.rs diff --git a/crates/proof/db/src/sled/moho_state.rs b/crates/extensions/prover/storage/src/sled/moho_state.rs similarity index 100% rename from crates/proof/db/src/sled/moho_state.rs rename to crates/extensions/prover/storage/src/sled/moho_state.rs diff --git a/crates/proof/db/src/sled/proof_db.rs b/crates/extensions/prover/storage/src/sled/proof_db.rs similarity index 100% rename from crates/proof/db/src/sled/proof_db.rs rename to crates/extensions/prover/storage/src/sled/proof_db.rs diff --git a/crates/proof/db/src/sled/remote_mapping.rs b/crates/extensions/prover/storage/src/sled/remote_mapping.rs similarity index 100% rename from crates/proof/db/src/sled/remote_mapping.rs rename to crates/extensions/prover/storage/src/sled/remote_mapping.rs diff --git a/crates/proof/db/src/sled/remote_status.rs b/crates/extensions/prover/storage/src/sled/remote_status.rs similarity index 100% rename from crates/proof/db/src/sled/remote_status.rs rename to crates/extensions/prover/storage/src/sled/remote_status.rs From 90f1a6ab88d0585ff6521618cf96ebf7293a6f25 Mon Sep 17 00:00:00 2001 From: Prajwol Gyawali Date: Sun, 7 Jun 2026 14:43:33 +0545 Subject: [PATCH 4/6] refactor(prover): extract proof orchestration into a prover-worker crate Proof scheduling and reconciliation lived inside the asm-runner binary, hard-wired to the concrete SledProofDb, so the orchestrator could not be tested against fakes or reused outside the binary. Introduce strata-asm-prover-worker, mirroring the ASM worker's shape: it defines a ProverContext umbrella trait and drives the orchestrator generically over it, while the binary supplies the concrete impl (AsmProverContext) wiring the sled stores and the Bitcoin client. The proving backend is split into native and sp1 modules behind the existing feature gate. --- Cargo.lock | 50 +++- Cargo.toml | 4 +- bin/asm-runner/Cargo.toml | 25 +- bin/asm-runner/src/block_watcher.rs | 17 +- bin/asm-runner/src/bootstrap.rs | 112 ++++----- bin/asm-runner/src/config.rs | 3 +- bin/asm-runner/src/main.rs | 3 +- bin/asm-runner/src/prover/backend.rs | 209 ----------------- bin/asm-runner/src/prover/mod.rs | 15 -- bin/asm-runner/src/prover_context.rs | 221 ++++++++++++++++++ crates/extensions/prover/worker/Cargo.toml | 55 +++++ .../prover/worker/src/backend/mod.rs | 103 ++++++++ .../prover/worker/src/backend/native.rs | 49 ++++ .../prover/worker/src/backend/sp1.rs | 81 +++++++ .../extensions/prover/worker/src/builder.rs | 102 ++++++++ .../extensions/prover/worker/src}/config.rs | 6 +- crates/extensions/prover/worker/src/errors.rs | 21 ++ crates/extensions/prover/worker/src/handle.rs | 30 +++ .../extensions/prover/worker/src}/input.rs | 132 ++++++----- crates/extensions/prover/worker/src/lib.rs | 35 +++ .../prover/worker/src}/orchestrator.rs | 94 ++++---- .../prover/worker/src}/proof_store.rs | 26 ++- .../extensions/prover/worker/src}/queue.rs | 0 crates/extensions/prover/worker/src/traits.rs | 88 +++++++ crates/rpc/Cargo.toml | 2 +- 25 files changed, 1036 insertions(+), 447 deletions(-) delete mode 100644 bin/asm-runner/src/prover/backend.rs delete mode 100644 bin/asm-runner/src/prover/mod.rs create mode 100644 bin/asm-runner/src/prover_context.rs create mode 100644 crates/extensions/prover/worker/Cargo.toml create mode 100644 crates/extensions/prover/worker/src/backend/mod.rs create mode 100644 crates/extensions/prover/worker/src/backend/native.rs create mode 100644 crates/extensions/prover/worker/src/backend/sp1.rs create mode 100644 crates/extensions/prover/worker/src/builder.rs rename {bin/asm-runner/src/prover => crates/extensions/prover/worker/src}/config.rs (95%) create mode 100644 crates/extensions/prover/worker/src/errors.rs create mode 100644 crates/extensions/prover/worker/src/handle.rs rename {bin/asm-runner/src/prover => crates/extensions/prover/worker/src}/input.rs (65%) create mode 100644 crates/extensions/prover/worker/src/lib.rs rename {bin/asm-runner/src/prover => crates/extensions/prover/worker/src}/orchestrator.rs (90%) rename {bin/asm-runner/src/prover => crates/extensions/prover/worker/src}/proof_store.rs (65%) rename {bin/asm-runner/src/prover => crates/extensions/prover/worker/src}/queue.rs (100%) create mode 100644 crates/extensions/prover/worker/src/traits.rs diff --git a/Cargo.lock b/Cargo.lock index 9d79d7c5..f55d98aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8025,6 +8025,43 @@ dependencies = [ "zkaleido", ] +[[package]] +name = "strata-asm-prover-worker" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bincode", + "bitcoin", + "hex", + "k256", + "moho-recursive-proof", + "moho-runtime-impl", + "moho-types", + "serde", + "sp1-sdk", + "sp1-verifier", + "ssz", + "strata-asm-common", + "strata-asm-proof-impl", + "strata-asm-prover-storage", + "strata-asm-prover-types", + "strata-btc-types", + "strata-btc-verification", + "strata-identifiers", + "strata-merkle", + "strata-predicate", + "strata-tasks", + "thiserror 2.0.18", + "tokio", + "tracing", + "tree_hash", + "zkaleido", + "zkaleido-native-adapter", + "zkaleido-sp1-groth16-verifier", + "zkaleido-sp1-host", +] + [[package]] name = "strata-asm-rpc" version = "0.1.0" @@ -8045,24 +8082,17 @@ dependencies = [ "anyhow", "asm-storage", "async-trait", - "bincode", "bitcoin", "bitcoincore-zmq", "bitcoind-async-client", "clap", "futures", - "hex", "jsonrpsee", - "k256", - "moho-recursive-proof", - "moho-runtime-impl", "moho-runtime-interface", "moho-types", "serde", "serde_json", "sled", - "sp1-sdk", - "sp1-verifier", "ssz", "strata-asm-common", "strata-asm-logs", @@ -8076,11 +8106,11 @@ dependencies = [ "strata-asm-proto-checkpoint-types", "strata-asm-prover-storage", "strata-asm-prover-types", + "strata-asm-prover-worker", "strata-asm-rpc", "strata-asm-spec", "strata-asm-worker", "strata-btc-types", - "strata-btc-verification", "strata-identifiers", "strata-logging", "strata-merkle", @@ -8091,11 +8121,7 @@ dependencies = [ "tokio", "toml 1.1.2+spec-1.1.0", "tracing", - "tree_hash", "zkaleido", - "zkaleido-native-adapter", - "zkaleido-sp1-groth16-verifier", - "zkaleido-sp1-host", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7504af37..5862f8c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,10 @@ members = [ "crates/logs", "crates/manifest-types", "crates/params", - "crates/proof/statements", "crates/extensions/prover/storage", "crates/extensions/prover/types", + "crates/extensions/prover/worker", + "crates/proof/statements", "crates/rpc", "crates/spec", "crates/spec-debug", @@ -77,6 +78,7 @@ strata-asm-proto-debug-v1 = { path = "crates/subprotocols/debug-v1/subprotocol" strata-asm-proto-txs-test-utils = { path = "crates/subprotocols/txs-test-utils" } strata-asm-prover-storage = { path = "crates/extensions/prover/storage" } strata-asm-prover-types = { path = "crates/extensions/prover/types" } +strata-asm-prover-worker = { path = "crates/extensions/prover/worker" } strata-asm-rpc = { path = "crates/rpc" } strata-asm-spec = { path = "crates/spec" } strata-asm-spec-debug = { path = "crates/spec-debug" } diff --git a/bin/asm-runner/Cargo.toml b/bin/asm-runner/Cargo.toml index ca21927b..8c50d973 100644 --- a/bin/asm-runner/Cargo.toml +++ b/bin/asm-runner/Cargo.toml @@ -7,25 +7,23 @@ edition = "2024" workspace = true [dependencies] -moho-recursive-proof.workspace = true -moho-runtime-impl.workspace = true moho-runtime-interface.workspace = true moho-types.workspace = true strata-asm-rpc.workspace = true -strata-btc-verification.workspace = true strata-asm-common.workspace = true strata-asm-logs.workspace = true strata-asm-params.workspace = true -strata-asm-prover-storage.workspace = true strata-asm-proof-impl.workspace = true -strata-asm-prover-types.workspace = true strata-asm-proto-bridge-v1.workspace = true strata-asm-proto-bridge-v1-txs.workspace = true strata-asm-proto-bridge-v1-types.workspace = true strata-asm-proto-checkpoint.workspace = true strata-asm-proto-checkpoint-txs.workspace = true strata-asm-proto-checkpoint-types.workspace = true +strata-asm-prover-storage.workspace = true +strata-asm-prover-types.workspace = true +strata-asm-prover-worker.workspace = true strata-asm-spec.workspace = true strata-asm-worker.workspace = true strata-btc-types.workspace = true @@ -37,43 +35,28 @@ strata-identifiers.workspace = true strata-merkle.workspace = true strata-predicate.workspace = true strata-tasks.workspace = true -tree_hash.workspace = true anyhow.workspace = true async-trait.workspace = true -bincode = { workspace = true, optional = true } bitcoin.workspace = true bitcoincore-zmq = { version = "1.5.2", features = ["async"] } bitcoind-async-client.workspace = true clap.workspace = true futures.workspace = true -hex.workspace = true jsonrpsee = { workspace = true, features = ["server", "macros"] } -k256 = { workspace = true, features = ["schnorr"] } serde.workspace = true serde_json.workspace = true sled.workspace = true -sp1-sdk = { workspace = true, optional = true } -sp1-verifier = { workspace = true, optional = true } ssz.workspace = true thiserror.workspace = true tokio.workspace = true toml.workspace = true tracing.workspace = true zkaleido.workspace = true -zkaleido-native-adapter.workspace = true -zkaleido-sp1-groth16-verifier = { workspace = true, optional = true } -zkaleido-sp1-host = { workspace = true, optional = true } [dev-dependencies] tempfile.workspace = true [features] default = [] -sp1 = [ - "dep:zkaleido-sp1-host", - "dep:zkaleido-sp1-groth16-verifier", - "dep:sp1-verifier", - "dep:sp1-sdk", - "dep:bincode", -] +sp1 = ["strata-asm-prover-worker/sp1"] diff --git a/bin/asm-runner/src/block_watcher.rs b/bin/asm-runner/src/block_watcher.rs index 8d716468..02113c52 100644 --- a/bin/asm-runner/src/block_watcher.rs +++ b/bin/asm-runner/src/block_watcher.rs @@ -16,11 +16,12 @@ use bitcoincore_zmq::{Message, SocketMessage, subscribe_async_wait_handshake}; use bitcoind_async_client::{Client, traits::Reader}; use futures::StreamExt; use strata_asm_prover_types::{L1Range, ProofId}; +use strata_asm_prover_worker::ProverWorkerHandle; use strata_asm_worker::AsmWorkerHandle; use strata_btc_types::BlockHashExt; use strata_identifiers::L1BlockCommitment; use strata_tasks::ShutdownGuard; -use tokio::{sync::mpsc, time::timeout}; +use tokio::time::timeout; use tracing::{debug, error, info, warn}; use crate::config::BitcoinConfig; @@ -37,7 +38,7 @@ pub(crate) async fn drive_asm_from_bitcoin( bitcoin_client: Arc, asm_worker: Arc, start_height: u64, - proof_tx: Option>, + proof_handle: Option, shutdown: ShutdownGuard, ) -> Result<()> { info!(%start_height, "starting ASM block watcher"); @@ -105,7 +106,7 @@ pub(crate) async fn drive_asm_from_bitcoin( for height in cursor..received_height { match fetch_block_at_height(&bitcoin_client, height).await { Ok(fetched) => { - if let Err(err) = submit_block(&asm_worker, &proof_tx, fetched).await { + if let Err(err) = submit_block(&asm_worker, &proof_handle, fetched).await { error!(%height, ?err, "failed to submit backfill block"); // Stop backfilling on failure so we don't hand the // worker a gap. The next ZMQ event will retry. @@ -120,7 +121,7 @@ pub(crate) async fn drive_asm_from_bitcoin( } } - if let Err(err) = submit_block(&asm_worker, &proof_tx, block).await { + if let Err(err) = submit_block(&asm_worker, &proof_handle, block).await { error!(%received_height, ?err, "failed to submit block from ZMQ"); } cursor = received_height + 1; @@ -143,7 +144,7 @@ async fn fetch_block_at_height(client: &Client, height: u64) -> Result { /// Submit a block to the ASM worker and, optionally, enqueue a proof request. async fn submit_block( asm_worker: &AsmWorkerHandle, - proof_tx: &Option>, + proof_handle: &Option, block: Block, ) -> Result<()> { let height = block.bip34_block_height().unwrap_or(0); @@ -158,13 +159,13 @@ async fn submit_block( debug!(%height, %hash, "submitted block to ASM worker"); - if let Some(tx) = proof_tx { + if let Some(handle) = proof_handle { let asm_proof_id = ProofId::Asm(L1Range::single(commitment)); - if let Err(err) = tx.send(asm_proof_id) { + if let Err(err) = handle.request_proof(asm_proof_id) { warn!(%height, %hash, ?err, "failed to enqueue ASM proof request"); } let moho_proof_id = ProofId::Moho(commitment); - if let Err(err) = tx.send(moho_proof_id) { + if let Err(err) = handle.request_proof(moho_proof_id) { warn!(%height, %hash, ?err, "failed to enqueue Moho proof request"); } } diff --git a/bin/asm-runner/src/bootstrap.rs b/bin/asm-runner/src/bootstrap.rs index 6cbd20c5..1c9fc3d2 100644 --- a/bin/asm-runner/src/bootstrap.rs +++ b/bin/asm-runner/src/bootstrap.rs @@ -4,19 +4,19 @@ use anyhow::Result; use bitcoind_async_client::{Auth, Client}; use strata_asm_params::AsmParams; use strata_asm_prover_storage::{SledMohoStateDb, SledProofDb}; +use strata_asm_prover_worker::{InputBuilder, ProofBackend, ProverWorkerBuilder}; use strata_asm_spec::StrataAsmSpec; use strata_asm_worker::AsmWorkerBuilder; use strata_tasks::TaskExecutor; use tokio::{ runtime::{Builder as RuntimeBuilder, Handle}, - sync::mpsc, task::{self, LocalSet}, }; use crate::{ block_watcher::drive_asm_from_bitcoin, config::{AsmRpcConfig, BitcoinConfig}, - prover::{InputBuilder, ProofBackend, ProofOrchestrator}, + prover_context::AsmProverContext, rpc_server::{AsmProofRpcDeps, run_rpc_server}, storage::create_storage, worker_context::{AsmWorkerContext, MohoStorage}, @@ -83,61 +83,61 @@ pub(crate) async fn bootstrap( let asm_worker = Arc::new(asm_worker); // 7. Finish orchestrator wiring if it was configured. - let (proof_tx, proof_rpc_deps) = if let Some((orch_config, proof_db, moho_state_db, backend)) = - orch_prep - { - let (tx, rx) = mpsc::unbounded_channel(); - let rpc_deps = AsmProofRpcDeps { - proof_db: proof_db.clone(), - moho_state_db: moho_state_db.clone(), - export_entries_db: export_entries_db.clone(), + let (proof_handle, proof_rpc_deps) = + if let Some((orch_config, proof_db, moho_state_db, backend)) = orch_prep { + let rpc_deps = AsmProofRpcDeps { + proof_db: proof_db.clone(), + moho_state_db: moho_state_db.clone(), + export_entries_db: export_entries_db.clone(), + }; + + let ProofBackend { + asm_host, + moho_host, + asm_predicate, + moho_predicate, + } = backend; + + // The prover context wires the proof store, moho-state store, ASM + // anchor-state store, and Bitcoin client into the worker's traits. + let prover_ctx = AsmProverContext::new( + proof_db, + moho_state_db, + state_db.clone(), + bitcoin_client.clone(), + ); + let input_builder = + InputBuilder::new(params.anchor.block, asm_predicate, moho_predicate); + + let (handle, mut orchestrator) = ProverWorkerBuilder::new() + .with_context(prover_ctx) + .with_hosts(asm_host, moho_host) + .with_config(orch_config) + .with_input_builder(input_builder) + .build()?; + + // ZkVmRemoteProver is !Send (#[async_trait(?Send)]), so the orchestrator + // future cannot be spawned on a multi-threaded runtime directly. We run it + // on a dedicated thread with a single-threaded runtime + LocalSet. + executor.spawn_critical_async_with_shutdown( + "proof_orchestrator", + move |shutdown| async move { + task::spawn_blocking(move || { + let rt = RuntimeBuilder::new_current_thread().enable_all().build()?; + let local = LocalSet::new(); + rt.block_on( + local.run_until(async move { orchestrator.run(shutdown).await }), + ) + }) + .await? + }, + ); + + (Some(handle), Some(rpc_deps)) + } else { + (None, None) }; - let ProofBackend { - asm_host, - moho_host, - asm_predicate, - moho_predicate, - } = backend; - - let input_builder = InputBuilder::new( - state_db.clone(), - bitcoin_client.clone(), - proof_db.clone(), - moho_state_db, - params.anchor.block, - asm_predicate, - moho_predicate, - ); - let mut orchestrator = ProofOrchestrator::new( - proof_db, - asm_host, - moho_host, - orch_config, - input_builder, - rx, - ); - - // ZkVmRemoteProver is !Send (#[async_trait(?Send)]), so the orchestrator - // future cannot be spawned on a multi-threaded runtime directly. We run it - // on a dedicated thread with a single-threaded runtime + LocalSet. - executor.spawn_critical_async_with_shutdown( - "proof_orchestrator", - move |shutdown| async move { - task::spawn_blocking(move || { - let rt = RuntimeBuilder::new_current_thread().enable_all().build()?; - let local = LocalSet::new(); - rt.block_on(local.run_until(async move { orchestrator.run(shutdown).await })) - }) - .await? - }, - ); - - (Some(tx), Some(rpc_deps)) - } else { - (None, None) - }; - // 8. Spawn block watcher as a critical task. let asm_worker_for_driver = asm_worker.clone(); let bitcoin_config = config.bitcoin.clone(); @@ -148,7 +148,7 @@ pub(crate) async fn bootstrap( bitcoin_client_for_driver, asm_worker_for_driver, start_height as u64, - proof_tx, + proof_handle, shutdown, ) }); diff --git a/bin/asm-runner/src/config.rs b/bin/asm-runner/src/config.rs index 15fa2863..0aa9c94f 100644 --- a/bin/asm-runner/src/config.rs +++ b/bin/asm-runner/src/config.rs @@ -3,8 +3,9 @@ use std::{path::PathBuf, time::Duration}; use serde::{Deserialize, Serialize}; +use strata_asm_prover_worker::OrchestratorConfig; -use crate::{prover::config::OrchestratorConfig, retry::RetryConfig}; +use crate::retry::RetryConfig; /// Main configuration structure #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/bin/asm-runner/src/main.rs b/bin/asm-runner/src/main.rs index f88ddfa8..eaa3194b 100644 --- a/bin/asm-runner/src/main.rs +++ b/bin/asm-runner/src/main.rs @@ -6,7 +6,7 @@ mod block_watcher; mod bootstrap; mod config; -mod prover; +mod prover_context; mod retry; mod rpc_server; mod storage; @@ -21,7 +21,6 @@ use strata_logging::{LoggingInitConfig, finalize, init_logging_from_config}; use strata_tasks::TaskManager; use tokio::runtime::{Builder, Handle}; use tracing::{error, info}; -use zkaleido_native_adapter as _; use crate::{ bootstrap::bootstrap, diff --git a/bin/asm-runner/src/prover/backend.rs b/bin/asm-runner/src/prover/backend.rs deleted file mode 100644 index 97b289c3..00000000 --- a/bin/asm-runner/src/prover/backend.rs +++ /dev/null @@ -1,209 +0,0 @@ -//! ZK proof backend setup for the runner. -//! -//! Bundles the feature-gated selection of the ZK proof backend in one place: -//! host construction (SP1 or native), and derivation of the [`PredicateKey`] -//! that authorizes proofs from each host. The result is exposed as a single -//! [`ProofBackend`] value that the runner builds once at startup and threads -//! into the proof orchestrator and the input builder. - -use std::path::Path; - -use anyhow::{Result, bail}; -use k256::schnorr::SigningKey; -use strata_predicate::{PredicateKey, PredicateTypeId}; -use zkaleido::{ZkVm, ZkVmHost}; -#[cfg(feature = "sp1")] -use { - anyhow::Context, - sp1_sdk::{HashableKey, SP1VerifyingKey}, - sp1_verifier::{GROTH16_VK_BYTES, VK_ROOT_BYTES}, - zkaleido_sp1_groth16_verifier::SP1Groth16Verifier, - zkaleido_sp1_host::SP1Host, -}; - -use crate::prover::config::BackendConfig; - -/// Concrete host type used by the proof orchestrator. -/// -/// Resolves to [`SP1Host`] when the `sp1` feature is enabled, otherwise to -/// the in-process [`zkaleido_native_adapter::NativeHost`]. -#[cfg(feature = "sp1")] -pub(crate) type ProofHost = SP1Host; - -#[cfg(not(feature = "sp1"))] -pub(crate) type ProofHost = zkaleido_native_adapter::NativeHost; - -/// ZK proof backend used by the runner. -/// -/// Bundles the `(asm, moho)` host pair together with the [`PredicateKey`] that -/// each one's proofs verify against. Constructed once at startup via -/// [`ProofBackend::new`] and consumed by the proof orchestrator (hosts) and -/// the input builder (predicates). -#[derive(Debug)] -pub(crate) struct ProofBackend { - pub(crate) asm_host: ProofHost, - pub(crate) moho_host: ProofHost, - pub(crate) asm_predicate: PredicateKey, - pub(crate) moho_predicate: PredicateKey, -} - -impl ProofBackend { - /// Builds the ZK proof backend. - /// - /// Constructs both proof hosts and resolves the [`PredicateKey`] each - /// host's proofs verify against. - /// - /// # Errors - /// - /// - Returns an error if the requested [`BackendConfig`] variant does not match the binary's - /// build features (e.g. `Sp1` requested without the `sp1` feature). - /// - Returns an error if either host cannot be constructed (e.g. a guest ELF cannot be read in - /// `sp1` builds) or if either host's verifying key cannot be turned into a [`PredicateKey`]. - pub(crate) async fn new(cfg: &BackendConfig) -> Result { - let (asm_host, moho_host) = build_proof_hosts(cfg).await?; - let asm_predicate = resolve_predicate(&asm_host)?; - let moho_predicate = resolve_predicate(&moho_host)?; - Ok(Self { - asm_host, - moho_host, - asm_predicate, - moho_predicate, - }) - } -} - -/// Builds the `(asm, moho)` host pair used by the proof orchestrator. -/// -/// Dispatches on the [`BackendConfig`] variant. If the variant does not -/// match the binary's build features, surfaces a clear startup error rather -/// than failing later in the proving path. -async fn build_proof_hosts(cfg: &BackendConfig) -> Result<(ProofHost, ProofHost)> { - match cfg { - BackendConfig::Sp1 { - asm_elf_path, - moho_elf_path, - } => build_sp1_hosts(asm_elf_path, moho_elf_path).await, - BackendConfig::Native { - asm_schnorr_signing_key, - moho_schnorr_signing_key, - } => build_native_hosts(asm_schnorr_signing_key, moho_schnorr_signing_key).await, - } -} - -#[cfg(feature = "sp1")] -async fn build_sp1_hosts( - asm_elf_path: &Path, - moho_elf_path: &Path, -) -> Result<(ProofHost, ProofHost)> { - use std::fs; - - let asm_elf = fs::read(asm_elf_path) - .with_context(|| format!("failed to read ASM guest ELF at {}", asm_elf_path.display()))?; - let moho_elf = fs::read(moho_elf_path).with_context(|| { - format!( - "failed to read Moho guest ELF at {}", - moho_elf_path.display() - ) - })?; - - Ok(( - SP1Host::init(&asm_elf).await, - SP1Host::init(&moho_elf).await, - )) -} - -#[cfg(not(feature = "sp1"))] -async fn build_sp1_hosts( - _asm_elf_path: &Path, - _moho_elf_path: &Path, -) -> Result<(ProofHost, ProofHost)> { - bail!("sp1 backend requested but binary was built without the `sp1` feature"); -} - -#[cfg(feature = "sp1")] -async fn build_native_hosts( - _asm_signing_key: &SigningKey, - _moho_signing_key: &SigningKey, -) -> Result<(ProofHost, ProofHost)> { - bail!("native backend requested but binary was built with the `sp1` feature"); -} - -#[cfg(not(feature = "sp1"))] -async fn build_native_hosts( - asm_signing_key: &SigningKey, - moho_signing_key: &SigningKey, -) -> Result<(ProofHost, ProofHost)> { - // Bypass the `*::native_host()` convenience constructors: they call - // `NativeHost::new_with_random_key`, which would make each host's - // verifying key — and therefore its derived `PredicateKey` — different - // on every restart. The orchestrator needs stable predicate identities - // across runs, so we construct `NativeHost` directly with the keys - // supplied by config. - use moho_recursive_proof::process_recursive_moho_proof; - use strata_asm_proof_impl::statements::process_asm_stf; - use zkaleido_native_adapter::NativeHost; - - Ok(( - NativeHost::new(asm_signing_key.clone(), process_asm_stf), - NativeHost::new(moho_signing_key.clone(), process_recursive_moho_proof), - )) -} - -/// Resolves the [`PredicateKey`] for proofs produced by `host`. -/// -/// The returned key carries both the predicate type (matching the host's -/// [`ZkVm`] backend) and the encoded verifying-key material required to -/// validate proofs from that host. -/// -/// # Errors -/// -/// - For SP1 hosts, returns an error if the host's verifying key cannot be deserialized into an -/// `SP1VerifyingKey` or if the SP1 Groth16 verifier cannot be loaded for the resulting program -/// hash. -/// - For Risc0 hosts, returns an error because predicate resolution is not yet implemented for that -/// backend. -/// - When built without the `sp1` feature, an SP1 host returns an error because the SP1 -/// verifying-key handling is gated behind that feature. -fn resolve_predicate(host: &impl ZkVmHost) -> Result { - match host.zkvm() { - // Native execution does not produce a real cryptographic proof; the - // predicate simply carries the verifying-key bytes verbatim under the - // BIP-340 Schnorr type as a placeholder identifier. - ZkVm::Native => Ok(PredicateKey::new( - PredicateTypeId::Bip340Schnorr, - host.vk().as_bytes().to_vec(), - )), - - // SP1 proofs are wrapped in a Groth16 proof, so the on-chain - // predicate must identify the SP1 Groth16 verifying key (not the SP1 - // program vk itself). The conversion is: - // 1. Decode the SP1 verifying key from the host's raw bytes. - // 2. Hash it to obtain the program commitment expected by the Groth16 verifier. - // 3. Load the matching Groth16 verifier and serialize its vk into the predicate key. - #[cfg(feature = "sp1")] - ZkVm::SP1 => { - let vk = host.vk(); - let sp1_vk: SP1VerifyingKey = bincode::deserialize(vk.as_bytes()) - .context("failed to deserialize SP1 verifying key")?; - - let verifier = SP1Groth16Verifier::load( - &GROTH16_VK_BYTES, - sp1_vk.bytes32_raw(), - *VK_ROOT_BYTES, - true, - ) - .context("failed to load SP1 Groth16 verifier")?; - - Ok(PredicateKey::new( - PredicateTypeId::Sp1Groth16, - verifier.to_uncompressed_bytes(), - )) - } - #[cfg(not(feature = "sp1"))] - ZkVm::SP1 => bail!("SP1 predicate key resolution requires the `sp1` feature"), - - // Risc0 support is not yet wired up; surface a clear error rather - // than panicking so callers can fail gracefully. - ZkVm::Risc0 => bail!("predicate key resolution is not implemented for Risc0"), - } -} diff --git a/bin/asm-runner/src/prover/mod.rs b/bin/asm-runner/src/prover/mod.rs deleted file mode 100644 index c2cbce6e..00000000 --- a/bin/asm-runner/src/prover/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! Proof orchestration for the ASM runner. -//! -//! Manages the lifecycle of ASM step proofs and Moho recursive proofs by -//! scheduling jobs on a remote prover service and reconciling results. - -mod backend; -pub(crate) mod config; -mod input; -mod orchestrator; -mod proof_store; -mod queue; - -pub(crate) use self::{ - backend::ProofBackend, input::InputBuilder, orchestrator::ProofOrchestrator, -}; diff --git a/bin/asm-runner/src/prover_context.rs b/bin/asm-runner/src/prover_context.rs new file mode 100644 index 00000000..82132f63 --- /dev/null +++ b/bin/asm-runner/src/prover_context.rs @@ -0,0 +1,221 @@ +//! Concrete [`ProverContext`] implementation for the ASM runner. +//! +//! [`AsmProverContext`] is the prover-side analogue of +//! [`AsmWorkerContext`](crate::worker_context::AsmWorkerContext): it wires the +//! sled-backed proof store, the Moho-state store, the ASM anchor-state store, +//! and the Bitcoin client into the concern traits the prover worker drives +//! against. The proof-store traits are delegated to [`SledProofDb`]; Moho-state +//! reads to [`SledMohoStateDb`]; anchor/aux reads to [`AsmStateDb`]; and L1 +//! block reads to the Bitcoin [`Client`]. +//! +//! [`ProverContext`]: strata_asm_prover_worker::ProverContext + +use std::sync::Arc; + +use anyhow::Context; +use asm_storage::AsmStateDb; +use bitcoin::{Block, block::Header}; +use bitcoind_async_client::{Client, traits::Reader}; +use moho_types::MohoState; +use strata_asm_common::{AnchorState, AuxData}; +use strata_asm_prover_storage::{ + MohoStateDb, ProofDb, RemoteProofMappingDb, RemoteProofMappingError, RemoteProofStatusDb, + RemoteProofStatusError, SledMohoStateDb, SledProofDb, +}; +use strata_asm_prover_types::{AsmProof, L1Range, MohoProof, ProofId, RemoteProofId}; +use strata_asm_prover_worker::{AnchorStateReader, AuxDataReader, L1BlockProvider}; +use strata_asm_worker::AsmState; +use strata_btc_types::L1BlockIdBitcoinExt; +use strata_identifiers::{L1BlockCommitment, L1BlockId}; +use zkaleido::RemoteProofStatus; + +/// Concrete prover context for the ASM runner. +/// +/// Implements every concern trait the prover worker needs, so it satisfies the +/// `ProverContext` umbrella via its blanket impl. The bitcoin reads are async +/// and hit the client directly — the orchestrator drives this from a +/// single-threaded runtime, so blocking on the client would deadlock. +pub(crate) struct AsmProverContext { + proof_db: SledProofDb, + moho_state_db: SledMohoStateDb, + state_db: Arc, + bitcoin_client: Arc, +} + +impl AsmProverContext { + pub(crate) fn new( + proof_db: SledProofDb, + moho_state_db: SledMohoStateDb, + state_db: Arc, + bitcoin_client: Arc, + ) -> Self { + Self { + proof_db, + moho_state_db, + state_db, + bitcoin_client, + } + } +} + +// ---- Proof persistence: delegate to the sled proof store ------------------ + +impl ProofDb for AsmProverContext { + type Error = sled::Error; + + async fn store_asm_proof(&self, range: L1Range, proof: AsmProof) -> Result<(), Self::Error> { + self.proof_db.store_asm_proof(range, proof).await + } + + async fn get_asm_proof(&self, range: L1Range) -> Result, Self::Error> { + self.proof_db.get_asm_proof(range).await + } + + async fn store_moho_proof( + &self, + l1ref: L1BlockCommitment, + proof: MohoProof, + ) -> Result<(), Self::Error> { + self.proof_db.store_moho_proof(l1ref, proof).await + } + + async fn get_moho_proof( + &self, + l1ref: L1BlockCommitment, + ) -> Result, Self::Error> { + self.proof_db.get_moho_proof(l1ref).await + } + + async fn get_latest_moho_proof( + &self, + ) -> Result, Self::Error> { + self.proof_db.get_latest_moho_proof().await + } + + async fn prune(&self, before_height: u32) -> Result<(), Self::Error> { + self.proof_db.prune(before_height).await + } +} + +impl RemoteProofMappingDb for AsmProverContext { + type Error = RemoteProofMappingError; + + async fn get_remote_proof_id(&self, id: ProofId) -> Result, Self::Error> { + self.proof_db.get_remote_proof_id(id).await + } + + async fn get_proof_id( + &self, + remote_id: &RemoteProofId, + ) -> Result, Self::Error> { + self.proof_db.get_proof_id(remote_id).await + } + + async fn put_remote_proof_id( + &self, + id: ProofId, + remote_id: RemoteProofId, + ) -> Result<(), Self::Error> { + self.proof_db.put_remote_proof_id(id, remote_id).await + } +} + +impl RemoteProofStatusDb for AsmProverContext { + type Error = RemoteProofStatusError; + + async fn put_status( + &self, + remote_id: &RemoteProofId, + status: RemoteProofStatus, + ) -> Result<(), Self::Error> { + self.proof_db.put_status(remote_id, status).await + } + + async fn update_status( + &self, + remote_id: &RemoteProofId, + status: RemoteProofStatus, + ) -> Result<(), Self::Error> { + self.proof_db.update_status(remote_id, status).await + } + + async fn get_status( + &self, + remote_id: &RemoteProofId, + ) -> Result, Self::Error> { + self.proof_db.get_status(remote_id).await + } + + async fn get_all_in_progress( + &self, + ) -> Result, Self::Error> { + self.proof_db.get_all_in_progress().await + } + + async fn remove(&self, remote_id: &RemoteProofId) -> Result<(), Self::Error> { + self.proof_db.remove(remote_id).await + } +} + +// ---- Moho-state reads: delegate to the sled moho-state store -------------- + +impl MohoStateDb for AsmProverContext { + type Error = sled::Error; + + async fn store_moho_state( + &self, + l1ref: L1BlockCommitment, + state: MohoState, + ) -> Result<(), Self::Error> { + self.moho_state_db.store_moho_state(l1ref, state).await + } + + async fn get_moho_state( + &self, + l1ref: L1BlockCommitment, + ) -> Result, Self::Error> { + self.moho_state_db.get_moho_state(l1ref).await + } + + async fn prune(&self, before_height: u32) -> Result<(), Self::Error> { + self.moho_state_db.prune(before_height).await + } +} + +// ---- Chain/state reads for the input builder ------------------------------ + +impl AnchorStateReader for AsmProverContext { + fn get_anchor_state(&self, blockid: &L1BlockCommitment) -> anyhow::Result { + let asm_state: AsmState = self + .state_db + .get(blockid)? + .context("anchor state not found")?; + Ok(asm_state.state().clone()) + } +} + +impl AuxDataReader for AsmProverContext { + fn get_aux_data(&self, blockid: &L1BlockCommitment) -> anyhow::Result { + self.state_db + .get_aux_data(blockid)? + .context("aux data not found for block") + } +} + +impl L1BlockProvider for AsmProverContext { + async fn get_l1_block(&self, blockid: &L1BlockId) -> anyhow::Result { + let hash = blockid.to_block_hash(); + self.bitcoin_client + .get_block(&hash) + .await + .context("failed to fetch Bitcoin block") + } + + async fn get_l1_block_header(&self, blockid: &L1BlockId) -> anyhow::Result
{ + let hash = blockid.to_block_hash(); + self.bitcoin_client + .get_block_header(&hash) + .await + .context("failed to fetch Bitcoin block header") + } +} diff --git a/crates/extensions/prover/worker/Cargo.toml b/crates/extensions/prover/worker/Cargo.toml new file mode 100644 index 00000000..846f1fa7 --- /dev/null +++ b/crates/extensions/prover/worker/Cargo.toml @@ -0,0 +1,55 @@ +[package] +name = "strata-asm-prover-worker" +version = "0.1.0" +edition = "2024" + +[lints] +workspace = true + +[dependencies] +moho-recursive-proof.workspace = true +moho-runtime-impl.workspace = true +moho-types.workspace = true +strata-asm-common.workspace = true +strata-asm-proof-impl.workspace = true +strata-asm-prover-storage.workspace = true +strata-asm-prover-types.workspace = true +strata-btc-types.workspace = true +strata-btc-verification.workspace = true +strata-identifiers.workspace = true +strata-merkle = { workspace = true, features = ["ssz"] } +strata-predicate.workspace = true +strata-tasks.workspace = true + +anyhow.workspace = true +async-trait.workspace = true +bitcoin.workspace = true +hex.workspace = true +k256 = { workspace = true, features = ["schnorr"] } +serde.workspace = true +ssz.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +tree_hash.workspace = true +zkaleido.workspace = true +zkaleido-native-adapter.workspace = true + +bincode = { workspace = true, optional = true } +sp1-sdk = { workspace = true, optional = true } +sp1-verifier = { workspace = true, optional = true } +zkaleido-sp1-groth16-verifier = { workspace = true, optional = true } +zkaleido-sp1-host = { workspace = true, optional = true } + +[dev-dependencies] +tokio.workspace = true + +[features] +default = [] +sp1 = [ + "dep:zkaleido-sp1-host", + "dep:zkaleido-sp1-groth16-verifier", + "dep:sp1-verifier", + "dep:sp1-sdk", + "dep:bincode", +] diff --git a/crates/extensions/prover/worker/src/backend/mod.rs b/crates/extensions/prover/worker/src/backend/mod.rs new file mode 100644 index 00000000..80f459a3 --- /dev/null +++ b/crates/extensions/prover/worker/src/backend/mod.rs @@ -0,0 +1,103 @@ +//! ZK proof backend setup for the prover worker. +//! +//! Bundles the feature-gated selection of the ZK proof backend in one place: +//! host construction (SP1 or native, in [`sp1`] / [`native`]) and derivation of +//! the [`PredicateKey`] that authorizes proofs from each host. The result is +//! exposed as a single [`ProofBackend`] value that the runner builds once at +//! startup and threads into the proof orchestrator and the input builder. + +mod native; +mod sp1; + +use anyhow::{Result, bail}; +use strata_predicate::PredicateKey; +use zkaleido::{ZkVm, ZkVmHost}; +#[cfg(feature = "sp1")] +use zkaleido_sp1_host::SP1Host; + +use crate::config::BackendConfig; + +/// Concrete host type used by the proof orchestrator. +/// +/// Resolves to [`SP1Host`] when the `sp1` feature is enabled, otherwise to +/// the in-process [`zkaleido_native_adapter::NativeHost`]. +#[cfg(feature = "sp1")] +pub type ProofHost = SP1Host; + +#[cfg(not(feature = "sp1"))] +pub type ProofHost = zkaleido_native_adapter::NativeHost; + +/// ZK proof backend used by the runner. +/// +/// Bundles the `(asm, moho)` host pair together with the [`PredicateKey`] that +/// each one's proofs verify against. Constructed once at startup via +/// [`ProofBackend::new`] and consumed by the proof orchestrator (hosts) and +/// the input builder (predicates). +#[derive(Debug)] +pub struct ProofBackend { + pub asm_host: ProofHost, + pub moho_host: ProofHost, + pub asm_predicate: PredicateKey, + pub moho_predicate: PredicateKey, +} + +impl ProofBackend { + /// Builds the ZK proof backend. + /// + /// Constructs both proof hosts and resolves the [`PredicateKey`] each + /// host's proofs verify against. + /// + /// # Errors + /// + /// - Returns an error if the requested [`BackendConfig`] variant does not match the binary's + /// build features (e.g. `Sp1` requested without the `sp1` feature). + /// - Returns an error if either host cannot be constructed (e.g. a guest ELF cannot be read in + /// `sp1` builds) or if either host's verifying key cannot be turned into a [`PredicateKey`]. + pub async fn new(cfg: &BackendConfig) -> Result { + let (asm_host, moho_host) = build_proof_hosts(cfg).await?; + let asm_predicate = resolve_predicate(&asm_host)?; + let moho_predicate = resolve_predicate(&moho_host)?; + Ok(Self { + asm_host, + moho_host, + asm_predicate, + moho_predicate, + }) + } +} + +/// Builds the `(asm, moho)` host pair used by the proof orchestrator. +/// +/// Dispatches on the [`BackendConfig`] variant. If the variant does not +/// match the binary's build features, the corresponding builder surfaces a +/// clear startup error rather than failing later in the proving path. +async fn build_proof_hosts(cfg: &BackendConfig) -> Result<(ProofHost, ProofHost)> { + match cfg { + BackendConfig::Sp1 { + asm_elf_path, + moho_elf_path, + } => sp1::build_sp1_hosts(asm_elf_path, moho_elf_path).await, + BackendConfig::Native { + asm_schnorr_signing_key, + moho_schnorr_signing_key, + } => native::build_native_hosts(asm_schnorr_signing_key, moho_schnorr_signing_key).await, + } +} + +/// Resolves the [`PredicateKey`] for proofs produced by `host`, dispatching on +/// its [`ZkVm`] backend. +/// +/// # Errors +/// +/// - For SP1 hosts, returns an error if the verifying key cannot be decoded or the Groth16 verifier +/// cannot be loaded (and, when built without the `sp1` feature, that the feature is required). +/// - For Risc0 hosts, returns an error because predicate resolution is not yet implemented. +fn resolve_predicate(host: &impl ZkVmHost) -> Result { + match host.zkvm() { + ZkVm::Native => native::resolve_native_predicate(host), + ZkVm::SP1 => sp1::resolve_sp1_predicate(host), + // Risc0 support is not yet wired up; surface a clear error rather + // than panicking so callers can fail gracefully. + ZkVm::Risc0 => bail!("predicate key resolution is not implemented for Risc0"), + } +} diff --git a/crates/extensions/prover/worker/src/backend/native.rs b/crates/extensions/prover/worker/src/backend/native.rs new file mode 100644 index 00000000..35292e73 --- /dev/null +++ b/crates/extensions/prover/worker/src/backend/native.rs @@ -0,0 +1,49 @@ +//! Native (in-process) proof host construction and predicate resolution. + +use anyhow::Result; +use k256::schnorr::SigningKey; +use strata_predicate::{PredicateKey, PredicateTypeId}; +use zkaleido::ZkVmHost; + +use super::ProofHost; + +/// Resolves the [`PredicateKey`] for a native host. +/// +/// Native execution does not produce a real cryptographic proof; the predicate +/// simply carries the verifying-key bytes verbatim under the BIP-340 Schnorr +/// type as a placeholder identifier. +pub(super) fn resolve_native_predicate(host: &impl ZkVmHost) -> Result { + Ok(PredicateKey::new( + PredicateTypeId::Bip340Schnorr, + host.vk().as_bytes().to_vec(), + )) +} + +#[cfg(feature = "sp1")] +pub(super) async fn build_native_hosts( + _asm_signing_key: &SigningKey, + _moho_signing_key: &SigningKey, +) -> Result<(ProofHost, ProofHost)> { + anyhow::bail!("native backend requested but binary was built with the `sp1` feature"); +} + +#[cfg(not(feature = "sp1"))] +pub(super) async fn build_native_hosts( + asm_signing_key: &SigningKey, + moho_signing_key: &SigningKey, +) -> Result<(ProofHost, ProofHost)> { + // Bypass the `*::native_host()` convenience constructors: they call + // `NativeHost::new_with_random_key`, which would make each host's + // verifying key — and therefore its derived `PredicateKey` — different + // on every restart. The orchestrator needs stable predicate identities + // across runs, so we construct `NativeHost` directly with the keys + // supplied by config. + use moho_recursive_proof::process_recursive_moho_proof; + use strata_asm_proof_impl::statements::process_asm_stf; + use zkaleido_native_adapter::NativeHost; + + Ok(( + NativeHost::new(asm_signing_key.clone(), process_asm_stf), + NativeHost::new(moho_signing_key.clone(), process_recursive_moho_proof), + )) +} diff --git a/crates/extensions/prover/worker/src/backend/sp1.rs b/crates/extensions/prover/worker/src/backend/sp1.rs new file mode 100644 index 00000000..27b3f70f --- /dev/null +++ b/crates/extensions/prover/worker/src/backend/sp1.rs @@ -0,0 +1,81 @@ +//! SP1 proof host construction and predicate resolution. + +use std::path::Path; + +use anyhow::Result; +use strata_predicate::PredicateKey; +use zkaleido::ZkVmHost; +#[cfg(feature = "sp1")] +use { + anyhow::Context, + sp1_sdk::{HashableKey, SP1VerifyingKey}, + sp1_verifier::{GROTH16_VK_BYTES, VK_ROOT_BYTES}, + strata_predicate::PredicateTypeId, + zkaleido_sp1_groth16_verifier::SP1Groth16Verifier, + zkaleido_sp1_host::SP1Host, +}; + +use super::ProofHost; + +#[cfg(feature = "sp1")] +pub(super) async fn build_sp1_hosts( + asm_elf_path: &Path, + moho_elf_path: &Path, +) -> Result<(ProofHost, ProofHost)> { + use std::fs; + + let asm_elf = fs::read(asm_elf_path) + .with_context(|| format!("failed to read ASM guest ELF at {}", asm_elf_path.display()))?; + let moho_elf = fs::read(moho_elf_path).with_context(|| { + format!( + "failed to read Moho guest ELF at {}", + moho_elf_path.display() + ) + })?; + + Ok(( + SP1Host::init(&asm_elf).await, + SP1Host::init(&moho_elf).await, + )) +} + +#[cfg(not(feature = "sp1"))] +pub(super) async fn build_sp1_hosts( + _asm_elf_path: &Path, + _moho_elf_path: &Path, +) -> Result<(ProofHost, ProofHost)> { + anyhow::bail!("sp1 backend requested but binary was built without the `sp1` feature"); +} + +/// Resolves the [`PredicateKey`] for an SP1 host. +/// +/// SP1 proofs are wrapped in a Groth16 proof, so the on-chain predicate must +/// identify the SP1 Groth16 verifying key (not the SP1 program vk itself). The +/// conversion is: +/// 1. Decode the SP1 verifying key from the host's raw bytes. +/// 2. Hash it to obtain the program commitment expected by the Groth16 verifier. +/// 3. Load the matching Groth16 verifier and serialize its vk into the predicate key. +#[cfg(feature = "sp1")] +pub(super) fn resolve_sp1_predicate(host: &impl ZkVmHost) -> Result { + let vk = host.vk(); + let sp1_vk: SP1VerifyingKey = + bincode::deserialize(vk.as_bytes()).context("failed to deserialize SP1 verifying key")?; + + let verifier = SP1Groth16Verifier::load( + &GROTH16_VK_BYTES, + sp1_vk.bytes32_raw(), + *VK_ROOT_BYTES, + true, + ) + .context("failed to load SP1 Groth16 verifier")?; + + Ok(PredicateKey::new( + PredicateTypeId::Sp1Groth16, + verifier.to_uncompressed_bytes(), + )) +} + +#[cfg(not(feature = "sp1"))] +pub(super) fn resolve_sp1_predicate(_host: &impl ZkVmHost) -> Result { + anyhow::bail!("SP1 predicate key resolution requires the `sp1` feature"); +} diff --git a/crates/extensions/prover/worker/src/builder.rs b/crates/extensions/prover/worker/src/builder.rs new file mode 100644 index 00000000..397c16ae --- /dev/null +++ b/crates/extensions/prover/worker/src/builder.rs @@ -0,0 +1,102 @@ +//! Builder for assembling a prover worker. + +use strata_asm_prover_types::ProofId; +use tokio::sync::mpsc; +use zkaleido::ZkVmRemoteHost; + +use crate::{ + InputBuilder, ProofOrchestrator, ProverContext, ProverWorkerHandle, + config::OrchestratorConfig, + errors::{ProverError, ProverResult}, +}; + +/// Builder for assembling a prover worker. +/// +/// Wires the context, remote hosts, config, and input builder into a +/// [`ProofOrchestrator`], returning it alongside a [`ProverWorkerHandle`] over a +/// freshly created request channel. +/// +/// The orchestrator is *not* spawned here: its run loop is `!Send` (the +/// upstream `ZkVmRemoteProver` is `#[async_trait(?Send)]`), so the caller must +/// drive [`ProofOrchestrator::run`] itself — typically on a dedicated thread +/// with a single-threaded runtime and a `LocalSet`. +#[derive(Debug)] +pub struct ProverWorkerBuilder { + ctx: Option, + asm_host: Option, + moho_host: Option, + config: Option, + input_builder: Option, +} + +impl ProverWorkerBuilder { + /// Creates a new, empty builder. + pub fn new() -> Self { + Self { + ctx: None, + asm_host: None, + moho_host: None, + config: None, + input_builder: None, + } + } + + /// Sets the prover context (implements [`ProverContext`]). + pub fn with_context(mut self, ctx: C) -> Self { + self.ctx = Some(ctx); + self + } + + /// Sets the `(asm, moho)` remote host pair. + pub fn with_hosts(mut self, asm_host: H, moho_host: H) -> Self { + self.asm_host = Some(asm_host); + self.moho_host = Some(moho_host); + self + } + + /// Sets the orchestrator configuration. + pub fn with_config(mut self, config: OrchestratorConfig) -> Self { + self.config = Some(config); + self + } + + /// Sets the input builder used to assemble ZkVM inputs. + pub fn with_input_builder(mut self, input_builder: InputBuilder) -> Self { + self.input_builder = Some(input_builder); + self + } +} + +impl ProverWorkerBuilder { + /// Validates the supplied dependencies and assembles the orchestrator. + /// + /// Returns the [`ProverWorkerHandle`] for enqueuing proof requests and the + /// [`ProofOrchestrator`] for the caller to drive. + pub fn build(self) -> ProverResult<(ProverWorkerHandle, ProofOrchestrator)> { + let ctx = self.ctx.ok_or(ProverError::MissingDependency("context"))?; + let asm_host = self + .asm_host + .ok_or(ProverError::MissingDependency("asm_host"))?; + let moho_host = self + .moho_host + .ok_or(ProverError::MissingDependency("moho_host"))?; + let config = self + .config + .ok_or(ProverError::MissingDependency("config"))?; + let input_builder = self + .input_builder + .ok_or(ProverError::MissingDependency("input_builder"))?; + + let (tx, rx) = mpsc::unbounded_channel::(); + let orchestrator = + ProofOrchestrator::new(ctx, asm_host, moho_host, config, input_builder, rx); + + Ok((ProverWorkerHandle::new(tx), orchestrator)) + } +} + +impl Default for ProverWorkerBuilder { + fn default() -> Self { + Self::new() + } +} diff --git a/bin/asm-runner/src/prover/config.rs b/crates/extensions/prover/worker/src/config.rs similarity index 95% rename from bin/asm-runner/src/prover/config.rs rename to crates/extensions/prover/worker/src/config.rs index d5e06557..e847effb 100644 --- a/bin/asm-runner/src/prover/config.rs +++ b/crates/extensions/prover/worker/src/config.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; /// Configuration for the proof orchestrator. #[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct OrchestratorConfig { +pub struct OrchestratorConfig { /// Interval between orchestrator ticks. pub tick_interval: Duration, @@ -26,7 +26,7 @@ pub(crate) struct OrchestratorConfig { /// Tagged with `kind` so the same config schema is valid regardless of /// which features the binary was built with. If the selected variant does /// not match the build (e.g. `sp1` requested in a binary built without the -/// `sp1` feature), [`crate::prover::backend::ProofBackend::new`] surfaces a +/// `sp1` feature), [`ProofBackend::new`](crate::ProofBackend::new) surfaces a /// startup error. #[derive(Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] @@ -34,7 +34,7 @@ pub(crate) struct OrchestratorConfig { clippy::large_enum_variant, reason = "BackendConfig is parsed once at startup; boxing a SigningKey to save a few bytes on a singleton value is not worth the indirection" )] -pub(crate) enum BackendConfig { +pub enum BackendConfig { /// SP1 backend. Loads the ASM and Moho guest ELFs from explicit paths at startup. Sp1 { asm_elf_path: PathBuf, diff --git a/crates/extensions/prover/worker/src/errors.rs b/crates/extensions/prover/worker/src/errors.rs new file mode 100644 index 00000000..402cccca --- /dev/null +++ b/crates/extensions/prover/worker/src/errors.rs @@ -0,0 +1,21 @@ +//! Error types for the prover worker setup surface. +//! +//! The orchestration loop itself stays `anyhow`-based (it logs and continues on +//! transient failures); these typed errors cover the builder/launch path. + +use thiserror::Error; + +/// Result alias for prover-worker setup operations. +pub type ProverResult = Result; + +/// Errors surfaced while building or launching the prover worker. +#[derive(Debug, Error)] +pub enum ProverError { + /// A required dependency was not supplied to the builder. + #[error("missing required dependency: {0}")] + MissingDependency(&'static str), + + /// Any other error, typically from backend construction. + #[error(transparent)] + Other(#[from] anyhow::Error), +} diff --git a/crates/extensions/prover/worker/src/handle.rs b/crates/extensions/prover/worker/src/handle.rs new file mode 100644 index 00000000..a46fca04 --- /dev/null +++ b/crates/extensions/prover/worker/src/handle.rs @@ -0,0 +1,30 @@ +//! Handle for requesting proofs from the prover worker. + +use strata_asm_prover_types::ProofId; +use tokio::sync::mpsc; + +/// Handle for submitting proof requests to a running +/// [`ProofOrchestrator`](crate::ProofOrchestrator). +/// +/// Wraps the sender side of the orchestrator's request channel. The block +/// watcher uses this to enqueue ASM/Moho proofs as new L1 blocks are processed. +/// Cloneable and cheap to pass around; dropping every clone signals the +/// orchestrator to drain and shut down. +#[derive(Debug, Clone)] +pub struct ProverWorkerHandle { + tx: mpsc::UnboundedSender, +} + +impl ProverWorkerHandle { + pub(crate) fn new(tx: mpsc::UnboundedSender) -> Self { + Self { tx } + } + + /// Requests generation of the proof identified by `proof_id`. + /// + /// Returns the unsent request as an error if the orchestrator has shut down + /// (its receiver was dropped). + pub fn request_proof(&self, proof_id: ProofId) -> Result<(), mpsc::error::SendError> { + self.tx.send(proof_id) + } +} diff --git a/bin/asm-runner/src/prover/input.rs b/crates/extensions/prover/worker/src/input.rs similarity index 65% rename from bin/asm-runner/src/prover/input.rs rename to crates/extensions/prover/worker/src/input.rs index 049bb997..5b31283e 100644 --- a/bin/asm-runner/src/prover/input.rs +++ b/crates/extensions/prover/worker/src/input.rs @@ -1,70 +1,68 @@ //! Input preparation for proof generation. //! -//! Builds the [`RuntimeInput`] required by the ZkVM program for each proof type. - -use std::sync::Arc; +//! Builds the [`RuntimeInput`] required by the ZkVM program for each proof type, +//! reading every dependency (proofs, Moho state, anchor state, aux data, L1 +//! blocks) through the [`ProverContext`] rather than holding concrete handles. use anyhow::{Context, Result}; -use asm_storage::AsmStateDb; -use bitcoind_async_client::{Client, traits::Reader}; use moho_recursive_proof::{MohoRecursiveInput, MohoRecursiveOutput}; use moho_runtime_impl::RuntimeInput; use moho_types::{MohoState, RecursiveMohoProof, StepMohoAttestation, StepMohoProof}; use ssz::{Decode, Encode}; use strata_asm_proof_impl::moho_program::input::AsmStepInput; -use strata_asm_prover_storage::{MohoStateDb, ProofDb, SledMohoStateDb, SledProofDb}; use strata_asm_prover_types::L1Range; -use strata_btc_types::{BlockHashExt, L1BlockIdBitcoinExt}; -use strata_btc_verification::{self, TxidInclusionProof}; +use strata_btc_types::BlockHashExt; +use strata_btc_verification::TxidInclusionProof; use strata_identifiers::L1BlockCommitment; use strata_merkle::{BinaryMerkleTree, MerkleProofB32, Sha256NoPrefixHasher}; use strata_predicate::PredicateKey; use tree_hash::{Sha256Hasher as TreeSha256Hasher, TreeHash}; +use crate::ProverContext; + /// Builds [`RuntimeInput`] for proof generation, dispatching by proof type. -pub(crate) struct InputBuilder { - state_db: Arc, - bitcoin_client: Arc, - proof_db: SledProofDb, - moho_state_db: SledMohoStateDb, +/// +/// Holds only the values that are fixed for the lifetime of the prover (the +/// genesis commitment and the two predicate keys); all per-block data is read +/// from the [`ProverContext`] passed to each method. +#[derive(Debug)] +pub struct InputBuilder { genesis: L1BlockCommitment, asm_predicate: PredicateKey, moho_predicate: PredicateKey, } -pub(crate) struct MohoPrerequisite { +/// Prerequisites required to build a Moho recursive proof input for a block: +/// the inner ASM step proof and (unless genesis) the previous Moho proof. +#[derive(Debug)] +pub struct MohoPrerequisite { prev_moho_proof: Option, incremental_step_proof: StepMohoProof, } impl InputBuilder { - pub(crate) fn new( - state_db: Arc, - bitcoin_client: Arc, - proof_db: SledProofDb, - moho_state_db: SledMohoStateDb, + /// Creates a new input builder. + pub fn new( genesis: L1BlockCommitment, asm_predicate: PredicateKey, moho_predicate: PredicateKey, ) -> Self { Self { - state_db, - bitcoin_client, - proof_db, - moho_state_db, genesis, asm_predicate, moho_predicate, } } - async fn get_parent_commitment(&self, l1_ref: L1BlockCommitment) -> Result { - let block_hash = l1_ref.blkid().to_block_hash(); - let header = self - .bitcoin_client - .get_block_header(&block_hash) + async fn get_parent_commitment( + &self, + ctx: &C, + l1_ref: L1BlockCommitment, + ) -> Result { + let header = ctx + .get_l1_block_header(l1_ref.blkid()) .await - .context("failed to fetch Bitcoin block")?; + .context("failed to fetch Bitcoin block header")?; let parent_hash = header.prev_blockhash; let parent_height = l1_ref @@ -77,46 +75,55 @@ impl InputBuilder { } /// Fetches the persisted [`MohoState`] for the given L1 block. The worker - /// materializes this alongside each anchor state — see `AsmWorkerContext::store_anchor_state`. - async fn get_moho_state(&self, l1_ref: L1BlockCommitment) -> Result { - self.moho_state_db - .get_moho_state(l1_ref) + /// materializes this alongside each anchor state — see the runner's + /// `AsmWorkerContext::store_anchor_state`. + async fn get_moho_state( + &self, + ctx: &C, + l1_ref: L1BlockCommitment, + ) -> Result { + ctx.get_moho_state(l1_ref) .await .context("failed to fetch moho state")? .context("moho state not found for block") } - pub(crate) async fn check_moho_prerequisite( + /// Checks whether the prerequisites for a Moho recursive proof at `block` + /// are available, returning them if so. + pub async fn check_moho_prerequisite( &self, + ctx: &C, block: L1BlockCommitment, ) -> Result { // 1. ASM step proof is required. - let asm_proof = self - .proof_db + let asm_proof = ctx .get_asm_proof(L1Range::single(block)) - .await? + .await + .context("failed to fetch ASM step proof")? .context("ASM step proof not available yet for this block")?; let asm_receipt = asm_proof.0.receipt(); let asm_attestation = StepMohoAttestation::from_ssz_bytes(asm_receipt.public_values().as_bytes()) - .context("invalid ASM attestation in stored proof")?; + .map_err(|e| anyhow::anyhow!("invalid ASM attestation in stored proof: {e:?}"))?; let asm_step_proof = StepMohoProof::new(asm_attestation, asm_receipt.proof().as_bytes().to_vec()); // 2. Previous moho proof: required unless this is the genesis block. - let parent = self.get_parent_commitment(block).await?; + let parent = self.get_parent_commitment(ctx, block).await?; let prev_moho_proof = if parent == self.genesis { None } else { - let proof = self - .proof_db + let proof = ctx .get_moho_proof(parent) - .await? + .await + .context("failed to fetch previous moho proof")? .context("previous moho recursive proof not available yet")?; let receipt = proof.0.receipt(); let output = MohoRecursiveOutput::from_ssz_bytes(receipt.public_values().as_bytes()) - .context("invalid moho recursive output in stored proof")?; + .map_err(|e| { + anyhow::anyhow!("invalid moho recursive output in stored proof: {e:?}") + })?; Some(RecursiveMohoProof::new( output.attestation().clone(), receipt.proof().as_bytes().to_vec(), @@ -133,23 +140,23 @@ impl InputBuilder { /// /// This fetches the Bitcoin block and auxiliary data, reconstructs the /// pre-state, and assembles the input the ZkVM program expects. - pub(crate) async fn build_asm_runtime_input(&self, range: &L1Range) -> Result { + pub async fn build_asm_runtime_input( + &self, + ctx: &C, + range: &L1Range, + ) -> Result { let commitment = range.start(); // 1. Fetch the Bitcoin block. - let block_hash = commitment.blkid().to_block_hash(); - let block = self - .bitcoin_client - .get_block(&block_hash) + let block = ctx + .get_l1_block(commitment.blkid()) .await .context("failed to fetch Bitcoin block")?; // 2. Fetch the auxiliary data stored during STF execution. - let aux_data = self - .state_db + let aux_data = ctx .get_aux_data(&commitment) - .context("failed to fetch aux data")? - .context("aux data not found for block")?; + .context("failed to fetch aux data")?; let coinbase_inclusion_proof = match block.witness_root() { Some(_) => Some(TxidInclusionProof::generate(&block.txdata, 0)), @@ -160,17 +167,14 @@ impl InputBuilder { let step_input = AsmStepInput::new(block.clone(), aux_data, coinbase_inclusion_proof); // 4. Fetch the pre-state (anchor state for the parent block). - let parent_commitment = self.get_parent_commitment(commitment).await?; + let parent_commitment = self.get_parent_commitment(ctx, commitment).await?; - let asm_state = self - .state_db - .get(&parent_commitment) - .context("failed to fetch parent anchor state")? - .context("parent anchor state not found")?; - let anchor_state = asm_state.state(); + let anchor_state = ctx + .get_anchor_state(&parent_commitment) + .context("failed to fetch parent anchor state")?; // 5. Compute the Moho pre-state from the anchor state. - let moho_pre_state = self.get_moho_state(parent_commitment).await?; + let moho_pre_state = self.get_moho_state(ctx, parent_commitment).await?; // 6. Build RuntimeInput. let runtime_input = RuntimeInput::new( @@ -182,8 +186,10 @@ impl InputBuilder { Ok(runtime_input) } - pub(crate) async fn build_moho_runtime_input( + /// Builds the [`MohoRecursiveInput`] for a Moho recursive proof at `l1_ref`. + pub async fn build_moho_runtime_input( &self, + ctx: &C, prerequisite: MohoPrerequisite, l1_ref: L1BlockCommitment, ) -> Result { @@ -198,8 +204,8 @@ impl InputBuilder { // the ASM predicate. let step_predicate = self.asm_predicate.clone(); - let parent = self.get_parent_commitment(l1_ref).await?; - let parent_state = self.get_moho_state(parent).await?; + let parent = self.get_parent_commitment(ctx, l1_ref).await?; + let parent_state = self.get_moho_state(ctx, parent).await?; let leaves = vec![ <_ as TreeHash>::tree_hash_root::(&parent_state.inner_state) diff --git a/crates/extensions/prover/worker/src/lib.rs b/crates/extensions/prover/worker/src/lib.rs new file mode 100644 index 00000000..0b0a1918 --- /dev/null +++ b/crates/extensions/prover/worker/src/lib.rs @@ -0,0 +1,35 @@ +//! # strata-asm-prover-worker +//! +//! Orchestrates remote ASM step proofs and Moho recursive proofs. +//! +//! The worker defines a [`ProverContext`] umbrella trait abstracting its +//! storage and chain-data dependencies, and a [`ProofOrchestrator`] that drives +//! proof scheduling and reconciliation generically over it — mirroring how the +//! ASM worker (`strata-asm-worker`) is built. Concrete sled-backed storage +//! lives in the sibling `strata-asm-prover-storage` crate; the binary supplies +//! the `ProverContext` impl that wires storage and the Bitcoin client together. + +mod backend; +mod builder; +mod config; +mod errors; +mod handle; +mod input; +mod orchestrator; +mod proof_store; +mod queue; +mod traits; + +pub use backend::{ProofBackend, ProofHost}; +pub use builder::ProverWorkerBuilder; +pub use config::{BackendConfig, OrchestratorConfig}; +pub use errors::{ProverError, ProverResult}; +pub use handle::ProverWorkerHandle; +pub use input::{InputBuilder, MohoPrerequisite}; +pub use orchestrator::ProofOrchestrator; +pub use traits::{AnchorStateReader, AuxDataReader, L1BlockProvider, ProverContext}; +// In `sp1` builds the native host path is compiled out, leaving the +// `zkaleido-native-adapter` dependency otherwise unused; this keeps the +// `unused_crate_dependencies` lint satisfied. +#[cfg(feature = "sp1")] +use zkaleido_native_adapter as _; diff --git a/bin/asm-runner/src/prover/orchestrator.rs b/crates/extensions/prover/worker/src/orchestrator.rs similarity index 90% rename from bin/asm-runner/src/prover/orchestrator.rs rename to crates/extensions/prover/worker/src/orchestrator.rs index c40789b4..b95b0e9b 100644 --- a/bin/asm-runner/src/prover/orchestrator.rs +++ b/crates/extensions/prover/worker/src/orchestrator.rs @@ -8,40 +8,41 @@ use anyhow::{Context, Result}; use async_trait::async_trait; use moho_recursive_proof::MohoRecursiveProgram; use strata_asm_proof_impl::program::AsmStfProofProgram; -use strata_asm_prover_storage::{RemoteProofMappingDb, RemoteProofStatusDb, SledProofDb}; use strata_asm_prover_types::{ProofId, RemoteProofId}; use strata_tasks::ShutdownGuard; use tokio::{sync::mpsc, time}; use tracing::{debug, error, info, warn}; use zkaleido::{RemoteProofStatus, ZkVmRemoteHost, ZkVmRemoteProgram}; -use super::{ - config::OrchestratorConfig, input::InputBuilder, proof_store, queue::PendingProofQueue, +use crate::{ + ProverContext, config::OrchestratorConfig, input::InputBuilder, proof_store, + queue::PendingProofQueue, }; /// Orchestrates remote proof generation for ASM and Moho proofs. -pub(crate) struct ProofOrchestrator { - db: SledProofDb, +#[derive(Debug)] +pub struct ProofOrchestrator { + ctx: C, queue: PendingProofQueue, rx: mpsc::UnboundedReceiver, - asm: Host, - moho: Host, + asm: H, + moho: H, config: OrchestratorConfig, input_builder: InputBuilder, } -impl ProofOrchestrator { +impl ProofOrchestrator { /// Creates a new orchestrator. - pub(crate) fn new( - db: SledProofDb, - asm: R, - moho: R, + pub fn new( + ctx: C, + asm: H, + moho: H, config: OrchestratorConfig, input_builder: InputBuilder, rx: mpsc::UnboundedReceiver, ) -> Self { Self { - db, + ctx, queue: PendingProofQueue::new(), rx, asm, @@ -52,7 +53,7 @@ impl ProofOrchestrator { } /// Runs the orchestrator loop until shutdown is requested or the channel is closed. - pub(crate) async fn run(&mut self, shutdown: ShutdownGuard) -> Result<()> { + pub async fn run(&mut self, shutdown: ShutdownGuard) -> Result<()> { info!("proof orchestrator started"); loop { if let Err(e) = self.tick().await { @@ -107,7 +108,7 @@ impl ProofOrchestrator { /// Polls all in-progress remote proofs and stores any that have completed. async fn reconcile_active_proofs(&mut self) -> Result<()> { let in_progress = self - .db + .ctx .get_all_in_progress() .await .context("failed to query in-progress proofs")?; @@ -126,11 +127,11 @@ impl ProofOrchestrator { remote_id: &RemoteProofId, old_status: &RemoteProofStatus, ) -> Result<()> { - let typed_id = to_typed_proof_id::(remote_id)?; + let typed_id = to_typed_proof_id::(remote_id)?; // NOTE: We use `self.asm` here but this could be any `ZkVmRemoteHost` instance. // `get_status` only requires a network client and proof ID — not the ELF or - // proving key. Since the orchestrator is generic over a single `R: ZkVmRemoteHost`, + // proving key. Since the orchestrator is generic over a single `H: ZkVmRemoteHost`, // both `asm` and `moho` share the same concrete type, so either works. let new_status = self .asm @@ -155,13 +156,13 @@ impl ProofOrchestrator { } RemoteProofStatus::Failed(reason) => { error!(?remote_id, %reason, "remote proof generation failed"); - self.db + self.ctx .remove(remote_id) .await .context("failed to remove failed proof status")?; } _ => { - self.db + self.ctx .update_status(remote_id, new_status) .await .context("failed to update proof status")?; @@ -170,15 +171,15 @@ impl ProofOrchestrator { Ok(()) } - /// Retrieves a completed proof and stores it in the proof DB. + /// Retrieves a completed proof and stores it in the proof store. async fn handle_completed( &self, remote_id: &RemoteProofId, - typed_id: &R::ProofId, + typed_id: &H::ProofId, ) -> Result<()> { // NOTE: We use `self.asm` here but this could be any `ZkVmRemoteHost` instance. // `get_proof` only requires a network client and proof ID — not the ELF or - // proving key. Since the orchestrator is generic over a single `R: ZkVmRemoteHost`, + // proving key. Since the orchestrator is generic over a single `H: ZkVmRemoteHost`, // both `asm` and `moho` share the same concrete type, so either works. let receipt = self .asm @@ -187,15 +188,15 @@ impl ProofOrchestrator { .map_err(|e| anyhow::anyhow!("failed to retrieve completed proof: {e}"))?; let proof_id = self - .db + .ctx .get_proof_id(remote_id) .await .context("failed to look up proof ID from remote ID")? .context("no mapping found for completed remote proof")?; - proof_store::store_completed_proof(&self.db, proof_id, receipt).await?; + proof_store::store_completed_proof(&self.ctx, proof_id, receipt).await?; - self.db + self.ctx .remove(remote_id) .await .context("failed to remove completed proof status")?; @@ -213,7 +214,7 @@ impl ProofOrchestrator { /// unit-tested with a fake submitter. async fn schedule_proofs(&mut self) -> Result<()> { let in_flight = self - .db + .ctx .get_all_in_progress() .await .context("failed to query in-progress proofs")? @@ -225,7 +226,7 @@ impl ProofOrchestrator { } let mut submitter = OrchestratorSubmitter { - db: &self.db, + ctx: &self.ctx, asm: &self.asm, moho: &self.moho, input_builder: &self.input_builder, @@ -256,7 +257,7 @@ enum SubmitOutcome { /// return non-`Send` futures to accommodate backends whose clients hold /// non-`Send` state across `.await`. Awaiting them here transitively makes /// `try_submit` non-`Send`. This is fine because the orchestrator is driven -/// from a `LocalSet` (see `bootstrap.rs`). +/// from a `LocalSet` (see the runner's `bootstrap`). #[async_trait(?Send)] trait ProofSubmitter { async fn try_submit(&mut self, proof_id: ProofId) -> Result; @@ -300,22 +301,22 @@ async fn schedule_with( } } -/// [`ProofSubmitter`] backed by the orchestrator's DB, hosts, and input +/// [`ProofSubmitter`] backed by the orchestrator's context, hosts, and input /// builder. Constructed inline by [`ProofOrchestrator::schedule_proofs`] for /// the duration of one scheduling cycle. -struct OrchestratorSubmitter<'a, R: ZkVmRemoteHost> { - db: &'a SledProofDb, - asm: &'a R, - moho: &'a R, +struct OrchestratorSubmitter<'a, C: ProverContext, H: ZkVmRemoteHost> { + ctx: &'a C, + asm: &'a H, + moho: &'a H, input_builder: &'a InputBuilder, } #[async_trait(?Send)] -impl ProofSubmitter for OrchestratorSubmitter<'_, R> { +impl ProofSubmitter for OrchestratorSubmitter<'_, C, H> { async fn try_submit(&mut self, proof_id: ProofId) -> Result { // Skip if already submitted. if self - .db + .ctx .get_remote_proof_id(proof_id) .await .context("failed to check remote proof mapping")? @@ -326,7 +327,7 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { } // Skip if proof already exists locally. - if proof_store::proof_exists(self.db, &proof_id).await? { + if proof_store::proof_exists(self.ctx, &proof_id).await? { debug!(?proof_id, "proof already exists, skipping"); return Ok(SubmitOutcome::Skipped); } @@ -334,13 +335,20 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { // Build input and submit to remote prover, dispatching by proof type. let typed_id = match &proof_id { ProofId::Asm(range) => { - let runtime_input = self.input_builder.build_asm_runtime_input(range).await?; + let runtime_input = self + .input_builder + .build_asm_runtime_input(self.ctx, range) + .await?; AsmStfProofProgram::start_proving(&runtime_input, self.asm) .await .map_err(|e| anyhow::anyhow!("failed to submit proof to remote prover: {e}"))? } ProofId::Moho(block) => { - let prerequisite = match self.input_builder.check_moho_prerequisite(*block).await { + let prerequisite = match self + .input_builder + .check_moho_prerequisite(self.ctx, *block) + .await + { Ok(prereq) => prereq, Err(e) => { debug!(?proof_id, %e, "moho prerequisite not ready, deferring"); @@ -349,7 +357,7 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { }; let input = self .input_builder - .build_moho_runtime_input(prerequisite, *block) + .build_moho_runtime_input(self.ctx, prerequisite, *block) .await?; MohoRecursiveProgram::start_proving(&input, self.moho) .await @@ -361,12 +369,12 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { info!(?proof_id, %typed_id, "proof submitted to remote prover"); // Store mapping and initial status. - self.db + self.ctx .put_remote_proof_id(proof_id, remote_id.clone()) .await .context("failed to store proof mapping")?; - self.db + self.ctx .put_status(&remote_id, RemoteProofStatus::Requested) .await .context("failed to store initial proof status")?; @@ -376,8 +384,8 @@ impl ProofSubmitter for OrchestratorSubmitter<'_, R> { } /// Converts a persisted [`RemoteProofId`] back into the host's typed proof ID. -fn to_typed_proof_id(remote_id: &RemoteProofId) -> Result { - R::ProofId::try_from(remote_id.0.clone()) +fn to_typed_proof_id(remote_id: &RemoteProofId) -> Result { + H::ProofId::try_from(remote_id.0.clone()) .map_err(|_| anyhow::anyhow!("failed to decode remote proof ID")) } diff --git a/bin/asm-runner/src/prover/proof_store.rs b/crates/extensions/prover/worker/src/proof_store.rs similarity index 65% rename from bin/asm-runner/src/prover/proof_store.rs rename to crates/extensions/prover/worker/src/proof_store.rs index f3280a72..bb889e2b 100644 --- a/bin/asm-runner/src/prover/proof_store.rs +++ b/crates/extensions/prover/worker/src/proof_store.rs @@ -1,19 +1,21 @@ -//! Proof DB helpers that dispatch on [`ProofId`] variants. +//! Proof-store helpers that dispatch on [`ProofId`] variants. //! //! These free functions encapsulate the match-on-ProofId pattern, keeping the -//! orchestrator focused on coordination logic. +//! orchestrator focused on coordination logic. They operate through the +//! [`ProofDb`] surface of the [`ProverContext`]. use anyhow::{Context, Result}; -use strata_asm_prover_storage::{ProofDb, SledProofDb}; use strata_asm_prover_types::{AsmProof, MohoProof, ProofId}; use tracing::info; use zkaleido::ProofReceiptWithMetadata; -/// Returns `true` if the proof already exists in the local proof DB. -pub(crate) async fn proof_exists(db: &SledProofDb, proof_id: &ProofId) -> Result { +use crate::ProverContext; + +/// Returns `true` if the proof already exists in the local proof store. +pub(crate) async fn proof_exists(ctx: &C, proof_id: &ProofId) -> Result { match proof_id { ProofId::Asm(range) => { - let exists = db + let exists = ctx .get_asm_proof(*range) .await .context("failed to check ASM proof")? @@ -21,7 +23,7 @@ pub(crate) async fn proof_exists(db: &SledProofDb, proof_id: &ProofId) -> Result Ok(exists) } ProofId::Moho(commitment) => { - let exists = db + let exists = ctx .get_moho_proof(*commitment) .await .context("failed to check Moho proof")? @@ -31,22 +33,22 @@ pub(crate) async fn proof_exists(db: &SledProofDb, proof_id: &ProofId) -> Result } } -/// Stores a completed proof receipt in the appropriate DB table. -pub(crate) async fn store_completed_proof( - db: &SledProofDb, +/// Stores a completed proof receipt in the appropriate proof-store table. +pub(crate) async fn store_completed_proof( + ctx: &C, proof_id: ProofId, receipt: ProofReceiptWithMetadata, ) -> Result<()> { match proof_id { ProofId::Asm(range) => { info!(?range, "storing completed ASM proof"); - db.store_asm_proof(range, AsmProof(receipt)) + ctx.store_asm_proof(range, AsmProof(receipt)) .await .context("failed to store ASM proof")?; } ProofId::Moho(commitment) => { info!(?commitment, "storing completed Moho proof"); - db.store_moho_proof(commitment, MohoProof(receipt)) + ctx.store_moho_proof(commitment, MohoProof(receipt)) .await .context("failed to store Moho proof")?; } diff --git a/bin/asm-runner/src/prover/queue.rs b/crates/extensions/prover/worker/src/queue.rs similarity index 100% rename from bin/asm-runner/src/prover/queue.rs rename to crates/extensions/prover/worker/src/queue.rs diff --git a/crates/extensions/prover/worker/src/traits.rs b/crates/extensions/prover/worker/src/traits.rs new file mode 100644 index 00000000..ff0f65e0 --- /dev/null +++ b/crates/extensions/prover/worker/src/traits.rs @@ -0,0 +1,88 @@ +//! Traits the prover worker uses to interface with the underlying system. +//! +//! The orchestrator's dependencies split into concerns, each backed by a +//! distinct subsystem in production: +//! +//! - Proof persistence and remote-job tracking — reused verbatim from `strata-asm-prover-storage` +//! ([`ProofDb`], [`RemoteProofMappingDb`], [`RemoteProofStatusDb`], [`MohoStateDb`]). +//! - [`AnchorStateReader`] — reads persisted ASM anchor states. +//! - [`AuxDataReader`] — reads per-block auxiliary data captured during STF execution. +//! - [`L1BlockProvider`] — reads L1 blocks/headers from the Bitcoin source. +//! +//! [`ProverContext`] is the umbrella that combines all of them. It has a blanket +//! impl, so an implementor just implements the concern traits and gets +//! `ProverContext` for free — mirroring +//! [`WorkerContext`](https://docs.rs/strata-asm-worker) in the ASM worker. + +use std::error::Error as StdError; + +use bitcoin::{Block, block::Header}; +use strata_asm_common::{AnchorState, AuxData}; +use strata_asm_prover_storage::{MohoStateDb, ProofDb, RemoteProofMappingDb, RemoteProofStatusDb}; +use strata_identifiers::{L1BlockCommitment, L1BlockId}; + +/// Reads the persisted ASM anchor state for a given L1 block. +pub trait AnchorStateReader { + /// Fetches the [`AnchorState`] for the block at `blockid`. + /// + /// Errors if the state is missing — the orchestrator only requests proofs + /// for blocks the ASM worker has already processed. + fn get_anchor_state(&self, blockid: &L1BlockCommitment) -> anyhow::Result; +} + +/// Reads per-block auxiliary data captured during STF execution. +pub trait AuxDataReader { + /// Fetches the [`AuxData`] stored for the block at `blockid`. + fn get_aux_data(&self, blockid: &L1BlockCommitment) -> anyhow::Result; +} + +/// Fetches L1 blocks and headers from the backing Bitcoin source. +/// +/// Async because the backing client is async, and the orchestrator drives this +/// from a single-threaded runtime where blocking on the client would deadlock. +pub trait L1BlockProvider { + /// Fetches the full Bitcoin [`Block`] for `blockid`. + fn get_l1_block(&self, blockid: &L1BlockId) -> impl Future>; + + /// Fetches just the [`Header`] for `blockid`. + /// + /// Used to resolve a block's parent commitment (`prev_blockhash`) without + /// pulling the full transaction data. + fn get_l1_block_header( + &self, + blockid: &L1BlockId, + ) -> impl Future>; +} + +/// Umbrella context the [`ProofOrchestrator`](crate::ProofOrchestrator) runs +/// against. +/// +/// Combines proof persistence and remote-job tracking (reused from +/// `strata-asm-prover-storage`), Moho-state reads, and the chain/state reads the +/// [`InputBuilder`](crate::InputBuilder) needs. The blanket impl means any type +/// that implements all of the concern traits automatically implements +/// `ProverContext`, so implementors never name it directly. +/// +/// The associated `Error` bounds let the orchestrator surface storage failures +/// through `anyhow::Context`; every concrete backend already satisfies them. +pub trait ProverContext: + ProofDb + + RemoteProofMappingDb + + RemoteProofStatusDb + + MohoStateDb + + AnchorStateReader + + AuxDataReader + + L1BlockProvider +{ +} + +impl ProverContext for T where + T: ProofDb + + RemoteProofMappingDb + + RemoteProofStatusDb + + MohoStateDb + + AnchorStateReader + + AuxDataReader + + L1BlockProvider +{ +} diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 55498823..f6b7cd68 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -7,10 +7,10 @@ version = "0.1.0" workspace = true [dependencies] -strata-asm-prover-types.workspace = true strata-asm-proto-bridge-v1.workspace = true strata-asm-proto-bridge-v1-types.workspace = true strata-asm-proto-checkpoint-types.workspace = true +strata-asm-prover-types.workspace = true strata-asm-worker.workspace = true bitcoin.workspace = true From 996cedee12ce1c3ab3eb0aa8f0088d2089c7b3a7 Mon Sep 17 00:00:00 2001 From: Prajwol Gyawali Date: Sun, 7 Jun 2026 16:31:07 +0545 Subject: [PATCH 5/6] feat(worker): add Subscription::try_recv for non-blocking drain A periodically-ticking consumer (the prover orchestrator) needs to drain every buffered commitment at the top of each tick without parking on the channel, and to learn that the worker has shut down once the backlog is empty. recv() can only await; try_recv() covers both via Empty/Disconnected. --- crates/worker/src/subscription.rs | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/crates/worker/src/subscription.rs b/crates/worker/src/subscription.rs index 8353a7cd..048e360e 100644 --- a/crates/worker/src/subscription.rs +++ b/crates/worker/src/subscription.rs @@ -19,7 +19,7 @@ use std::{ use futures::Stream; use strata_identifiers::L1BlockCommitment; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender, error::TryRecvError}; /// A live stream of items emitted by the ASM worker, one per committed block. /// @@ -51,6 +51,16 @@ impl Subscription { pub async fn recv(&mut self) -> Option { self.rx.recv().await } + + /// Pulls the next already-queued item without awaiting. + /// + /// Returns [`TryRecvError::Empty`] when nothing is queued right now, and + /// [`TryRecvError::Disconnected`] once the worker has shut down and the + /// backlog is drained. Lets a periodically-ticking consumer drain everything + /// buffered at the top of each tick without parking on the channel. + pub fn try_recv(&mut self) -> Result { + self.rx.try_recv() + } } impl Stream for Subscription { @@ -156,4 +166,22 @@ mod tests { drop(subs); assert_eq!(sub.recv().await, None); } + + #[tokio::test] + async fn try_recv_drains_then_reports_empty_and_disconnected() { + let subs = AsmSubscribers::default(); + let mut sub = subs.subscribe(); + + subs.emit(commitment(1)); + subs.emit(commitment(2)); + + assert_eq!(sub.try_recv(), Ok(commitment(1))); + assert_eq!(sub.try_recv(), Ok(commitment(2))); + // Backlog drained but the producer is still live. + assert_eq!(sub.try_recv(), Err(TryRecvError::Empty)); + + // Once every sender is gone, a drained subscription reports disconnect. + drop(subs); + assert_eq!(sub.try_recv(), Err(TryRecvError::Disconnected)); + } } From a0e97377e50fef6535cb4492aa6914d7ebeef5e5 Mon Sep 17 00:00:00 2001 From: Prajwol Gyawali Date: Sun, 7 Jun 2026 16:31:22 +0545 Subject: [PATCH 6/6] feat(prover): drive proof orchestration from the ASM commit subscription Proof requests were enqueued from the block watcher before the ASM worker had committed the block. Subscribe the orchestrator to the worker's post-commit stream instead: it expands each committed L1BlockCommitment into its ASM step proof and Moho recursive proof internally, so proofs are requested only for durably-stored blocks. The subscription becomes the orchestrator's sole input, dropping the ProverWorkerHandle/request_proof channel; the block watcher returns to only feeding blocks to the worker. --- Cargo.lock | 1 + bin/asm-runner/src/block_watcher.rs | 30 ++--- bin/asm-runner/src/bootstrap.rs | 109 +++++++++--------- crates/extensions/prover/worker/Cargo.toml | 1 + .../extensions/prover/worker/src/builder.rs | 52 ++++++--- crates/extensions/prover/worker/src/handle.rs | 30 ----- crates/extensions/prover/worker/src/lib.rs | 6 +- .../prover/worker/src/orchestrator.rs | 57 ++++++--- 8 files changed, 145 insertions(+), 141 deletions(-) delete mode 100644 crates/extensions/prover/worker/src/handle.rs diff --git a/Cargo.lock b/Cargo.lock index f55d98aa..b23da6ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8046,6 +8046,7 @@ dependencies = [ "strata-asm-proof-impl", "strata-asm-prover-storage", "strata-asm-prover-types", + "strata-asm-worker", "strata-btc-types", "strata-btc-verification", "strata-identifiers", diff --git a/bin/asm-runner/src/block_watcher.rs b/bin/asm-runner/src/block_watcher.rs index 02113c52..827f51fc 100644 --- a/bin/asm-runner/src/block_watcher.rs +++ b/bin/asm-runner/src/block_watcher.rs @@ -15,8 +15,6 @@ use bitcoin::Block; use bitcoincore_zmq::{Message, SocketMessage, subscribe_async_wait_handshake}; use bitcoind_async_client::{Client, traits::Reader}; use futures::StreamExt; -use strata_asm_prover_types::{L1Range, ProofId}; -use strata_asm_prover_worker::ProverWorkerHandle; use strata_asm_worker::AsmWorkerHandle; use strata_btc_types::BlockHashExt; use strata_identifiers::L1BlockCommitment; @@ -38,7 +36,6 @@ pub(crate) async fn drive_asm_from_bitcoin( bitcoin_client: Arc, asm_worker: Arc, start_height: u64, - proof_handle: Option, shutdown: ShutdownGuard, ) -> Result<()> { info!(%start_height, "starting ASM block watcher"); @@ -106,7 +103,7 @@ pub(crate) async fn drive_asm_from_bitcoin( for height in cursor..received_height { match fetch_block_at_height(&bitcoin_client, height).await { Ok(fetched) => { - if let Err(err) = submit_block(&asm_worker, &proof_handle, fetched).await { + if let Err(err) = submit_block(&asm_worker, fetched).await { error!(%height, ?err, "failed to submit backfill block"); // Stop backfilling on failure so we don't hand the // worker a gap. The next ZMQ event will retry. @@ -121,7 +118,7 @@ pub(crate) async fn drive_asm_from_bitcoin( } } - if let Err(err) = submit_block(&asm_worker, &proof_handle, block).await { + if let Err(err) = submit_block(&asm_worker, block).await { error!(%received_height, ?err, "failed to submit block from ZMQ"); } cursor = received_height + 1; @@ -141,12 +138,12 @@ async fn fetch_block_at_height(client: &Client, height: u64) -> Result { Ok(block) } -/// Submit a block to the ASM worker and, optionally, enqueue a proof request. -async fn submit_block( - asm_worker: &AsmWorkerHandle, - proof_handle: &Option, - block: Block, -) -> Result<()> { +/// Submit a block to the ASM worker. +/// +/// Proof requests are not issued here: the prover worker subscribes to the ASM +/// worker's commit stream and derives them from each *committed* block, so they +/// fire only after the block is durably stored. +async fn submit_block(asm_worker: &AsmWorkerHandle, block: Block) -> Result<()> { let height = block.bip34_block_height().unwrap_or(0); let hash = block.block_hash(); let block_id = hash.to_l1_block_id(); @@ -159,16 +156,5 @@ async fn submit_block( debug!(%height, %hash, "submitted block to ASM worker"); - if let Some(handle) = proof_handle { - let asm_proof_id = ProofId::Asm(L1Range::single(commitment)); - if let Err(err) = handle.request_proof(asm_proof_id) { - warn!(%height, %hash, ?err, "failed to enqueue ASM proof request"); - } - let moho_proof_id = ProofId::Moho(commitment); - if let Err(err) = handle.request_proof(moho_proof_id) { - warn!(%height, %hash, ?err, "failed to enqueue Moho proof request"); - } - } - Ok(()) } diff --git a/bin/asm-runner/src/bootstrap.rs b/bin/asm-runner/src/bootstrap.rs index 1c9fc3d2..1aa92674 100644 --- a/bin/asm-runner/src/bootstrap.rs +++ b/bin/asm-runner/src/bootstrap.rs @@ -83,61 +83,63 @@ pub(crate) async fn bootstrap( let asm_worker = Arc::new(asm_worker); // 7. Finish orchestrator wiring if it was configured. - let (proof_handle, proof_rpc_deps) = - if let Some((orch_config, proof_db, moho_state_db, backend)) = orch_prep { - let rpc_deps = AsmProofRpcDeps { - proof_db: proof_db.clone(), - moho_state_db: moho_state_db.clone(), - export_entries_db: export_entries_db.clone(), - }; - - let ProofBackend { - asm_host, - moho_host, - asm_predicate, - moho_predicate, - } = backend; - - // The prover context wires the proof store, moho-state store, ASM - // anchor-state store, and Bitcoin client into the worker's traits. - let prover_ctx = AsmProverContext::new( - proof_db, - moho_state_db, - state_db.clone(), - bitcoin_client.clone(), - ); - let input_builder = - InputBuilder::new(params.anchor.block, asm_predicate, moho_predicate); - - let (handle, mut orchestrator) = ProverWorkerBuilder::new() - .with_context(prover_ctx) - .with_hosts(asm_host, moho_host) - .with_config(orch_config) - .with_input_builder(input_builder) - .build()?; - - // ZkVmRemoteProver is !Send (#[async_trait(?Send)]), so the orchestrator - // future cannot be spawned on a multi-threaded runtime directly. We run it - // on a dedicated thread with a single-threaded runtime + LocalSet. - executor.spawn_critical_async_with_shutdown( - "proof_orchestrator", - move |shutdown| async move { - task::spawn_blocking(move || { - let rt = RuntimeBuilder::new_current_thread().enable_all().build()?; - let local = LocalSet::new(); - rt.block_on( - local.run_until(async move { orchestrator.run(shutdown).await }), - ) - }) - .await? - }, - ); - - (Some(handle), Some(rpc_deps)) - } else { - (None, None) + let proof_rpc_deps = if let Some((orch_config, proof_db, moho_state_db, backend)) = orch_prep { + let rpc_deps = AsmProofRpcDeps { + proof_db: proof_db.clone(), + moho_state_db: moho_state_db.clone(), + export_entries_db: export_entries_db.clone(), }; + let ProofBackend { + asm_host, + moho_host, + asm_predicate, + moho_predicate, + } = backend; + + // The prover context wires the proof store, moho-state store, ASM + // anchor-state store, and Bitcoin client into the worker's traits. + let prover_ctx = AsmProverContext::new( + proof_db, + moho_state_db, + state_db.clone(), + bitcoin_client.clone(), + ); + let input_builder = InputBuilder::new(params.anchor.block, asm_predicate, moho_predicate); + + // Subscribe before the block watcher (spawned below) starts feeding the + // worker. The stream has no replay buffer, but the worker only commits + // blocks the watcher hands it, so subscribing here misses nothing. + let block_subscription = asm_worker.subscribe_blocks(); + + let mut orchestrator = ProverWorkerBuilder::new() + .with_context(prover_ctx) + .with_hosts(asm_host, moho_host) + .with_config(orch_config) + .with_input_builder(input_builder) + .with_block_subscription(block_subscription) + .build()?; + + // ZkVmRemoteProver is !Send (#[async_trait(?Send)]), so the orchestrator + // future cannot be spawned on a multi-threaded runtime directly. We run it + // on a dedicated thread with a single-threaded runtime + LocalSet. + executor.spawn_critical_async_with_shutdown( + "proof_orchestrator", + move |shutdown| async move { + task::spawn_blocking(move || { + let rt = RuntimeBuilder::new_current_thread().enable_all().build()?; + let local = LocalSet::new(); + rt.block_on(local.run_until(async move { orchestrator.run(shutdown).await })) + }) + .await? + }, + ); + + Some(rpc_deps) + } else { + None + }; + // 8. Spawn block watcher as a critical task. let asm_worker_for_driver = asm_worker.clone(); let bitcoin_config = config.bitcoin.clone(); @@ -148,7 +150,6 @@ pub(crate) async fn bootstrap( bitcoin_client_for_driver, asm_worker_for_driver, start_height as u64, - proof_handle, shutdown, ) }); diff --git a/crates/extensions/prover/worker/Cargo.toml b/crates/extensions/prover/worker/Cargo.toml index 846f1fa7..49978572 100644 --- a/crates/extensions/prover/worker/Cargo.toml +++ b/crates/extensions/prover/worker/Cargo.toml @@ -14,6 +14,7 @@ strata-asm-common.workspace = true strata-asm-proof-impl.workspace = true strata-asm-prover-storage.workspace = true strata-asm-prover-types.workspace = true +strata-asm-worker.workspace = true strata-btc-types.workspace = true strata-btc-verification.workspace = true strata-identifiers.workspace = true diff --git a/crates/extensions/prover/worker/src/builder.rs b/crates/extensions/prover/worker/src/builder.rs index 397c16ae..b384c029 100644 --- a/crates/extensions/prover/worker/src/builder.rs +++ b/crates/extensions/prover/worker/src/builder.rs @@ -1,20 +1,21 @@ //! Builder for assembling a prover worker. -use strata_asm_prover_types::ProofId; -use tokio::sync::mpsc; +use strata_asm_worker::Subscription; +use strata_identifiers::L1BlockCommitment; use zkaleido::ZkVmRemoteHost; use crate::{ - InputBuilder, ProofOrchestrator, ProverContext, ProverWorkerHandle, + InputBuilder, ProofOrchestrator, ProverContext, config::OrchestratorConfig, errors::{ProverError, ProverResult}, }; /// Builder for assembling a prover worker. /// -/// Wires the context, remote hosts, config, and input builder into a -/// [`ProofOrchestrator`], returning it alongside a [`ProverWorkerHandle`] over a -/// freshly created request channel. +/// Wires the context, remote hosts, config, input builder, and the ASM worker's +/// commit subscription into a [`ProofOrchestrator`]. The orchestrator's only +/// input is that subscription: it turns each committed [`L1BlockCommitment`] +/// into the proofs the block requires. /// /// The orchestrator is *not* spawned here: its run loop is `!Send` (the /// upstream `ZkVmRemoteProver` is `#[async_trait(?Send)]`), so the caller must @@ -27,6 +28,7 @@ pub struct ProverWorkerBuilder { moho_host: Option, config: Option, input_builder: Option, + subscription: Option>, } impl ProverWorkerBuilder { @@ -38,6 +40,7 @@ impl ProverWorkerBuilder { moho_host: None, config: None, input_builder: None, + subscription: None, } } @@ -65,14 +68,25 @@ impl ProverWorkerBuilder { self.input_builder = Some(input_builder); self } + + /// Sets the ASM worker commit subscription that drives the orchestrator. + /// + /// Subscribe *before* the worker starts processing blocks — there is no + /// replay buffer, so any block committed before this subscription exists is + /// not seen. + pub fn with_block_subscription( + mut self, + subscription: Subscription, + ) -> Self { + self.subscription = Some(subscription); + self + } } impl ProverWorkerBuilder { - /// Validates the supplied dependencies and assembles the orchestrator. - /// - /// Returns the [`ProverWorkerHandle`] for enqueuing proof requests and the - /// [`ProofOrchestrator`] for the caller to drive. - pub fn build(self) -> ProverResult<(ProverWorkerHandle, ProofOrchestrator)> { + /// Validates the supplied dependencies and assembles the orchestrator for + /// the caller to drive. + pub fn build(self) -> ProverResult> { let ctx = self.ctx.ok_or(ProverError::MissingDependency("context"))?; let asm_host = self .asm_host @@ -86,12 +100,18 @@ impl ProverWorkerBuilder { let input_builder = self .input_builder .ok_or(ProverError::MissingDependency("input_builder"))?; + let subscription = self + .subscription + .ok_or(ProverError::MissingDependency("subscription"))?; - let (tx, rx) = mpsc::unbounded_channel::(); - let orchestrator = - ProofOrchestrator::new(ctx, asm_host, moho_host, config, input_builder, rx); - - Ok((ProverWorkerHandle::new(tx), orchestrator)) + Ok(ProofOrchestrator::new( + ctx, + asm_host, + moho_host, + config, + input_builder, + subscription, + )) } } diff --git a/crates/extensions/prover/worker/src/handle.rs b/crates/extensions/prover/worker/src/handle.rs deleted file mode 100644 index a46fca04..00000000 --- a/crates/extensions/prover/worker/src/handle.rs +++ /dev/null @@ -1,30 +0,0 @@ -//! Handle for requesting proofs from the prover worker. - -use strata_asm_prover_types::ProofId; -use tokio::sync::mpsc; - -/// Handle for submitting proof requests to a running -/// [`ProofOrchestrator`](crate::ProofOrchestrator). -/// -/// Wraps the sender side of the orchestrator's request channel. The block -/// watcher uses this to enqueue ASM/Moho proofs as new L1 blocks are processed. -/// Cloneable and cheap to pass around; dropping every clone signals the -/// orchestrator to drain and shut down. -#[derive(Debug, Clone)] -pub struct ProverWorkerHandle { - tx: mpsc::UnboundedSender, -} - -impl ProverWorkerHandle { - pub(crate) fn new(tx: mpsc::UnboundedSender) -> Self { - Self { tx } - } - - /// Requests generation of the proof identified by `proof_id`. - /// - /// Returns the unsent request as an error if the orchestrator has shut down - /// (its receiver was dropped). - pub fn request_proof(&self, proof_id: ProofId) -> Result<(), mpsc::error::SendError> { - self.tx.send(proof_id) - } -} diff --git a/crates/extensions/prover/worker/src/lib.rs b/crates/extensions/prover/worker/src/lib.rs index 0b0a1918..4ded7bb9 100644 --- a/crates/extensions/prover/worker/src/lib.rs +++ b/crates/extensions/prover/worker/src/lib.rs @@ -5,7 +5,9 @@ //! The worker defines a [`ProverContext`] umbrella trait abstracting its //! storage and chain-data dependencies, and a [`ProofOrchestrator`] that drives //! proof scheduling and reconciliation generically over it — mirroring how the -//! ASM worker (`strata-asm-worker`) is built. Concrete sled-backed storage +//! ASM worker (`strata-asm-worker`) is built. The orchestrator is fed by the ASM +//! worker's commit subscription: each committed block expands into the ASM step +//! proof and Moho recursive proof it requires. Concrete sled-backed storage //! lives in the sibling `strata-asm-prover-storage` crate; the binary supplies //! the `ProverContext` impl that wires storage and the Bitcoin client together. @@ -13,7 +15,6 @@ mod backend; mod builder; mod config; mod errors; -mod handle; mod input; mod orchestrator; mod proof_store; @@ -24,7 +25,6 @@ pub use backend::{ProofBackend, ProofHost}; pub use builder::ProverWorkerBuilder; pub use config::{BackendConfig, OrchestratorConfig}; pub use errors::{ProverError, ProverResult}; -pub use handle::ProverWorkerHandle; pub use input::{InputBuilder, MohoPrerequisite}; pub use orchestrator::ProofOrchestrator; pub use traits::{AnchorStateReader, AuxDataReader, L1BlockProvider, ProverContext}; diff --git a/crates/extensions/prover/worker/src/orchestrator.rs b/crates/extensions/prover/worker/src/orchestrator.rs index b95b0e9b..beb9cdc3 100644 --- a/crates/extensions/prover/worker/src/orchestrator.rs +++ b/crates/extensions/prover/worker/src/orchestrator.rs @@ -8,9 +8,11 @@ use anyhow::{Context, Result}; use async_trait::async_trait; use moho_recursive_proof::MohoRecursiveProgram; use strata_asm_proof_impl::program::AsmStfProofProgram; -use strata_asm_prover_types::{ProofId, RemoteProofId}; +use strata_asm_prover_types::{L1Range, ProofId, RemoteProofId}; +use strata_asm_worker::Subscription; +use strata_identifiers::L1BlockCommitment; use strata_tasks::ShutdownGuard; -use tokio::{sync::mpsc, time}; +use tokio::{sync::mpsc::error::TryRecvError, time}; use tracing::{debug, error, info, warn}; use zkaleido::{RemoteProofStatus, ZkVmRemoteHost, ZkVmRemoteProgram}; @@ -20,11 +22,18 @@ use crate::{ }; /// Orchestrates remote proof generation for ASM and Moho proofs. +/// +/// The orchestrator's only input is the ASM worker's commit stream: each +/// [`L1BlockCommitment`] arrives *after* the block has been durably stored, and +/// the orchestrator expands it into the ASM step proof and the Moho recursive +/// proof that block requires (see [`drain_committed_blocks`]). +/// +/// [`drain_committed_blocks`]: ProofOrchestrator::drain_committed_blocks #[derive(Debug)] pub struct ProofOrchestrator { ctx: C, queue: PendingProofQueue, - rx: mpsc::UnboundedReceiver, + subscription: Subscription, asm: H, moho: H, config: OrchestratorConfig, @@ -39,12 +48,12 @@ impl ProofOrchestrator { moho: H, config: OrchestratorConfig, input_builder: InputBuilder, - rx: mpsc::UnboundedReceiver, + subscription: Subscription, ) -> Self { Self { ctx, queue: PendingProofQueue::new(), - rx, + subscription, asm, moho, config, @@ -52,10 +61,15 @@ impl ProofOrchestrator { } } - /// Runs the orchestrator loop until shutdown is requested or the channel is closed. + /// Runs the orchestrator loop until shutdown is requested or the ASM + /// worker's commit stream closes. pub async fn run(&mut self, shutdown: ShutdownGuard) -> Result<()> { info!("proof orchestrator started"); loop { + // Pull every block committed since the last tick and expand each into + // its proof requests before doing the periodic reconcile/schedule work. + let closed = self.drain_committed_blocks(); + if let Err(e) = self.tick().await { error!(?e, "orchestrator tick failed"); } @@ -65,9 +79,9 @@ impl ProofOrchestrator { return Ok(()); } - // Exit once the sender side has been dropped (shutdown) and there is - // nothing left to process. - if self.rx.is_closed() && self.queue.is_empty() { + // The ASM worker dropped the commit stream (shutdown) and there is + // nothing left to prove. + if closed && self.queue.is_empty() { info!("proof orchestrator shutting down"); return Ok(()); } @@ -82,18 +96,29 @@ impl ProofOrchestrator { } } - /// Drains incoming proof requests from the channel into the pending queue. - fn drain_incoming(&mut self) { - while let Ok(id) = self.rx.try_recv() { - debug!(?id, "received proof request"); - self.queue.enqueue(id); + /// Drains committed-block notifications from the ASM worker into the pending + /// queue, expanding each block into the ASM step proof and the Moho + /// recursive proof it requires. + /// + /// Returns `true` once the subscription has closed — the ASM worker shut + /// down and the backlog is drained — so the caller can exit after the queue + /// empties. + fn drain_committed_blocks(&mut self) -> bool { + loop { + match self.subscription.try_recv() { + Ok(block) => { + debug!(%block, "ASM worker committed block, enqueuing proofs"); + self.queue.enqueue(ProofId::Asm(L1Range::single(block))); + self.queue.enqueue(ProofId::Moho(block)); + } + Err(TryRecvError::Empty) => return false, + Err(TryRecvError::Disconnected) => return true, + } } } /// Executes one orchestration cycle. async fn tick(&mut self) -> Result<()> { - self.drain_incoming(); - if !self.queue.is_empty() { debug!(pending = self.queue.len(), "orchestrator tick"); }