📝 Walkthrough

This PR replaces the samod-based document storage and synchronization layer with a new async runtime architecture using subduction and sedimentree. It introduces typed identifiers (DocId32, PeerId32), removes the document change broker, eliminates iroh transport logic, and refactors document sync to use task-key-based scheduling and per-endpoint connection tracking across the am_utils_rs and daybook_core crates.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant BRepo as BigRepo
    participant Runtime as BigRepoRuntime
    participant Subduction as Subduction<br/>(Storage)
    participant Peer as RemotePeer
    Client->>BRepo: sync_doc_with_peer(doc_id, peer_id)
    BRepo->>Runtime: sync_doc_with_peer(doc_id, peer_id, timeout)
    Runtime->>Subduction: load_doc(doc_id)
    Subduction-->>Runtime: Option<Automerge>
    Runtime->>Peer: sync_with_peer(endpoint, doc_id)
    Note over Peer: Subduction<br/>diff/merge
    Peer-->>Runtime: SyncDocOutcome
    alt Success
        Runtime->>Subduction: persist blobs
        Runtime->>Runtime: reconstruct post-sync doc
        Runtime->>BRepo: merge updated doc into<br/>live bundle
    else NotFound/Error
        Runtime->>BRepo: return SyncDocOutcome
    end
    BRepo-->>Client: SyncDocOutcome
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/am_utils_rs/Cargo.toml (1)
14-19: ⚠️ Potential issue | 🟠 Major

Keep the new sync stack behind the existing feature flags.

These dependencies are unconditional now, so `am_utils_rs` still pulls the full subduction/keyhive/websocket stack even when a consumer opts out of `repo`/`iroh`. That breaks the crate's current `default-features = false` split and can drag native-only build requirements into lighter targets.

Also applies to: 56-67
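A sketch of the gating this comment asks for, assuming the existing `repo`/`iroh` features (the `workspace = true` wiring and the exact feature membership are placeholders, not the crate's actual manifest):

```toml
[dependencies]
tokio-tungstenite = { workspace = true, optional = true }
sqlx = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
irpc = { workspace = true, optional = true }

[features]
default = []
# The sync stack is pulled in only when a consumer opts into these features,
# preserving the default-features = false split.
repo = ["dep:tokio-tungstenite", "dep:sqlx", "dep:serde_json", "dep:irpc"]
```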
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/am_utils_rs/Cargo.toml` around lines 14 - 19, Unconditional dependencies were added for the new sync stack; instead mark tokio-tungstenite, sqlx, serde_json, and irpc as optional and wire them into the existing feature flags (e.g., repo and iroh) so they are only pulled when those features are enabled. Concretely: make each dependency optional (optional = true) and add them to the appropriate [features] entries (e.g., include "tokio-tungstenite", "sqlx", "serde_json", "irpc" under the "repo" and/or "iroh" feature lists) so the crate's default-features = false split is preserved.

src/daybook_core/sync/full.rs (2)
1489-1494: ⚠️ Potential issue | 🔴 Critical

Keep a single peer's delete event from cancelling every endpoint's doc sync.

`self.scheduler.docs_to_stop.insert(doc_id.clone())` is still doc-scoped, but `batch_stop_docs()` now clears all `DocSyncTaskKey`s for that `doc_id`. If another endpoint still has that document queued or active, its sync gets cancelled and never re-queued from this path, leaving the remaining requester stuck stale.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/sync/full.rs` around lines 1489 - 1494, The current call to self.scheduler.docs_to_stop.insert(doc_id.clone()) is doc-scoped and causes batch_stop_docs() to clear every DocSyncTaskKey for that doc_id, cancelling syncs for other endpoints; change this to insert the endpoint-scoped DocSyncTaskKey for this particular requester (e.g., construct and insert DocSyncTaskKey::new(doc_id.clone(), endpoint_id.clone()) or call the scheduler API that accepts a DocSyncTaskKey) so batch_stop_docs() only clears the specific endpoint's task rather than every task for doc_id.
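The endpoint-scoped stop the prompt describes can be sketched with plain std types. The `DocSyncTaskKey` shape and method names here are hypothetical stand-ins for the crate's real scheduler types:

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for the crate's endpoint-scoped task key.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct DocSyncTaskKey {
    doc_id: String,
    endpoint_id: String,
}

#[derive(Default)]
struct Scheduler {
    tasks_to_stop: BTreeSet<DocSyncTaskKey>,
}

impl Scheduler {
    // Endpoint-scoped stop: only this requester's task is marked for cancellation,
    // instead of a doc-scoped insert that hits every endpoint.
    fn stop_doc_for_endpoint(&mut self, doc_id: &str, endpoint_id: &str) {
        self.tasks_to_stop.insert(DocSyncTaskKey {
            doc_id: doc_id.to_owned(),
            endpoint_id: endpoint_id.to_owned(),
        });
    }

    // batch_stop_docs analogue: clears only the exact keys that were requested,
    // leaving other endpoints' tasks for the same doc untouched.
    fn batch_stop_docs(&mut self, active: &mut BTreeSet<DocSyncTaskKey>) {
        active.retain(|key| !self.tasks_to_stop.contains(key));
        self.tasks_to_stop.clear();
    }
}
```

With this shape, stopping `doc1` for `peer_a` leaves `peer_a`'s key cleared while `peer_b`'s sync of the same document stays active.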
422-428: ⚠️ Potential issue | 🟠 Major

Surface unexpected full-sync worker failures instead of only logging them.

This is the top-level actor task spawned at lines 422-428. When `fut.await` returns an error, the task only logs it and completes normally, leaving failures hidden from tokio's perspective. Replace the error logging with `.unwrap()` to ensure unexpected errors are surfaced immediately, following the pattern: `tokio::spawn(async { fut.await.unwrap() })`.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/sync/full.rs` around lines 422 - 428, The spawned FullSyncWorker task currently swallows errors by logging them in the block that awaits fut (see join_handle and fut), so replace the error-handling block with a direct unwrap of fut's Result to surface unexpected failures to Tokio; specifically, change the async closure passed to tokio::task::spawn so it simply awaits fut and calls .unwrap() (i.e., fut.await.unwrap()) instead of matching Err and calling warn!, and keep the tracing::info_span!("FullSyncWorker task") instrumentation.
🧹 Nitpick comments (4)
AGENTS.md (1)

133-133: Consider rephrasing for better readability.

The phrase "to a task completion" is grammatically awkward. Consider "to complete a task" or "until task completion" instead.

♻️ Suggested improvement

```diff
-Don't hack and boil the ocean to a task completion, you'll be asked to do it again if you do it wrong.
+Don't hack and boil the ocean to complete a task, you'll be asked to do it again if you do it wrong.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@AGENTS.md` at line 133, The sentence uses the awkward phrase "to a task completion"; update the wording to improve readability by replacing "to a task completion" with a clearer alternative such as "to complete a task" or "until task completion" and adjust surrounding words to keep the original meaning and tone (locate the phrase "to a task completion" and edit that clause accordingly).

src/daybook_core/rt/dispatch.rs (1)
157-159: Avoid exposing the raw SQLite pool from `DispatchRepo`.

This accessor exists only so `src/daybook_core/rt/switch.rs` can reach into dispatch storage, which couples two repos through internals. Prefer threading the pool, or the loaded `SwitchStore`, through the boot path instead of adding a repo escape hatch.

Based on learnings: "If a requested change requires changing interfaces, break and change the interfaces instead of trying to shim around this. Shims break abstraction boundaries and make code confusing."
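The injection the comment suggests can be sketched as follows. The `DbPool`, `SwitchStore`, and `SwitchRepo` shapes below are illustrative stand-ins, not the crate's real signatures:

```rust
// Illustrative stand-in for whatever handle the switch store actually needs.
#[derive(Clone)]
struct DbPool {
    url: String,
}

struct SwitchStore {
    pool: DbPool,
}

struct SwitchRepo {
    store: SwitchStore,
}

impl SwitchRepo {
    // The boot path constructs the store once and injects it, so SwitchRepo
    // never has to reach back into DispatchRepo for the pool.
    fn new(store: SwitchStore) -> Self {
        Self { store }
    }

    fn pool_url(&self) -> &str {
        &self.store.pool.url
    }
}
```

The constructor parameter replaces the `db_pool()` escape hatch: the dependency is explicit at the call site, and `DispatchRepo`'s internals stay private.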
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/rt/dispatch.rs` around lines 157 - 159, The db_pool() accessor on DispatchRepo exposes internal SqlitePool and should be removed; instead, pass the SqlitePool or the already-initialized SwitchStore through your boot/initialization path into the module that needs it (the code currently in switch.rs). Remove the pub(crate) fn db_pool(&self) -> &sqlx::SqlitePool method from DispatchRepo, add a constructor or function parameter to accept sqlx::SqlitePool (or the concrete SwitchStore) where SwitchRepo/switch.rs is created/used, and update any callers in switch.rs to consume that injected dependency rather than calling DispatchRepo::db_pool; this keeps the repository abstraction intact and eliminates the escape hatch.

src/daybook_core/sync/bootstrap.rs (1)
475-492: Consider removing the polling loop after direct document import.

After the changes at lines 456-473, the core documents are directly imported via `import_doc`. The subsequent polling loop (lines 475-492) that waits for documents to appear may now be redundant, as `import_doc` is synchronous and the documents should be immediately available.

However, this could serve as a defensive fallback if there's any async persistence lag. If intentional, consider adding a brief comment explaining why the loop is retained.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/sync/bootstrap.rs` around lines 475 - 492, The polling loop that repeatedly calls big_repo.find_doc_handle and sleeps (the loop using attempts, big_repo.find_doc_handle(&bootstrap.app_doc_id)/(&bootstrap.drawer_doc_id), the debug! call, and tokio::time::sleep) is redundant because core docs are already imported synchronously via import_doc; remove this entire loop and the attempts variable so you don't wait again after import_doc completes, and delete the associated debug/logging and sleep; if you intended to keep a defensive fallback instead, replace the loop with a concise comment adjacent to import_doc explaining why an async persistence lag might require polling and keep only an explicit, documented retry path.

src/xtask/main.rs (1)
21-22: Duplicated OCI media-type constants.

These constants duplicate the values defined in `daybook_core::plugs` (lines 621-623). If the values need to change, they must be updated in both locations, which could lead to drift.

If `xtask` intentionally avoids depending on `daybook_core` for faster builds, consider adding a comment documenting that these must stay in sync with the canonical definitions. Otherwise, importing them directly would prevent accidental divergence.
Verify each finding against the current code and only fix it if needed. In `@src/xtask/main.rs` around lines 21 - 22, The two duplicated constants OCI_PLUG_ARTIFACT_TYPE and OCI_PLUG_MANIFEST_LAYER_MEDIA_TYPE duplicate definitions in daybook_core::plugs; either replace these local constants by importing the canonical values from daybook_core::plugs (preferred) or, if xtask must avoid the dependency, add a clear comment above the constants stating they must stay in sync with daybook_core::plugs (include the module path daybook_core::plugs and a TODO to remove duplication), so future changes don’t drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@AGENTS.md`:
- Line 137: Update the sentence "If it looks or inelegant what you're doing,
you're very liekly misalinged and wasting effort." in AGENTS.md by correcting
the two misspellings: change "liekly" to "likely" and "misalinged" to
"misaligned" (leave the rest of the sentence as-is or adjust surrounding grammar
if desired); locate the exact line containing that sentence to apply the
replacements.
In `@src/am_utils_rs/repo.rs`:
- Around line 357-376: load_live_bundle currently races: two callers can miss
live_bundles, both await load_automerge and create distinct LiveDocBundle Arcs.
Fix by re-checking the map after the await before inserting so only one bundle
is materialized: after creating bundle (in load_live_bundle) call
self.live_bundles.get(document_id).and_then(|e| e.value().upgrade()) and if it
returns Some(existing) drop the newly created bundle and return existing;
otherwise insert the new Arc::downgrade(&bundle) and return it. Alternatively
implement an entry-style placeholder in live_bundles so only the first waiter
creates the LiveDocBundle; reference load_live_bundle, live_bundles,
LiveDocBundle, and runtime.acquire_doc_lease when applying the change.
- Around line 618-652: The current with_document_with_origin calls
operation(&mut doc) directly on the locked live Automerge document before
persisting, risking memory/state drift if apply_commit_delta fails; change this
to run the operation on a temporary clone of the document (obtain the live heads
with doc.get_heads(), clone the Automerge instance, call operation(&mut
doc_clone), compute after_heads, patches (doc_clone.diff), and changes
(doc_clone.get_changes) from the clone), then call
self.repo.apply_commit_delta(...) with those changes, and only after
apply_commit_delta returns successfully, merge or replace the live doc under
self.bundle.doc.lock().await with the committed changes (e.g., apply the change
bytes or swap in the clone) so the in-memory state only advances when
persistence succeeds.
In `@src/am_utils_rs/repo/runtime.rs`:
- Around line 529-555: When get_blobs() returns None we reconstruct before_doc
from storage for diffing but never rehydrate the runtime; ensure that when
load_blobs_from_storage(...) yields Some(blobs) you call
subduction.add_sedimentree(sedimentree_id, blobs.into_iter()) (or the
appropriate add method) before calling subduction.sync_with_peer(...). Locate
the block that sets before_doc (uses get_blobs, load_blobs_from_storage,
reconstruct_automerge_from_blobs) and after successfully obtaining blobs from
storage invoke subduction.add_sedimentree(...) with the same SedimentreeId so
the local runtime holds the blobs prior to sync_with_peer; preserve error
handling and only skip add_sedimentree when blobs are truly absent.
In `@src/daybook_core/plugs.rs`:
- Around line 750-755: The get_plugs_heads function currently panics on
with_document failures; change its signature to return Res<ChangeHashSet> (or
the module's Result alias) and propagate the error instead of calling expect:
await the with_document call and use ? (or map_err to convert the error into the
Res error type) to return Err when the handle read fails, e.g.
self.app_am_handle.with_document(|am_doc|
ChangeHashSet(am_doc.get_heads().into())).await.map_err(...)?, and update
callers accordingly; apply the same pattern used in diff_events/events_for_init
to the other similar sites (the other occurrences noted near lines 1006-1008) so
all handle-read failures are returned rather than panicking.
In `@src/daybook_core/sync.rs`:
- Around line 587-630: register_incoming_peer currently only does a read-side
contains_key and then proceeds to mutate connection tables, which allows
concurrent setups for the same endpoint and never emits the public
IncomingConnection event; make the registration atomic by taking the write lock
on active_peers up-front (or use the same reservation path as
connect_endpoint_addr) to check-and-reserve the endpoint before performing async
setup, emit the IncomingConnection event to subscribers after a successful
registration, and ensure cleanup on failure (remove_peer_connection and clear
any reserved state) if full_sync_handle.set_connection or other async steps
fail; reference register_incoming_peer, active_peers,
full_sync_handle.set_connection, rcx.big_repo.ensure_peer_connection and
rcx.big_repo.remove_peer_connection when making these changes.
In `@src/daybook_core/sync/full/scheduler.rs`:
- Around line 542-554: endpoint_has_doc_work currently only checks active_docs,
queued_tasks, and pending_tasks for SyncTask::Doc and thus misses active_imports
and queued/pending SyncTask::Import; update endpoint_has_doc_work to also
consider self.active_imports (matching endpoint_id) and any queued_tasks or
pending_tasks entries that are SyncTask::Import for the same endpoint_id so
refresh_peer_fully_synced_state (and PeerFullSynced decisions) see import work;
look for the function endpoint_has_doc_work, the fields active_imports,
queued_tasks, pending_tasks, and the enum variant SyncTask::Import to add those
checks in the same style as the existing Doc checks.
In `@src/xtask/Cargo.toml`:
- Around line 38-39: The Cargo manifest lists mismatched crate versions: irpc =
"0.13.0" and irpc-iroh = "0.12.0" (irpc-iroh depends on irpc ^0.12.0), which
will pull incompatible irpc versions; fix by aligning them—either change irpc to
"0.12.0" to match irpc-iroh or bump irpc-iroh to "0.13.0" so both use the same
release line; update the entries for irpc and/or irpc-iroh in Cargo.toml
accordingly.
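The scheduler prompt above (making `endpoint_has_doc_work` also account for import work) boils down to checking every task source for the endpoint. A minimal sketch with hypothetical, simplified stand-ins for the scheduler state:

```rust
// Hypothetical, simplified mirror of the scheduler's task variants.
#[derive(PartialEq, Eq, Debug)]
enum SyncTask {
    Doc { endpoint_id: String },
    Import { endpoint_id: String },
}

fn endpoint_has_doc_work(
    endpoint_id: &str,
    active_docs: &[String],
    active_imports: &[String],
    queued_tasks: &[SyncTask],
    pending_tasks: &[SyncTask],
) -> bool {
    // Both Doc and Import variants count as outstanding work for an endpoint.
    let task_matches = |task: &SyncTask| match task {
        SyncTask::Doc { endpoint_id: e } | SyncTask::Import { endpoint_id: e } => {
            e == endpoint_id
        }
    };
    active_docs.iter().any(|e| e == endpoint_id)
        // Previously missed: in-flight imports are also outstanding work.
        || active_imports.iter().any(|e| e == endpoint_id)
        || queued_tasks.iter().any(|t| task_matches(t))
        || pending_tasks.iter().any(|t| task_matches(t))
}
```

With imports counted, a queued `SyncTask::Import` keeps the endpoint out of the `PeerFullSynced` state until it drains.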
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: b2c53125-1b3f-4a00-8472-3ba8eba64ac0
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock` and included by `**/*`
📒 Files selected for processing (44)
- AGENTS.md
- Cargo.toml
- src/am_utils_rs/Cargo.toml
- src/am_utils_rs/ids.rs
- src/am_utils_rs/lib.rs
- src/am_utils_rs/repo.rs
- src/am_utils_rs/repo/changes.rs
- src/am_utils_rs/repo/changes/broker.rs
- src/am_utils_rs/repo/iroh.rs
- src/am_utils_rs/repo/iroh/codec.rs
- src/am_utils_rs/repo/partition.rs
- src/am_utils_rs/repo/runtime.rs
- src/am_utils_rs/sync.rs
- src/am_utils_rs/sync/peer.rs
- src/daybook_cli/Cargo.toml
- src/daybook_core/Cargo.toml
- src/daybook_core/app.rs
- src/daybook_core/config.rs
- src/daybook_core/drawer.rs
- src/daybook_core/drawer/events.rs
- src/daybook_core/drawer/meta.rs
- src/daybook_core/drawer/mutations.rs
- src/daybook_core/drawer/queries.rs
- src/daybook_core/drawer/tests.rs
- src/daybook_core/index/doc_blobs.rs
- src/daybook_core/lib.rs
- src/daybook_core/plugs.rs
- src/daybook_core/repo.rs
- src/daybook_core/repos.rs
- src/daybook_core/rt/dispatch.rs
- src/daybook_core/rt/init.rs
- src/daybook_core/rt/switch.rs
- src/daybook_core/sync.rs
- src/daybook_core/sync/bootstrap.rs
- src/daybook_core/sync/full.rs
- src/daybook_core/sync/full/doc_worker.rs
- src/daybook_core/sync/full/import_worker.rs
- src/daybook_core/sync/full/scheduler.rs
- src/daybook_core/tables.rs
- src/daybook_core/test_support.rs
- src/daybook_ffi/Cargo.toml
- src/daybook_server/Cargo.toml
- src/xtask/Cargo.toml
- src/xtask/main.rs
💤 Files with no reviewable changes (7)
- src/daybook_server/Cargo.toml
- src/daybook_cli/Cargo.toml
- src/daybook_ffi/Cargo.toml
- src/am_utils_rs/sync.rs
- src/am_utils_rs/repo/changes/broker.rs
- src/am_utils_rs/repo/iroh/codec.rs
- src/am_utils_rs/repo/iroh.rs
```text
Better to have good alignment with the operator as opposed to spending a million iterations on the same thing.

Try to be intelligent about the user's intention.
If it looks or inelegant what you're doing, you're very liekly misalinged and wasting effort.
```
Fix spelling errors.

Two spelling errors on this line: "liekly" should be "likely" and "misalinged" should be "misaligned".

✏️ Proposed fix

```diff
-If it looks or inelegant what you're doing, you're very liekly misalinged and wasting effort.
+If it looks or inelegant what you're doing, you're very likely misaligned and wasting effort.
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```text
If it looks or inelegant what you're doing, you're very likely misaligned and wasting effort.
```
🧰 Tools
🪛 LanguageTool
[grammar] ~137-~137: Ensure spelling is correct
Context: ...nelegant what you're doing, you're very liekly misalinged and wasting effort. Make sure to get mo...
(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)
```diff
 async fn load_live_bundle(
     self: &Arc<Self>,
     document_id: &DocumentId,
-) -> Res<Option<samod::DocHandle>> {
-    let handle = self
-        .repo
-        .find(document_id.clone())
-        .await
-        .map_err(|err| ferr!("failed finding doc: {err}"))?;
-    let Some(handle) = handle else {
+) -> Res<Option<Arc<LiveDocBundle>>> {
+    if let Some(existing) = self
+        .live_bundles
+        .get(document_id)
+        .and_then(|entry| entry.value().upgrade())
+    {
+        return Ok(Some(existing));
+    }
+
+    let Some(doc) = self.load_automerge(document_id).await? else {
         return Ok(None);
     };
-    Ok(Some(handle))
+    let lease = self.runtime.acquire_doc_lease(*document_id).await?;
+    let bundle = Arc::new(LiveDocBundle::new(*document_id, doc, lease));
+    self.live_bundles
+        .insert(*document_id, Arc::downgrade(&bundle));
+    Ok(Some(bundle))
```
Avoid creating multiple live bundles for the same document on concurrent cold loads.
This does a cache lookup, then awaits, then inserts. Two callers can both miss live_bundles, both hydrate the same DocumentId, and both return distinct Arc<LiveDocBundle> instances. That breaks the single-mutex/live-view invariant and leaves one handle permanently stale. Re-check after the await or use an entry-style placeholder so only one bundle gets materialized.
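The re-check pattern this comment asks for looks roughly like the following, sketched with std `Mutex` and `Weak` rather than the crate's concurrent map and async lease (type shapes are hypothetical):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

struct LiveDocBundle {
    doc_id: u64,
}

#[derive(Default)]
struct Repo {
    live_bundles: Mutex<HashMap<u64, Weak<LiveDocBundle>>>,
}

impl Repo {
    fn load_live_bundle(&self, doc_id: u64) -> Arc<LiveDocBundle> {
        // Fast path: an upgradable weak entry means someone already hydrated it.
        if let Some(existing) = self
            .live_bundles
            .lock()
            .unwrap()
            .get(&doc_id)
            .and_then(Weak::upgrade)
        {
            return existing;
        }
        // Slow path: hydrate (stands in for the awaited load_automerge call).
        let candidate = Arc::new(LiveDocBundle { doc_id });
        // Re-check under the lock before inserting: if another caller raced us
        // past the first check, keep their bundle and drop ours.
        let mut map = self.live_bundles.lock().unwrap();
        if let Some(existing) = map.get(&doc_id).and_then(Weak::upgrade) {
            return existing;
        }
        map.insert(doc_id, Arc::downgrade(&candidate));
        candidate
    }
}
```

The second check under the lock is what restores the single-bundle invariant: at most one `Arc<LiveDocBundle>` per document is ever published, even under concurrent cold loads.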
```diff
 pub async fn with_document_with_origin<F, R>(
     &self,
     operation: F,
     origin: BigRepoChangeOrigin,
 ) -> Res<R>
 where
     F: FnOnce(&mut automerge::Automerge) -> R,
 {
-    let (before_heads, out, after_heads) = self.inner.with_document(|doc| {
-        let before_heads = doc.get_heads();
-        let out = operation(doc);
-        let after_heads = doc.get_heads();
-        (before_heads, out, after_heads)
-    });
-    if before_heads != after_heads {
-        self.repo
-            .on_doc_heads_changed(self.document_id(), after_heads)
-            .await?;
+    let mut doc = self.bundle.doc.lock().await;
+
+    let before_heads = doc.get_heads();
+    let out = operation(&mut doc);
+    let after_heads = doc.get_heads();
+    if before_heads == after_heads {
+        return Ok(out);
+    }
+
+    let patches = doc.diff(&before_heads, &after_heads);
+    let changes = doc
+        .get_changes(&before_heads)
+        .into_iter()
+        .map(|change| {
+            let head = CommitId::new(change.hash().0);
+            let parents = change
+                .deps()
+                .iter()
+                .map(|dep| CommitId::new(dep.0))
+                .collect::<BTreeSet<_>>();
+            (head, parents, change.raw_bytes().to_vec())
+        })
+        .collect::<Vec<_>>();
+
+    self.repo
+        .apply_commit_delta(*self.document_id(), changes, after_heads, patches, origin)
+        .await?;
```
Don't mutate the live Automerge state before persistence succeeds.
`operation(&mut doc)` runs before `apply_commit_delta(...)`. If the runtime/storage write fails, the handle keeps the new heads in memory while persistent state stays at the old heads. From that point on, reads and later writes are operating on a document state that was never committed.
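The stage-then-publish flow this comment asks for can be sketched as follows. A toy document type stands in for `automerge::Automerge`, and `persist` for `apply_commit_delta`; none of these are the crate's real signatures:

```rust
#[derive(Clone, PartialEq, Debug)]
struct Doc {
    heads: Vec<u64>,
}

// Stand-in for apply_commit_delta: persistence that may fail.
fn persist(heads: &[u64], fail: bool) -> Result<(), String> {
    if fail {
        Err(format!("storage write failed at heads {heads:?}"))
    } else {
        Ok(())
    }
}

// Run the mutation on a clone; only publish into `live` after persistence succeeds.
fn with_document<F: FnOnce(&mut Doc)>(
    live: &mut Doc,
    op: F,
    fail_persist: bool,
) -> Result<(), String> {
    let mut staged = live.clone();
    op(&mut staged);
    if staged.heads == live.heads {
        return Ok(()); // no-op: nothing to persist
    }
    persist(&staged.heads, fail_persist)?; // on Err, `live` is left untouched
    *live = staged; // in-memory state advances only once the commit landed
    Ok(())
}
```

The key property: a failed `persist` returns early, so the live document never drifts ahead of durable storage.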
```rust
let out = async {
    let sedimentree_id: SedimentreeId = doc_id.into();
    let before_doc = match subduction
        .get_blobs(sedimentree_id)
        .await
        .map_err(|err| ferr!("failed reading blobs from subduction: {err}"))?
    {
        Some(blobs) => {
            Some(reconstruct_automerge_from_blobs(blobs.into_iter())?)
        }
        None => {
            match load_blobs_from_storage(&storage_for_reads, sedimentree_id)
                .await
                .map_err(|err| {
                    ferr!("failed reading blobs from storage: {err}")
                })? {
                Some(blobs) => {
                    Some(reconstruct_automerge_from_blobs(blobs)?)
                }
                None => None,
            }
        }
    };
    let remote_peer_id: subduction_core::peer::id::PeerId = peer_id.into();
    let result = subduction
        .sync_with_peer(&remote_peer_id, sedimentree_id, subscribe, timeout)
        .await;
```
Hydrate storage-backed docs into subduction before syncing them.
When get_blobs() misses, this path reconstructs before_doc from storage only for diffing, then calls sync_with_peer() without first add_sedimentree(...)-ing those blobs back into the runtime. Unlike LoadDoc and AcquireDocLease in this same file, cold documents are therefore synced as if the local side were empty.
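A minimal sketch of the hydrate-before-sync ordering this comment asks for. In-memory maps stand in for the subduction runtime and the blob storage; the real APIs are async and use the crate's typed identifiers:

```rust
use std::collections::HashMap;

// Stand-in for the subduction runtime's in-memory sedimentree cache.
#[derive(Default)]
struct Runtime {
    trees: HashMap<u32, Vec<Vec<u8>>>,
}

impl Runtime {
    fn get_blobs(&self, id: u32) -> Option<Vec<Vec<u8>>> {
        self.trees.get(&id).cloned()
    }
    fn add_sedimentree(&mut self, id: u32, blobs: Vec<Vec<u8>>) {
        self.trees.insert(id, blobs);
    }
}

// Stand-in for the durable blob store.
struct Storage {
    blobs: HashMap<u32, Vec<Vec<u8>>>,
}

// Ensure the runtime holds the local blobs before sync_with_peer runs, so a
// cold document is not synced as if the local side were empty.
fn hydrate_before_sync(rt: &mut Runtime, storage: &Storage, id: u32) -> bool {
    if rt.get_blobs(id).is_some() {
        return true; // already live in the runtime
    }
    match storage.blobs.get(&id) {
        Some(blobs) => {
            rt.add_sedimentree(id, blobs.clone()); // rehydrate first
            true
        }
        None => false, // truly absent locally; nothing to add
    }
}
```

Calling this before the sync step mirrors what `LoadDoc` and `AcquireDocLease` already do, per the comment above.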
```rust
pub async fn get_plugs_heads(&self) -> ChangeHashSet {
    self.app_am_handle
        .with_document(|am_doc| ChangeHashSet(am_doc.get_heads().into()))
        .await
        .expect("with_document read should not fail")
}
```
Return with_document failures instead of panicking.
Line 754 turns a fallible document-handle read into a process panic, even though events_for_init already returns Res<_>. diff_events now propagates the same handle failure correctly, so this path should do the same instead of crashing on repo/doc lifecycle errors.
💡 Proposed fix

```diff
-    pub async fn get_plugs_heads(&self) -> ChangeHashSet {
+    pub async fn get_plugs_heads(&self) -> Res<ChangeHashSet> {
         self.app_am_handle
             .with_document(|am_doc| ChangeHashSet(am_doc.get_heads().into()))
-            .await
-            .expect("with_document read should not fail")
+            .await
+            .wrap_err("failed reading plugs heads")
     }
@@
-        let heads = self.get_plugs_heads().await;
+        let heads = self.get_plugs_heads().await?;
```

Also applies to: 1006-1008
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/daybook_core/plugs.rs` around lines 750 - 755, The get_plugs_heads
function currently panics on with_document failures; change its signature to
return Res<ChangeHashSet> (or the module's Result alias) and propagate the error
instead of calling expect: await the with_document call and use ? (or map_err to
convert the error into the Res error type) to return Err when the handle read
fails, e.g. self.app_am_handle.with_document(|am_doc|
ChangeHashSet(am_doc.get_heads().into())).await.map_err(...)?, and update
callers accordingly; apply the same pattern used in diff_events/events_for_init
to the other similar sites (the other occurrences noted near lines 1006-1008) so
all handle-read failures are returned rather than panicking.
```diff
 async fn register_incoming_peer(
     &self,
     endpoint_id: EndpointId,
     peer_key: am_utils_rs::sync::protocol::PeerKey,
-    conn: am_utils_rs::repo::RepoConnection,
 ) -> Res<()> {
-    let conn_id = conn.id;
     if self.active_peers.read().await.contains_key(&endpoint_id) {
         return Ok(());
     }
     let partition_ids = self.peer_partition_ids(&peer_key);
     self.sync_store
         .allow_peer(peer_key.clone(), Some(endpoint_id))
         .await?;
+    let endpoint_addr = iroh::EndpointAddr::new(endpoint_id);
+    self.rcx
+        .big_repo
+        .ensure_peer_connection(
+            self.router.endpoint().clone(),
+            endpoint_addr.clone(),
+            am_utils_rs::repo::PeerId::new(*endpoint_id.as_bytes()),
+        )
+        .await?;
     if let Err(err) = self
         .full_sync_handle
-        .set_connection(endpoint_id, conn_id, peer_key, partition_ids)
+        .set_connection(
+            endpoint_id,
+            endpoint_addr,
+            endpoint_id,
+            peer_key.clone(),
+            partition_ids,
+        )
         .await
     {
         self.clear_endpoint_if_connecting(endpoint_id).await;
+        self.rcx
+            .big_repo
+            .remove_peer_connection(am_utils_rs::repo::PeerId::new(*endpoint_id.as_bytes()))
+            .await
+            .ok();
         return Err(err);
     }
-    let old = self
-        .active_samod_peers
+    self.active_peers
         .write()
         .await
-        .insert(endpoint_id, ActivePeerState::Connected(conn));
-    assert!(matches!(old, Some(ActivePeerState::Connecting)), "fishy");
+        .insert(endpoint_id, ActivePeerState::Connected { peer_key });
     Ok(())
```
Restore inbound connection notification and make registration atomic.
connect_endpoint_addr reserves state and emits OutgoingConnection, but register_incoming_peer only does a read-side contains_key check and mutates the connection tables directly. That creates two problems: concurrent clone-provision requests for the same endpoint can both run setup, and subscribers never receive the public IncomingConnection event on the successful inbound path.
💡 Proposed fix

```diff
     async fn register_incoming_peer(
         &self,
         endpoint_id: EndpointId,
         peer_key: am_utils_rs::sync::protocol::PeerKey,
     ) -> Res<()> {
-        if self.active_peers.read().await.contains_key(&endpoint_id) {
+        if !self.reserve_endpoint_connection(endpoint_id).await {
             return Ok(());
         }
         let partition_ids = self.peer_partition_ids(&peer_key);
         self.sync_store
             .allow_peer(peer_key.clone(), Some(endpoint_id))
@@
         if let Err(err) = self
             .full_sync_handle
             .set_connection(
                 endpoint_id,
                 endpoint_addr,
@@
             self.rcx
                 .big_repo
                 .remove_peer_connection(am_utils_rs::repo::PeerId::new(*endpoint_id.as_bytes()))
                 .await
                 .ok();
+            self.clear_endpoint_if_connecting(endpoint_id).await;
             return Err(err);
         }
-        self.active_peers
-            .write()
-            .await
-            .insert(endpoint_id, ActivePeerState::Connected { peer_key });
+        let old = self
+            .active_peers
+            .write()
+            .await
+            .insert(endpoint_id, ActivePeerState::Connected { peer_key });
+        assert!(matches!(old, Some(ActivePeerState::Connecting)), "fishy");
+        self.registry.notify([IrohSyncEvent::IncomingConnection {
+            endpoint_id,
+            conn_id: endpoint_id,
+            peer_id: endpoint_id.to_string().into(),
+        }]);
         Ok(())
     }
```

🤖 Prompt for AI Agents
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/daybook_core/sync.rs` around lines 587 - 630, register_incoming_peer
currently only does a read-side contains_key and then proceeds to mutate
connection tables, which allows concurrent setups for the same endpoint and
never emits the public IncomingConnection event; make the registration atomic by
taking the write lock on active_peers up-front (or use the same reservation path
as connect_endpoint_addr) to check-and-reserve the endpoint before performing
async setup, emit the IncomingConnection event to subscribers after a successful
registration, and ensure cleanup on failure (remove_peer_connection and clear
any reserved state) if full_sync_handle.set_connection or other async steps
fail; reference register_incoming_peer, active_peers,
full_sync_handle.set_connection, rcx.big_repo.ensure_peer_connection and
rcx.big_repo.remove_peer_connection when making these changes.
```rust
pub fn endpoint_has_doc_work(&self, endpoint_id: EndpointId) -> bool {
    self.active_docs
        .keys()
        .any(|key| key.endpoint_id == endpoint_id)
        || self
            .queued_tasks
            .iter()
            .any(|task| matches!(task, SyncTask::Doc(key) if key.endpoint_id == endpoint_id))
        || self
            .pending_tasks
            .keys()
            .any(|task| matches!(task, SyncTask::Doc(key) if key.endpoint_id == endpoint_id))
}
```
Count import work here too before declaring a peer fully synced.
endpoint_has_doc_work() ignores active_imports and queued/pending SyncTask::Import, but refresh_peer_fully_synced_state() in src/daybook_core/sync/full.rs uses this helper as part of the PeerFullSynced decision. A peer with outstanding import tasks can therefore emit PeerFullSynced while those imports are still running.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/daybook_core/sync/full/scheduler.rs` around lines 542 - 554,
endpoint_has_doc_work currently only checks active_docs, queued_tasks, and
pending_tasks for SyncTask::Doc and thus misses active_imports and
queued/pending SyncTask::Import; update endpoint_has_doc_work to also consider
self.active_imports (matching endpoint_id) and any queued_tasks or pending_tasks
entries that are SyncTask::Import for the same endpoint_id so
refresh_peer_fully_synced_state (and PeerFullSynced decisions) see import work;
look for the function endpoint_has_doc_work, the fields active_imports,
queued_tasks, pending_tasks, and the enum variant SyncTask::Import to add those
checks in the same style as the existing Doc checks.
```diff
 irpc = { version = "0.13.0", features = ["derive"] }
-irpc-iroh = "0.13.0"
+irpc-iroh = "0.12.0"
```
🧩 Analysis chain

🏁 Script executed:

```shell
# First, check the actual content of src/xtask/Cargo.toml
cat -n src/xtask/Cargo.toml | sed -n '35,45p'
```

Repository: dman-os/townframe
Length of output: 566
🌐 Web query:
irpc-iroh 0.12.0 crate dependencies
💡 Result:
The Rust crate irpc-iroh version 0.12.0 (released 2026-01-28) has the following dependencies (normal unless noted):

Normal dependencies:
- getrandom ^0.3
- tracing ^0.1.41
- tokio ^1.44
- serde ^1
- postcard ^1.1.1
- n0-future ^0.3
- n0-error ^0.1
- irpc ^0.12.0
- iroh-base ^0.96
- iroh ^0.96

Dev dependencies:
- tracing-subscriber ^0.3.20
- rand ^0.9.2
- irpc-derive ^0.9.0
- hex ^0.4.3
- futures-util ^0.3
- clap ^4.5.41
- anyhow ^1
Citations:
- 1: https://crates.io/crates/irpc-iroh/dependencies
- 2: https://docs.rs/crate/irpc-iroh/latest
- 3: https://crates.io/crates/irpc-iroh
🌐 Web query:
irpc 0.13.0 irpc-iroh compatibility
💡 Result:
irpc 0.13.0 is fully compatible with irpc-iroh. The irpc-iroh crate version 0.13.0 explicitly depends on irpc ^0.13.0, as shown in its Cargo.toml on crates.io. Both crates were released on the same date, 2026-03-16, ensuring version alignment. irpc-iroh provides the Iroh transport implementation for irpc, enabling RPC over iroh connections, and is designed for use with matching irpc versions. Official documentation and blog posts confirm irpc is a lightweight RPC system for iroh, with irpc-iroh handling the iroh-specific transport (e.g., IrohProtocol, IrohRemoteConnection). No compatibility issues or breaking changes are noted in available sources for these versions.
Citations:
- 1: https://crates.io/crates/irpc-iroh
- 2: https://crates.io/crates/irpc
- 3: https://github.com/n0-computer/irpc
- 4: https://www.iroh.computer/blog/irpc
- 5: https://docs.iroh.computer/protocols/rpc
- 6: https://docs.rs/irpc-iroh
Keep irpc and irpc-iroh on the same release line.
irpc-iroh 0.12.0 declares a dependency on irpc ^0.12.0. Pinning irpc = 0.13.0 alongside irpc-iroh = 0.12.0 will cause Cargo to pull incompatible versions into xtask. Either downgrade irpc to 0.12.0 or upgrade irpc-iroh to 0.13.0.
🔧 If `xtask` is meant to follow the rest of this PR's transport downgrade

```diff
-irpc = { version = "0.13.0", features = ["derive"] }
-irpc-iroh = "0.12.0"
+irpc = { version = "0.12.0", features = ["derive"] }
+irpc-iroh = "0.12.0"
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-irpc = { version = "0.13.0", features = ["derive"] }
-irpc-iroh = "0.13.0"
-irpc-iroh = "0.12.0"
+irpc = { version = "0.12.0", features = ["derive"] }
+irpc-iroh = "0.12.0"
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/xtask/Cargo.toml` around lines 38 - 39, The Cargo manifest lists
mismatched crate versions: irpc = "0.13.0" and irpc-iroh = "0.12.0" (irpc-iroh
depends on irpc ^0.12.0), which will pull incompatible irpc versions; fix by
aligning them—either change irpc to "0.12.0" to match irpc-iroh or bump
irpc-iroh to "0.13.0" so both use the same release line; update the entries for
irpc and/or irpc-iroh in Cargo.toml accordingly.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor