Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ members = [
"crates/worker",
"crates/storage",

# extensions
"crates/extensions/moho/storage",
"crates/extensions/moho/worker",

# tests
"tests",

Expand Down Expand Up @@ -61,6 +65,8 @@ strata-ams-test-utils = { path = "crates/test-utils-btcio" }
strata-asm-common = { path = "crates/common" }
strata-asm-logs = { path = "crates/logs" }
strata-asm-manifest-types = { path = "crates/manifest-types" }
strata-asm-moho-storage = { path = "crates/extensions/moho/storage" }
strata-asm-moho-worker = { path = "crates/extensions/moho/worker" }
strata-asm-params = { path = "crates/params" }
strata-asm-proof-db = { path = "crates/proof/db" }
strata-asm-proof-impl = { path = "crates/proof/statements" }
Expand Down
4 changes: 2 additions & 2 deletions bin/asm-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ workspace = true
[dependencies]
moho-recursive-proof.workspace = true
moho-runtime-impl.workspace = true
moho-runtime-interface.workspace = true
moho-types.workspace = true
strata-asm-rpc.workspace = true
strata-btc-verification.workspace = true

strata-asm-common.workspace = true
strata-asm-logs.workspace = true
strata-asm-moho-storage.workspace = true
strata-asm-moho-worker.workspace = true
strata-asm-params.workspace = true
strata-asm-proof-db.workspace = true
strata-asm-proof-impl.workspace = true
Expand Down
48 changes: 33 additions & 15 deletions bin/asm-runner/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use std::sync::Arc;

use anyhow::Result;
use bitcoind_async_client::{Auth, Client};
use strata_asm_moho_storage::SledMohoStateDb;
use strata_asm_moho_worker::MohoWorkerBuilder;
use strata_asm_params::AsmParams;
use strata_asm_proof_db::{SledMohoStateDb, SledProofDb};
use strata_asm_proof_db::SledProofDb;
use strata_asm_spec::StrataAsmSpec;
use strata_asm_worker::AsmWorkerBuilder;
use strata_tasks::TaskExecutor;
Expand All @@ -16,10 +18,11 @@ use tokio::{
use crate::{
block_watcher::drive_asm_from_bitcoin,
config::{AsmRpcConfig, BitcoinConfig},
moho_context::MohoWorkerContextImpl,
prover::{InputBuilder, ProofBackend, ProofOrchestrator},
rpc_server::{AsmProofRpcDeps, run_rpc_server},
storage::create_storage,
worker_context::{AsmWorkerContext, MohoStorage},
worker_context::AsmWorkerContext,
};
pub(crate) async fn bootstrap(
config: AsmRpcConfig,
Expand All @@ -33,8 +36,7 @@ pub(crate) async fn bootstrap(
let bitcoin_client = Arc::new(connect_bitcoin(&config.bitcoin).await?);

// 3. If the orchestrator is configured, open proof storage and build the proof backend up front
// so the worker can receive the moho-state db and the asm predicate. The worker owns
// moho-state writes (including the genesis seed) — see [`MohoStorage`].
// so the Moho worker and orchestrator can receive the moho-state db and the asm predicate.
let runtime_handle = Handle::current();
let orch_prep = if let Some(orch_config) = config.orchestrator {
let sled_db = sled::open(&orch_config.proof_db_path)?;
Expand All @@ -46,14 +48,9 @@ pub(crate) async fn bootstrap(
None
};

// 4. Create the worker context, wiring moho storage when available.
let moho_storage = orch_prep.as_ref().map(|(_, _, db, backend)| MohoStorage {
db: db.clone(),
asm_predicate: backend.asm_predicate.clone(),
});
let export_entries_for_worker = orch_prep.as_ref().map(|_| export_entries_db.clone());
let genesis_height = params.anchor.block.height() as u64;

// 4. Create the ASM worker context. Moho state and the export-entries index are no longer
// materialized here; a dedicated Moho worker derives both from each ASM commit (step 7).
//
// The worker aligns the DB-side ASM manifest MMR with L1 heights during
// startup (`ManifestMmrStore::prefill_manifest_mmr`), so no prefill is
// needed here.
Expand All @@ -63,9 +60,6 @@ pub(crate) async fn bootstrap(
&config.bitcoin.retry_config,
state_db.clone(),
mmr_db.clone(),
export_entries_for_worker,
moho_storage,
genesis_height,
);

// 5. Launch ASM worker
Expand Down Expand Up @@ -100,6 +94,30 @@ pub(crate) async fn bootstrap(
moho_predicate,
} = backend;

// Spin the Moho worker off onto its own service task, driven by the ASM
// worker's per-block commit stream. It derives each block's MohoState
// (and the export-entry leaves its ExportState MMR commits to) from the
// anchor state the ASM worker committed, and persists both to the same
// stores the orchestrator and RPC read. Subscribe before the block
// watcher is spawned (step 8): the subscription has no replay, so a later
// subscriber would miss already-committed blocks. The genesis Moho state
// is seeded from the ASM genesis anchor during launch.
let moho_context = MohoWorkerContextImpl::new(
runtime_handle.clone(),
bitcoin_client.clone(),
&config.bitcoin.retry_config,
state_db.clone(),
moho_state_db.clone(),
export_entries_db.clone(),
);
let _moho_worker = MohoWorkerBuilder::new()
.with_context(moho_context)
.with_subscription(asm_worker.subscribe_blocks())
.with_genesis_block(params.anchor.block)
.with_asm_predicate(asm_predicate.clone())
.launch(&executor)
.await?;

let input_builder = InputBuilder::new(
state_db.clone(),
bitcoin_client.clone(),
Expand Down
1 change: 1 addition & 0 deletions bin/asm-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
mod block_watcher;
mod bootstrap;
mod config;
mod moho_context;
mod prover;
mod retry;
mod rpc_server;
Expand Down
155 changes: 155 additions & 0 deletions bin/asm-runner/src/moho_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//! Moho worker-context implementation for the ASM runner.
//!
//! [`MohoWorkerContextImpl`] backs the three concern traits the Moho worker
//! interfaces through ([`AsmStateProvider`], [`L1ProviderContext`],
//! [`MohoStateStore`]). It reads the ASM anchor states and logs the ASM worker
//! already committed (via [`AsmStateDb`]), resolves L1 parents from the Bitcoin
//! node, and persists derived Moho states via [`SledMohoStateDb`].
//!
//! Unlike the ASM worker — which runs on its own thread and can block on Bitcoin
//! RPC directly — the Moho worker runs as an async service. The
//! [`MohoWorkerContext`](strata_asm_moho_worker::MohoWorkerContext) traits are
//! synchronous, so parent resolution bridges to the async client via
//! [`block_in_place`](task::block_in_place); see
//! [`MohoWorkerContextImpl::get_parent_block`].

use std::sync::Arc;

use asm_storage::AsmStateDb;
use bitcoin::BlockHash;
use bitcoind_async_client::{Client, error::ClientError, traits::Reader};
use moho_types::MohoState;
use strata_asm_common::{AnchorState, AsmLogEntry};
use strata_asm_moho_storage::{SledExportEntriesDb, SledMohoStateDb};
use strata_asm_moho_worker::{
AsmStateProvider, ExportEntryStore, L1ProviderContext, MohoStateStore, MohoWorkerError,
MohoWorkerResult,
};
use strata_asm_worker::AsmState;
use strata_btc_types::{BlockHashExt, L1BlockIdBitcoinExt};
use strata_identifiers::L1BlockCommitment;
use tokio::{runtime::Handle, task};

use crate::retry::{ExponentialBackoff, RetryConfig, retry_with_backoff_async};

/// Storage and L1 access the Moho worker derives per-block Moho states from.
pub(crate) struct MohoWorkerContextImpl {
runtime_handle: Handle,
bitcoin_client: Arc<Client>,
/// Backoff schedule for Bitcoin RPC calls.
rpc_backoff: ExponentialBackoff,
/// Maximum retry attempts per Bitcoin RPC call.
rpc_max_retries: u16,
/// ASM anchor states and logs the Moho state is derived from, committed by
/// the ASM worker.
state_db: Arc<AsmStateDb>,
/// Persistence for the derived per-block Moho states.
moho_state_db: SledMohoStateDb,
/// Persistence for the per-container export-entry leaves the Moho state's
/// `ExportState` MMR commits to.
export_entries_db: SledExportEntriesDb,
}

impl MohoWorkerContextImpl {
pub(crate) fn new(
runtime_handle: Handle,
bitcoin_client: Arc<Client>,
retry: &RetryConfig,
state_db: Arc<AsmStateDb>,
moho_state_db: SledMohoStateDb,
export_entries_db: SledExportEntriesDb,
) -> Self {
Self {
runtime_handle,
bitcoin_client,
rpc_backoff: retry.backoff(),
rpc_max_retries: retry.max_retries,
state_db,
moho_state_db,
export_entries_db,
}
}

/// Reads the ASM state the ASM worker committed for `blockid`, mapping a
/// miss to [`MohoWorkerError::MissingAsmState`].
fn anchor(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult<AsmState> {
self.state_db
.get(blockid)
.map_err(|e| MohoWorkerError::Storage(e.to_string()))?
.ok_or(MohoWorkerError::MissingAsmState(*blockid))
}
}

impl AsmStateProvider for MohoWorkerContextImpl {
fn get_anchor_state(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult<AnchorState> {
Ok(self.anchor(blockid)?.state().clone())
}

fn get_anchor_logs(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult<Vec<AsmLogEntry>> {
Ok(self.anchor(blockid)?.logs().clone())
}
}

impl L1ProviderContext for MohoWorkerContextImpl {
fn get_parent_block(&self, block: &L1BlockCommitment) -> MohoWorkerResult<L1BlockCommitment> {
let block_hash: BlockHash = block.blkid().to_block_hash();
let client = &self.bitcoin_client;

// The context traits are synchronous but the Bitcoin RPC is async, and
// the Moho worker runs as an async service — a nested `Handle::block_on`
// would panic. `block_in_place` releases the current worker thread for
// the blocking call so the runtime keeps making progress; it requires
// the multi-threaded runtime the runner builds.
let header = task::block_in_place(|| {
self.runtime_handle.block_on(retry_with_backoff_async(
"btc_get_block_header",
self.rpc_max_retries,
&self.rpc_backoff,
|| async { client.get_block_header(&block_hash).await },
))
})
.map_err(|_: ClientError| MohoWorkerError::MissingParentBlock(*block))?;

let parent_id = header.prev_blockhash.to_l1_block_id();
Ok(L1BlockCommitment::new(block.height() - 1, parent_id))
}
}

impl MohoStateStore for MohoWorkerContextImpl {
fn get_latest_moho_state(&self) -> MohoWorkerResult<Option<(L1BlockCommitment, MohoState)>> {
self.moho_state_db
.get_latest()
.map_err(|e| MohoWorkerError::Storage(e.to_string()))
}

fn get_moho_state(&self, blockid: &L1BlockCommitment) -> MohoWorkerResult<MohoState> {
self.moho_state_db
.get(*blockid)
.map_err(|e| MohoWorkerError::Storage(e.to_string()))?
.ok_or(MohoWorkerError::MissingMohoState(*blockid))
}

fn store_moho_state(
&self,
blockid: &L1BlockCommitment,
state: &MohoState,
) -> MohoWorkerResult<()> {
self.moho_state_db
.store(*blockid, state.clone())
.map_err(|e| MohoWorkerError::Storage(e.to_string()))
}
}

impl ExportEntryStore for MohoWorkerContextImpl {
fn append_export_entry(
&self,
container_id: u8,
height: u32,
entry: [u8; 32],
) -> MohoWorkerResult<()> {
self.export_entries_db
.append(container_id, height, entry)
.map(|_index| ())
.map_err(|e| MohoWorkerError::Storage(e.to_string()))
}
}
Loading
Loading