Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 45 additions & 25 deletions bin/asm-runner/src/block_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
//! real-time block notification with `bury_depth=0` (no reorg tracking, no
//! tx monitoring). Written to avoid a painful dependency on `strata-bridge`.

use std::{sync::Arc, time::Duration};
use std::{ops::RangeInclusive, sync::Arc, time::Duration};

use anyhow::{Context, Result, bail};
use anyhow::{Context, Result};
use bitcoin::Block;
use bitcoincore_zmq::{Message, SocketMessage, subscribe_async_wait_handshake};
use bitcoind_async_client::{Client, traits::Reader};
Expand Down Expand Up @@ -54,6 +54,19 @@ pub(crate) async fn drive_asm_from_bitcoin(
let mut stream = stream;
let mut cursor = start_height;

// ZMQ only delivers blocks mined after we subscribe, so any heights between
// `cursor` and the current tip — typically blocks mined while we were down —
// would otherwise wait forever for a fresh ZMQ event that may never come.
// Poll the tip via RPC and fill the gap before entering the wait loop.
let tip_height = bitcoin_client
.get_block_count()
.await
.context("failed to query bitcoind tip for startup catchup")?;
if tip_height >= cursor {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't > sufficient? Was the duplication at the boundary (tip_height == cursor) deliberate?

backfill_range(&bitcoin_client, &asm_worker, &proof_tx, cursor..=tip_height).await?;
cursor = tip_height + 1;
Comment on lines +65 to +67
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Honor shutdown during startup catch-up

When tip_height >= cursor, the watcher runs backfill_range(...) before entering the tokio::select! loop that listens for shutdown.wait_for_shutdown(). If the node is far behind (large restart gap), this RPC/backfill loop can run for a long time and the process will ignore shutdown requests until it finishes, which can make service stop/restart operations hang under backlog conditions. Please make startup catch-up cancellable (e.g., check shutdown between heights or interleave catch-up with the shutdown wait).

Useful? React with 👍 / 👎.

}

loop {
let msg = tokio::select! {
_ = shutdown.wait_for_shutdown() => {
Expand Down Expand Up @@ -94,30 +107,17 @@ pub(crate) async fn drive_asm_from_bitcoin(
continue;
}

// Backfill any skipped heights [cursor, received_height). This covers
// the common case of starting after a downtime, or rare ZMQ drops.
// Backfill any skipped heights [cursor, received_height - 1]. Covers
// rare ZMQ drops between events; the startup-tip catchup above handles
// the restart case.
if received_height > cursor {
info!(
from = %cursor,
to = %received_height,
"backfilling skipped blocks"
);
for height in cursor..received_height {
match fetch_block_at_height(&bitcoin_client, height).await {
Ok(fetched) => {
if let Err(err) = submit_block(&asm_worker, &proof_tx, fetched).await {
error!(%height, ?err, "failed to submit backfill block");
// Stop backfilling on failure so we don't hand the
// worker a gap. The next ZMQ event will retry.
bail!("backfill interrupted at height {height}: {err}");
}
}
Err(err) => {
error!(%height, ?err, "failed to fetch backfill block");
bail!("backfill fetch failed at height {height}: {err}");
}
}
}
backfill_range(
&bitcoin_client,
&asm_worker,
&proof_tx,
cursor..=received_height - 1,
)
Comment on lines +114 to +119
Copy link
Copy Markdown

@Rajil1213 Rajil1213 May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplication hints at a deeper issue. Right now, the block watcher muddies the boundary between the bitcoin block fetcher and block submitter. A cleaner way to do this is to separate the two. If you look at the btc_tracker crate in strata-bridge, you'll find a loop that just ingests blocks from bitcoind, then there is a separate block that pushes blocks to subscribers. The backfilling logic belongs on the former (that is responsible for receiving blocks from bitcoind), the latter just takes those blocks and pushes them out.

This also means that you don't need a brittle functional test. You can just add a proptest that asserts that block production is contiguous.

Downstream consumers (in this case the asm worker) can just depend on the fact that provided a start height, it will always receive blocks in the order of block heights without any discontinuity.

.await?;
}

if let Err(err) = submit_block(&asm_worker, &proof_tx, block).await {
Expand All @@ -127,6 +127,26 @@ pub(crate) async fn drive_asm_from_bitcoin(
}
}

/// Fetch and submit every block in `range` via RPC, bailing on the first
/// failure so we never hand the worker a gap.
async fn backfill_range(
client: &Client,
asm_worker: &AsmWorkerHandle,
proof_tx: &Option<mpsc::UnboundedSender<ProofId>>,
range: RangeInclusive<u64>,
) -> Result<()> {
info!(from = %range.start(), to = %range.end(), "backfilling skipped blocks");
for height in range {
let block = fetch_block_at_height(client, height)
.await
.with_context(|| format!("backfill fetch failed at height {height}"))?;
submit_block(asm_worker, proof_tx, block)
.await
.with_context(|| format!("backfill submit failed at height {height}"))?;
}
Comment on lines +139 to +146
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this just depends on the block height, will there be issues if there is a fork around the current tip?

Ok(())
}

/// Fetch a single block by height via the bitcoind RPC client.
async fn fetch_block_at_height(client: &Client, height: u64) -> Result<Block> {
let hash = client
Expand Down
26 changes: 0 additions & 26 deletions crates/storage/src/export_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,32 +242,6 @@ mod tests {
assert_eq!(store.get(1, idx1).unwrap().unwrap(), (11, hash(0xa1)));
}

#[test]
fn persistence_across_reopen() {
let dir = tempfile::tempdir().unwrap();

{
let db = sled::open(dir.path()).unwrap();
let store = ExportEntriesDb::open(&db).unwrap();
store.append(2, 5, hash(0x42)).unwrap();
store.append(2, 6, hash(0x43)).unwrap();
// Drop tree handles before the db so its file lock is released
// synchronously (sled 0.34 can otherwise race on reopen on Linux).
drop(store);
db.flush().unwrap();
drop(db);
}

{
let db = sled::open(dir.path()).unwrap();
let store = ExportEntriesDb::open(&db).unwrap();
assert_eq!(store.num_entries(2).unwrap(), 2);
assert_eq!(store.get(2, 0).unwrap().unwrap(), (5, hash(0x42)));
assert_eq!(store.get(2, 1).unwrap().unwrap(), (6, hash(0x43)));
assert_eq!(store.find_index(2, &hash(0x43)).unwrap(), Some((1, 6)));
}
}

fn rebuild_compact_mmr(store: &ExportEntriesDb, container_id: u8, size: u64) -> Mmr64B32 {
let mut compact = Mmr64B32::new_empty();
for i in 0..size {
Expand Down
25 changes: 0 additions & 25 deletions crates/storage/src/mmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,29 +245,4 @@ mod tests {
let proof = mmr_db.generate_proof(2, 4).unwrap();
assert!(compact_at_4.verify(&proof, &make_leaf(2).0));
}

#[test]
fn persistence_across_reopen() {
let dir = tempfile::tempdir().unwrap();

{
let db = sled::open(dir.path()).unwrap();
let mmr = MmrDb::open(&db).unwrap();
mmr.append_leaf(make_leaf(0x42)).unwrap();
mmr.append_leaf(make_leaf(0x43)).unwrap();
// Drop tree handles before the db so its file lock is released
// synchronously (sled 0.34 can otherwise race on reopen on Linux).
drop(mmr);
db.flush().unwrap();
drop(db);
}

{
let db = sled::open(dir.path()).unwrap();
let mmr = MmrDb::open(&db).unwrap();
assert_eq!(mmr.leaf_count().unwrap(), 2);
assert_eq!(mmr.get_leaf(0).unwrap().unwrap(), make_leaf(0x42));
assert_eq!(mmr.get_leaf(1).unwrap().unwrap(), make_leaf(0x43));
}
}
}
1 change: 1 addition & 0 deletions functional-tests/factory/asm_rpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def create_asm_rpc_service(
"rpc_port": rpc_port,
"rpc_url": f"http://127.0.0.1:{rpc_port}",
"db_path": db_path,
"log_path": logfile,
}

rpc_url = f"http://127.0.0.1:{rpc_port}"
Expand Down
2 changes: 1 addition & 1 deletion functional-tests/run_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ case "$ASM_PROVER_BACKEND" in
esac

pushd .. > /dev/null
cargo build --bin strata-asm-runner "${CARGO_ARGS[@]}"
cargo build --bin strata-asm-runner ${CARGO_ARGS[@]+"${CARGO_ARGS[@]}"}
if [[ "$ASM_PROVER_BACKEND" == "sp1" ]]; then
# Produces guest-builder/sp1/elfs/{asm,moho}.elf, which the runner reads at startup.
cargo build -p strata-asm-sp1-guest-builder --release
Expand Down
107 changes: 107 additions & 0 deletions functional-tests/tests/fn_asm_restart_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import logging
import os

import flexitest

from utils.utils import (
wait_until_asm_reaches_height,
wait_until_asm_ready,
wait_until_bitcoind_ready,
)

# Emitted by the worker only on first-bootstrap, gated on
# `!ctx.has_l1_manifest(pivot_block.blkid())` — so its absence in a post-restart
# log slice is direct evidence that the runner resumed from persisted state
# rather than replaying from genesis. See crates/worker/src/service.rs.
GENESIS_BOOTSTRAP_MARKER = "Created genesis manifest"


@flexitest.register
class AsmRestartTest(flexitest.Test):
"""End-to-end coverage of the runner's restart path.

Persistence belongs at the binary level — the worker reloads from sled,
resumes from the last persisted block, and reconnects to bitcoind. Unit
tests in `asm-storage` only exercise sled's own durability, which sled
already covers.

A naive "state at height H matches" assertion would also hold for a fresh
runner that just replayed the same chain from genesis. To distinguish
resume from replay, we read the worker log: the genesis-bootstrap line
must not appear after the restart.
"""

def __init__(self, ctx: flexitest.InitContext):
ctx.set_env("basic")

def main(self, ctx: flexitest.RunContext):
bitcoind_service = ctx.get_service("bitcoin")
asm_service = ctx.get_service("asm_rpc")
log_path = asm_service.props["log_path"]

bitcoin_rpc = bitcoind_service.create_rpc()
asm_rpc = asm_service.create_rpc()

wait_until_bitcoind_ready(bitcoin_rpc, timeout=30)
wait_until_asm_ready(asm_rpc)

# Drive ASM to a known height before restarting.
initial_btc_height = bitcoin_rpc.proxy.getblockcount()
wallet_addr = bitcoin_rpc.proxy.getnewaddress()
pre_blocks = 3
bitcoin_rpc.proxy.generatetoaddress(pre_blocks, wallet_addr)
pre_restart_height = initial_btc_height + pre_blocks
wait_until_asm_reaches_height(asm_rpc, min_height=pre_restart_height)

# Snapshot a processed block we expect to survive the restart.
snapshot_height = initial_btc_height + 1
snapshot_hash = bitcoin_rpc.proxy.getblockhash(snapshot_height)
pre_state = asm_rpc.strata_asm_getAsmState(snapshot_hash)
assert pre_state is not None, (
f"strata_asm_getAsmState returned None at height {snapshot_height} pre-restart"
)

# Mark where the post-restart slice of the log file begins. The runner
# appends to this file across stop/start, so a byte offset captured now
# cleanly partitions pre- vs post-restart output.
log_offset = os.path.getsize(log_path)

logging.info("stopping ASM runner at height %s", pre_restart_height)
asm_service.stop()

# Mine while the runner is down so it has to catch up on restart —
# exercises the watcher's gap-fill path, not just steady state.
catchup_blocks = 2
bitcoin_rpc.proxy.generatetoaddress(catchup_blocks, wallet_addr)
post_restart_target = pre_restart_height + catchup_blocks

logging.info("restarting ASM runner")
asm_service.start()
asm_rpc = asm_service.create_rpc()
wait_until_asm_ready(asm_rpc)
wait_until_asm_reaches_height(asm_rpc, min_height=post_restart_target)
logging.info("ASM caught up past restart to height %s", post_restart_target)

# Resume vs replay: the genesis-bootstrap line only fires when the
# worker can't find an existing genesis manifest. If the post-restart
# log slice contains it, the runner threw away persisted state and
# rebuilt from scratch — exactly the failure mode the test is for.
with open(log_path, "rb") as f:
f.seek(log_offset)
post_log = f.read().decode("utf-8", errors="replace")
assert GENESIS_BOOTSTRAP_MARKER not in post_log, (
f"runner re-emitted {GENESIS_BOOTSTRAP_MARKER!r} after restart — "
"it restarted from genesis instead of resuming from persisted state"
)
Comment on lines +56 to +95
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might also want to check out the log matcher utility in strata-bridge which wraps the above logic in wait_until (not necessary here because you're only checking the logs after the ASM catches up).

This is of course very brittle.


# Sanity: state for a pre-restart block is still queryable and
# identical post-restart. Weaker than the log check on its own (a
# fresh replay would produce the same payload on the same chain), but
# catches durability regressions where the data is gone entirely.
post_state = asm_rpc.strata_asm_getAsmState(snapshot_hash)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it also make sense to query for an intermediate height (that is one of the blocks that got backfilled)? I'm thinking not because the ASM cannot progress without block continuity.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ASM cannot progress without block continuity.

assert post_state == pre_state, (
f"AsmState at height {snapshot_height} changed across restart: "
f"pre={pre_state!r} post={post_state!r}"
)

return True
Loading