From 0a78525e5b4546779116ef7d9099a180eead68c7 Mon Sep 17 00:00:00 2001 From: Alex Good Date: Tue, 10 Mar 2026 13:30:00 +0000 Subject: [PATCH 1/2] Encode peerMetadata under the correct key in the join message Problem: the automerge-repo JS implementation saves sync states of remote peers in local storage in order to speed up initial sync with previously connected peers. The sync state is saved if the initial join message from a peer contains a `peerMetadata` field with `isEphemeral: false` and a storage ID. We currently encode the peer metadata in the join message under the wrong key, which causes the JS implementation to ignore it and not save the sync state. Solution: encode the peer metadata under the correct key in the join message --- samod-core/src/network/wire_protocol.rs | 21 +++++++----- samod/interop-test-server/server.ts | 9 ++++++ samod/tests/js_interop/js_wrapper.rs | 10 ++++++ samod/tests/js_interop/main.rs | 43 +++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 8 deletions(-) diff --git a/samod-core/src/network/wire_protocol.rs b/samod-core/src/network/wire_protocol.rs index b414413..82ee579 100644 --- a/samod-core/src/network/wire_protocol.rs +++ b/samod-core/src/network/wire_protocol.rs @@ -140,9 +140,9 @@ impl WireMessage { "count" | "timestamp" => { fields.insert(key, FieldValue::Uint(decode_u64_or_f64(&mut decoder)?)); } - "metadata" => { + "peerMetadata" | "metadata" => { let metadata = decode_metadata(&mut decoder)?; - fields.insert(key, FieldValue::Metadata(metadata)); + fields.insert("peerMetadata".to_string(), FieldValue::Metadata(metadata)); } "newHeads" => { let new_heads = decode_new_heads(&mut decoder)?; @@ -170,7 +170,7 @@ impl WireMessage { ))? .clone(); let metadata = fields - .get("metadata") + .get("peerMetadata") .and_then(|v| v.as_metadata()) .cloned(); @@ -191,7 +191,7 @@ impl WireMessage { ))? .clone(); let metadata = fields - .get("metadata") + .get("peerMetadata") .and_then(|v| v.as_metadata()) .cloned(); @@ -362,7 +362,7 @@ impl WireMessage { encoder.str(version)?; } if let Some(metadata) = metadata { - encoder.str("metadata")?; + encoder.str("peerMetadata")?; encode_metadata(&mut encoder, metadata) .map_err(|e| EncodeError::Minicbor(format!("{e:?}")))?; } @@ -382,7 +382,7 @@ impl WireMessage { .str(selected_protocol_version)?; encoder.str("targetId")?.str(&target_id.to_string())?; if let Some(metadata) = metadata { - encoder.str("metadata")?; + encoder.str("peerMetadata")?; encode_metadata(&mut encoder, metadata) .map_err(|e| EncodeError::Minicbor(format!("{e:?}")))?; } @@ -598,8 +598,13 @@ fn decode_metadata(decoder: &mut minicbor::Decoder) -> Result { - let storage_id_str = decoder.str()?; - storage_id = Some(StorageId::from(storage_id_str)); + // cbor-x may encode undefined values (e.g. when JS peer has no storage) + if decoder.datatype()? == minicbor::data::Type::Undefined { + decoder.undefined()?; + } else { + let storage_id_str = decoder.str()?; + storage_id = Some(StorageId::from(storage_id_str)); + } } "isEphemeral" => { is_ephemeral = decoder.bool()?; diff --git a/samod/interop-test-server/server.ts b/samod/interop-test-server/server.ts index 82ca214..26d9e64 100644 --- a/samod/interop-test-server/server.ts +++ b/samod/interop-test-server/server.ts @@ -42,6 +42,11 @@ class Server { res.send(`👍 @automerge/automerge-repo-sync-server is running`); }); + app.get("/storage-keys", (req, res) => { + const keys = this.#storage.keys(); + res.json(keys); + }); + this.#server = app.listen(PORT, () => { console.log(`Listening on port ${this.#server.address().port}`); }); @@ -95,6 +100,10 @@ class InMemoryStorageAdapter implements StorageAdapterInterface { return Promise.resolve(); } + keys(): string[][] { + return Array.from(this.#data.keys()); + } + log() { console.log(`InMemoryStorageAdapter has ${this.#data.size} items:`); for (const [key, value] of this.#data.entries()) { diff --git a/samod/tests/js_interop/js_wrapper.rs b/samod/tests/js_interop/js_wrapper.rs index 9357577..25f9967 100644 --- a/samod/tests/js_interop/js_wrapper.rs +++ b/samod/tests/js_interop/js_wrapper.rs @@ -4,6 +4,16 @@ use futures::{Stream, StreamExt}; use samod::DocumentId; use tokio::{io::AsyncBufReadExt, process::Command, sync::OnceCell}; +impl RunningJsServer { + /// Query the JS server's storage keys via its /storage-keys HTTP endpoint. + pub(super) async fn storage_keys(&self) -> eyre::Result>> { + let url = format!("http://localhost:{}/storage-keys", self.port); + let resp = reqwest::get(&url).await?; + let keys: Vec> = resp.json().await?; + Ok(keys) + } +} + const INTEROP_SERVER_PATH: &str = "interop-test-server"; static JS_DEPS_INITIALIZED: OnceCell> = OnceCell::const_new(); diff --git a/samod/tests/js_interop/main.rs b/samod/tests/js_interop/main.rs index 87655a9..807a07d 100644 --- a/samod/tests/js_interop/main.rs +++ b/samod/tests/js_interop/main.rs @@ -187,6 +187,49 @@ async fn js_client_sending_remote_heads_changed_does_not_break_rust_server() { assert_eq!(heads, fetched_heads); } +/// Test that the JS server saves sync state for a non-ephemeral samod peer. +/// +/// When samod connects with `isEphemeral: false` and a `storageId`, the JS +/// automerge-repo should persist sync state keyed by that storage ID. If this +/// doesn't happen, reconnecting peers will have to re-sync from scratch, +/// resulting in unnecessarily large initial sync messages. +#[tokio::test] +async fn js_server_saves_sync_state_for_non_ephemeral_samod_peer() { + init_logging(); + let js = JsWrapper::create().await.unwrap(); + let js_server = js.start_server().await.unwrap(); + let port = js_server.port; + + let repo = samod_connected_to_js_server(port, Some("repo1".to_string())).await; + + let doc_handle = repo.create(Automerge::new()).await.unwrap(); + doc_handle + .with_document(|doc| { + doc.transact(|tx| { + tx.put(automerge::ROOT, "key", "value")?; + Ok::<_, automerge::AutomergeError>(()) + }) + }) + .unwrap(); + + // Wait for sync to complete and sync state to be persisted + tokio::time::sleep(Duration::from_millis(2000)).await; + + let keys = js_server.storage_keys().await.unwrap(); + println!("JS server storage keys: {:?}", keys); + + // The JS server should have saved sync state for the samod peer. + // Sync state keys have the form [documentId, "sync-state", storageId]. + let has_sync_state = keys + .iter() + .any(|key| key.len() >= 2 && key[1] == "sync-state"); + assert!( + has_sync_state, + "JS server should have saved sync state for the non-ephemeral samod peer, but storage keys were: {:?}", + keys + ); +} + async fn samod_connected_to_js_server(port: u16, peer_id: Option) -> Repo { let mut builder = Repo::build_tokio(); if let Some(peer_id) = peer_id { From addd77862d1a64f7784010c0d11af48f66f20b99 Mon Sep 17 00:00:00 2001 From: Alex Good Date: Tue, 10 Mar 2026 13:34:59 +0000 Subject: [PATCH 2/2] formatting --- samod/src/lib.rs | 9 +++++++-- samod/src/transport.rs | 3 ++- samod/src/websocket.rs | 5 ++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/samod/src/lib.rs b/samod/src/lib.rs index 824f774..78e10a6 100644 --- a/samod/src/lib.rs +++ b/samod/src/lib.rs @@ -981,7 +981,10 @@ impl Inner { } ConnectionOwner::Listener(listener_id) => { if let Some(ah) = self.acceptor_handles.get(&listener_id) { - ah.notify_client_connected(samod_peer_info.clone(), connection_id); + ah.notify_client_connected( + samod_peer_info.clone(), + connection_id, + ); } conn_handle.notify_client_connected(samod_peer_info); } @@ -1017,7 +1020,9 @@ impl Inner { ConnFinishedReason::ErrorReceiving(error.clone()), ); if let Some(conn_handle) = self.connections.get(&connection_id) { - conn_handle.notify_client_disconnected(ConnFinishedReason::ErrorReceiving(error.clone())); + conn_handle.notify_client_disconnected( + ConnFinishedReason::ErrorReceiving(error.clone()), + ); }; } } diff --git a/samod/src/transport.rs b/samod/src/transport.rs index 18e1c57..ffccb65 100644 --- a/samod/src/transport.rs +++ b/samod/src/transport.rs @@ -61,7 +61,8 @@ impl Transport { // The default Tokio frame size is 8mb, this increases it to 8gb. Documents above that size won't sync. let codec = LengthDelimitedCodec::builder() - .max_frame_length(8 * 1024 * 1024 * 1024).new_codec(); + .max_frame_length(8 * 1024 * 1024 * 1024) + .new_codec(); let framed = Framed::new(io, codec); let (msg_sink, msg_stream) = framed.split(); diff --git a/samod/src/websocket.rs b/samod/src/websocket.rs index 4d394ab..dcdb993 100644 --- a/samod/src/websocket.rs +++ b/samod/src/websocket.rs @@ -266,7 +266,10 @@ impl crate::AcceptorHandle { /// /// * `socket` - An axum WebSocket (both `Sink` and `Stream`). #[cfg(feature = "axum")] - pub fn accept_axum(&self, socket: axum::extract::ws::WebSocket) -> Result { + pub fn accept_axum( + &self, + socket: axum::extract::ws::WebSocket, + ) -> Result { let ws = socket .map_err(|e| NetworkError(format!("error receiving websocket message: {}", e))) .sink_map_err(|e| NetworkError(format!("error sending websocket message: {}", e)));