Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions samod-core/src/network/wire_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ impl WireMessage {
"count" | "timestamp" => {
fields.insert(key, FieldValue::Uint(decode_u64_or_f64(&mut decoder)?));
}
"metadata" => {
"peerMetadata" | "metadata" => {
let metadata = decode_metadata(&mut decoder)?;
fields.insert(key, FieldValue::Metadata(metadata));
fields.insert("peerMetadata".to_string(), FieldValue::Metadata(metadata));
}
"newHeads" => {
let new_heads = decode_new_heads(&mut decoder)?;
Expand Down Expand Up @@ -170,7 +170,7 @@ impl WireMessage {
))?
.clone();
let metadata = fields
.get("metadata")
.get("peerMetadata")
.and_then(|v| v.as_metadata())
.cloned();

Expand All @@ -191,7 +191,7 @@ impl WireMessage {
))?
.clone();
let metadata = fields
.get("metadata")
.get("peerMetadata")
.and_then(|v| v.as_metadata())
.cloned();

Expand Down Expand Up @@ -362,7 +362,7 @@ impl WireMessage {
encoder.str(version)?;
}
if let Some(metadata) = metadata {
encoder.str("metadata")?;
encoder.str("peerMetadata")?;
encode_metadata(&mut encoder, metadata)
.map_err(|e| EncodeError::Minicbor(format!("{e:?}")))?;
}
Expand All @@ -382,7 +382,7 @@ impl WireMessage {
.str(selected_protocol_version)?;
encoder.str("targetId")?.str(&target_id.to_string())?;
if let Some(metadata) = metadata {
encoder.str("metadata")?;
encoder.str("peerMetadata")?;
encode_metadata(&mut encoder, metadata)
.map_err(|e| EncodeError::Minicbor(format!("{e:?}")))?;
}
Expand Down Expand Up @@ -598,8 +598,13 @@ fn decode_metadata(decoder: &mut minicbor::Decoder) -> Result<PeerMetadata, Deco
for _ in 0..len {
match decoder.str()? {
"storageId" => {
let storage_id_str = decoder.str()?;
storage_id = Some(StorageId::from(storage_id_str));
// cbor-x may encode undefined values (e.g. when JS peer has no storage)
if decoder.datatype()? == minicbor::data::Type::Undefined {
decoder.undefined()?;
} else {
let storage_id_str = decoder.str()?;
storage_id = Some(StorageId::from(storage_id_str));
}
}
"isEphemeral" => {
is_ephemeral = decoder.bool()?;
Expand Down
9 changes: 9 additions & 0 deletions samod/interop-test-server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class Server {
res.send(`👍 @automerge/automerge-repo-sync-server is running`);
});

app.get("/storage-keys", (req, res) => {
const keys = this.#storage.keys();
res.json(keys);
});

this.#server = app.listen(PORT, () => {
console.log(`Listening on port ${this.#server.address().port}`);
});
Expand Down Expand Up @@ -95,6 +100,10 @@ class InMemoryStorageAdapter implements StorageAdapterInterface {
return Promise.resolve();
}

keys(): string[][] {
return Array.from(this.#data.keys());
}

log() {
console.log(`InMemoryStorageAdapter has ${this.#data.size} items:`);
for (const [key, value] of this.#data.entries()) {
Expand Down
9 changes: 7 additions & 2 deletions samod/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,10 @@ impl Inner {
}
ConnectionOwner::Listener(listener_id) => {
if let Some(ah) = self.acceptor_handles.get(&listener_id) {
ah.notify_client_connected(samod_peer_info.clone(), connection_id);
ah.notify_client_connected(
samod_peer_info.clone(),
connection_id,
);
}
conn_handle.notify_client_connected(samod_peer_info);
}
Expand Down Expand Up @@ -1017,7 +1020,9 @@ impl Inner {
ConnFinishedReason::ErrorReceiving(error.clone()),
);
if let Some(conn_handle) = self.connections.get(&connection_id) {
conn_handle.notify_client_disconnected(ConnFinishedReason::ErrorReceiving(error.clone()));
conn_handle.notify_client_disconnected(
ConnFinishedReason::ErrorReceiving(error.clone()),
);
};
}
}
Expand Down
3 changes: 2 additions & 1 deletion samod/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ impl Transport {

// The default Tokio frame size is 8mb, this increases it to 8gb. Documents above that size won't sync.
let codec = LengthDelimitedCodec::builder()
.max_frame_length(8 * 1024 * 1024 * 1024).new_codec();
.max_frame_length(8 * 1024 * 1024 * 1024)
.new_codec();

let framed = Framed::new(io, codec);
let (msg_sink, msg_stream) = framed.split();
Expand Down
5 changes: 4 additions & 1 deletion samod/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ impl crate::AcceptorHandle {
///
/// * `socket` - An axum WebSocket (both `Sink` and `Stream`).
#[cfg(feature = "axum")]
pub fn accept_axum(&self, socket: axum::extract::ws::WebSocket) -> Result<ConnectionHandle, crate::Stopped> {
pub fn accept_axum(
&self,
socket: axum::extract::ws::WebSocket,
) -> Result<ConnectionHandle, crate::Stopped> {
let ws = socket
.map_err(|e| NetworkError(format!("error receiving websocket message: {}", e)))
.sink_map_err(|e| NetworkError(format!("error sending websocket message: {}", e)));
Expand Down
10 changes: 10 additions & 0 deletions samod/tests/js_interop/js_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ use futures::{Stream, StreamExt};
use samod::DocumentId;
use tokio::{io::AsyncBufReadExt, process::Command, sync::OnceCell};

impl RunningJsServer {
/// Query the JS server's storage keys via its /storage-keys HTTP endpoint.
pub(super) async fn storage_keys(&self) -> eyre::Result<Vec<Vec<String>>> {
let url = format!("http://localhost:{}/storage-keys", self.port);
let resp = reqwest::get(&url).await?;
let keys: Vec<Vec<String>> = resp.json().await?;
Ok(keys)
}
}

const INTEROP_SERVER_PATH: &str = "interop-test-server";

static JS_DEPS_INITIALIZED: OnceCell<Result<(), String>> = OnceCell::const_new();
Expand Down
43 changes: 43 additions & 0 deletions samod/tests/js_interop/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,49 @@ async fn js_client_sending_remote_heads_changed_does_not_break_rust_server() {
assert_eq!(heads, fetched_heads);
}

/// Test that the JS server saves sync state for a non-ephemeral samod peer.
///
/// When samod connects with `isEphemeral: false` and a `storageId`, the JS
/// automerge-repo should persist sync state keyed by that storage ID. If this
/// doesn't happen, reconnecting peers will have to re-sync from scratch,
/// resulting in unnecessarily large initial sync messages.
#[tokio::test]
async fn js_server_saves_sync_state_for_non_ephemeral_samod_peer() {
init_logging();
let js = JsWrapper::create().await.unwrap();
let js_server = js.start_server().await.unwrap();
let port = js_server.port;

let repo = samod_connected_to_js_server(port, Some("repo1".to_string())).await;

let doc_handle = repo.create(Automerge::new()).await.unwrap();
doc_handle
.with_document(|doc| {
doc.transact(|tx| {
tx.put(automerge::ROOT, "key", "value")?;
Ok::<_, automerge::AutomergeError>(())
})
})
.unwrap();

// Wait for sync to complete and sync state to be persisted
tokio::time::sleep(Duration::from_millis(2000)).await;

let keys = js_server.storage_keys().await.unwrap();
println!("JS server storage keys: {:?}", keys);

// The JS server should have saved sync state for the samod peer.
// Sync state keys have the form [documentId, "sync-state", storageId].
let has_sync_state = keys
.iter()
.any(|key| key.len() >= 2 && key[1] == "sync-state");
assert!(
has_sync_state,
"JS server should have saved sync state for the non-ephemeral samod peer, but storage keys were: {:?}",
keys
);
}

async fn samod_connected_to_js_server(port: u16, peer_id: Option<String>) -> Repo {
let mut builder = Repo::build_tokio();
if let Some(peer_id) = peer_id {
Expand Down