diff --git a/samod-core/src/network/wire_protocol.rs b/samod-core/src/network/wire_protocol.rs index b414413..82ee579 100644 --- a/samod-core/src/network/wire_protocol.rs +++ b/samod-core/src/network/wire_protocol.rs @@ -140,9 +140,9 @@ impl WireMessage { "count" | "timestamp" => { fields.insert(key, FieldValue::Uint(decode_u64_or_f64(&mut decoder)?)); } - "metadata" => { + "peerMetadata" | "metadata" => { let metadata = decode_metadata(&mut decoder)?; - fields.insert(key, FieldValue::Metadata(metadata)); + fields.insert("peerMetadata".to_string(), FieldValue::Metadata(metadata)); } "newHeads" => { let new_heads = decode_new_heads(&mut decoder)?; @@ -170,7 +170,7 @@ impl WireMessage { ))? .clone(); let metadata = fields - .get("metadata") + .get("peerMetadata") .and_then(|v| v.as_metadata()) .cloned(); @@ -191,7 +191,7 @@ impl WireMessage { ))? .clone(); let metadata = fields - .get("metadata") + .get("peerMetadata") .and_then(|v| v.as_metadata()) .cloned(); @@ -362,7 +362,7 @@ impl WireMessage { encoder.str(version)?; } if let Some(metadata) = metadata { - encoder.str("metadata")?; + encoder.str("peerMetadata")?; encode_metadata(&mut encoder, metadata) .map_err(|e| EncodeError::Minicbor(format!("{e:?}")))?; } @@ -382,7 +382,7 @@ impl WireMessage { .str(selected_protocol_version)?; encoder.str("targetId")?.str(&target_id.to_string())?; if let Some(metadata) = metadata { - encoder.str("metadata")?; + encoder.str("peerMetadata")?; encode_metadata(&mut encoder, metadata) .map_err(|e| EncodeError::Minicbor(format!("{e:?}")))?; } @@ -598,8 +598,13 @@ fn decode_metadata(decoder: &mut minicbor::Decoder) -> Result { - let storage_id_str = decoder.str()?; - storage_id = Some(StorageId::from(storage_id_str)); + // cbor-x may encode undefined values (e.g. when JS peer has no storage) + if decoder.datatype()? == minicbor::data::Type::Undefined { + decoder.undefined()?; + } else { + let storage_id_str = decoder.str()?; + storage_id = Some(StorageId::from(storage_id_str)); + } } "isEphemeral" => { is_ephemeral = decoder.bool()?; diff --git a/samod/interop-test-server/server.ts b/samod/interop-test-server/server.ts index 82ca214..26d9e64 100644 --- a/samod/interop-test-server/server.ts +++ b/samod/interop-test-server/server.ts @@ -42,6 +42,11 @@ class Server { res.send(`👍 @automerge/automerge-repo-sync-server is running`); }); + app.get("/storage-keys", (req, res) => { + const keys = this.#storage.keys(); + res.json(keys); + }); + this.#server = app.listen(PORT, () => { console.log(`Listening on port ${this.#server.address().port}`); }); @@ -95,6 +100,10 @@ class InMemoryStorageAdapter implements StorageAdapterInterface { return Promise.resolve(); } + keys(): string[][] { + return Array.from(this.#data.keys()); + } + log() { console.log(`InMemoryStorageAdapter has ${this.#data.size} items:`); for (const [key, value] of this.#data.entries()) { diff --git a/samod/src/lib.rs b/samod/src/lib.rs index 824f774..78e10a6 100644 --- a/samod/src/lib.rs +++ b/samod/src/lib.rs @@ -981,7 +981,10 @@ impl Inner { } ConnectionOwner::Listener(listener_id) => { if let Some(ah) = self.acceptor_handles.get(&listener_id) { - ah.notify_client_connected(samod_peer_info.clone(), connection_id); + ah.notify_client_connected( + samod_peer_info.clone(), + connection_id, + ); } conn_handle.notify_client_connected(samod_peer_info); } @@ -1017,7 +1020,9 @@ impl Inner { ConnFinishedReason::ErrorReceiving(error.clone()), ); if let Some(conn_handle) = self.connections.get(&connection_id) { - conn_handle.notify_client_disconnected(ConnFinishedReason::ErrorReceiving(error.clone())); + conn_handle.notify_client_disconnected( + ConnFinishedReason::ErrorReceiving(error.clone()), + ); }; } } diff --git a/samod/src/transport.rs b/samod/src/transport.rs index 18e1c57..ffccb65 100644 --- a/samod/src/transport.rs +++ b/samod/src/transport.rs @@ -61,7 +61,8 @@ impl Transport { // The default Tokio frame size is 8mb, this increases it to 8gb. Documents above that size won't sync. let codec = LengthDelimitedCodec::builder() - .max_frame_length(8 * 1024 * 1024 * 1024).new_codec(); + .max_frame_length(8 * 1024 * 1024 * 1024) + .new_codec(); let framed = Framed::new(io, codec); let (msg_sink, msg_stream) = framed.split(); diff --git a/samod/src/websocket.rs b/samod/src/websocket.rs index 4d394ab..dcdb993 100644 --- a/samod/src/websocket.rs +++ b/samod/src/websocket.rs @@ -266,7 +266,10 @@ impl crate::AcceptorHandle { /// /// * `socket` - An axum WebSocket (both `Sink` and `Stream`). #[cfg(feature = "axum")] - pub fn accept_axum(&self, socket: axum::extract::ws::WebSocket) -> Result { + pub fn accept_axum( + &self, + socket: axum::extract::ws::WebSocket, + ) -> Result { let ws = socket .map_err(|e| NetworkError(format!("error receiving websocket message: {}", e))) .sink_map_err(|e| NetworkError(format!("error sending websocket message: {}", e))); diff --git a/samod/tests/js_interop/js_wrapper.rs b/samod/tests/js_interop/js_wrapper.rs index 9357577..25f9967 100644 --- a/samod/tests/js_interop/js_wrapper.rs +++ b/samod/tests/js_interop/js_wrapper.rs @@ -4,6 +4,16 @@ use futures::{Stream, StreamExt}; use samod::DocumentId; use tokio::{io::AsyncBufReadExt, process::Command, sync::OnceCell}; +impl RunningJsServer { + /// Query the JS server's storage keys via its /storage-keys HTTP endpoint. + pub(super) async fn storage_keys(&self) -> eyre::Result>> { + let url = format!("http://localhost:{}/storage-keys", self.port); + let resp = reqwest::get(&url).await?; + let keys: Vec> = resp.json().await?; + Ok(keys) + } +} + const INTEROP_SERVER_PATH: &str = "interop-test-server"; static JS_DEPS_INITIALIZED: OnceCell> = OnceCell::const_new(); diff --git a/samod/tests/js_interop/main.rs b/samod/tests/js_interop/main.rs index 87655a9..807a07d 100644 --- a/samod/tests/js_interop/main.rs +++ b/samod/tests/js_interop/main.rs @@ -187,6 +187,49 @@ async fn js_client_sending_remote_heads_changed_does_not_break_rust_server() { assert_eq!(heads, fetched_heads); } +/// Test that the JS server saves sync state for a non-ephemeral samod peer. +/// +/// When samod connects with `isEphemeral: false` and a `storageId`, the JS +/// automerge-repo should persist sync state keyed by that storage ID. If this +/// doesn't happen, reconnecting peers will have to re-sync from scratch, +/// resulting in unnecessarily large initial sync messages. +#[tokio::test] +async fn js_server_saves_sync_state_for_non_ephemeral_samod_peer() { + init_logging(); + let js = JsWrapper::create().await.unwrap(); + let js_server = js.start_server().await.unwrap(); + let port = js_server.port; + + let repo = samod_connected_to_js_server(port, Some("repo1".to_string())).await; + + let doc_handle = repo.create(Automerge::new()).await.unwrap(); + doc_handle + .with_document(|doc| { + doc.transact(|tx| { + tx.put(automerge::ROOT, "key", "value")?; + Ok::<_, automerge::AutomergeError>(()) + }) + }) + .unwrap(); + + // Wait for sync to complete and sync state to be persisted + tokio::time::sleep(Duration::from_millis(2000)).await; + + let keys = js_server.storage_keys().await.unwrap(); + println!("JS server storage keys: {:?}", keys); + + // The JS server should have saved sync state for the samod peer. + // Sync state keys have the form [documentId, "sync-state", storageId]. + let has_sync_state = keys + .iter() + .any(|key| key.len() >= 2 && key[1] == "sync-state"); + assert!( + has_sync_state, + "JS server should have saved sync state for the non-ephemeral samod peer, but storage keys were: {:?}", + keys + ); +} + async fn samod_connected_to_js_server(port: u16, peer_id: Option) -> Repo { let mut builder = Repo::build_tokio(); if let Some(peer_id) = peer_id {