Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@

## Unreleased

### Fixed

* A bug where locally unavailable documents sent by peers with an announce
policy set to false would be marked as unavailable

### Added

- `TcpDialer::new` which takes a `Url` parameter, rather than a host and a port
* `TcpDialer::new` which takes a `Url` parameter, rather than a host and a port
or a socket address.
- `Repo::dial_tcp()` to simplify construction of `TcpDialer`.
- Allow documents syncing over the TCP transport to be up to 8gb size instead
* `Repo::dial_tcp()` to simplify construction of `TcpDialer`.
* Allow documents syncing over the TCP transport to be up to 8gb size instead
of Tokio's default 8mb frame size
- Exposed receiving `ConnectionHandle`s via `accept()`. Users can now subscribe
* Exposed receiving `ConnectionHandle`s via `accept()`. Users can now subscribe
to an `events()` stream directly on the handle, or `await` for
`handshake_completed()`.

Expand Down
3 changes: 1 addition & 2 deletions samod-core/src/actors/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ mod load;
mod on_disk_state;
pub use on_disk_state::CompactionHash;
mod peer_doc_connection;
mod ready;
mod request;
mod phase;
mod spawn_args;
mod with_doc_result;
pub use with_doc_result::WithDocResult;
Expand Down
62 changes: 57 additions & 5 deletions samod-core/src/actors/document/doc_actor_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::collections::HashMap;
use automerge::ChangeHash;

use crate::{
ConnectionId, DocumentChanged, PeerId, StorageKey,
ConnectionId, DocumentChanged, DocumentId, PeerId, StorageKey,
actors::{
DocToHubMsg,
document::{SyncMessageStat, io::DocumentIoTask},
messages::DocToHubMsgPayload,
document::{DocumentStatus, SyncMessageStat, io::DocumentIoTask},
messages::{Broadcast, DocToHubMsgPayload, SyncMessage},
},
io::{IoTask, IoTaskId, StorageTask},
network::PeerDocState,
Expand Down Expand Up @@ -58,8 +58,60 @@ impl DocActorResult {
}

/// Send a message back to the hub
pub(crate) fn send_message(&mut self, message: DocToHubMsgPayload) {
self.outgoing_messages.push(DocToHubMsg(message));
pub(crate) fn send_sync_message(
&mut self,
conn_id: ConnectionId,
doc_id: DocumentId,
message: SyncMessage,
) {
self.outgoing_messages
.push(DocToHubMsg(DocToHubMsgPayload::SendSyncMessage {
connection_id: conn_id,
document_id: doc_id,
message,
}));
}

pub(crate) fn send_broadcast(&mut self, connections: Vec<ConnectionId>, msg: Broadcast) {
self.outgoing_messages
.push(DocToHubMsg(DocToHubMsgPayload::Broadcast {
connections,
msg,
}));
}

pub(crate) fn send_terminated(&mut self) {
self.outgoing_messages
.push(DocToHubMsg(DocToHubMsgPayload::Terminated));
}

pub(crate) fn send_peer_states_changes(
&mut self,
new_states: HashMap<ConnectionId, PeerDocState>,
) {
// Remove previous peer state change messages as they are redundant
self.outgoing_messages
.retain(|m| !matches!(m.0, DocToHubMsgPayload::PeerStatesChanged { .. }));
self.outgoing_messages
.push(DocToHubMsg(DocToHubMsgPayload::PeerStatesChanged {
new_states,
}));
}

pub(crate) fn send_doc_status_update(&mut self, new_status: DocumentStatus) {
// remove any existing doc status update so that if the document status changes
// multiple times during a turn, only the latest status is sent to the hub.
// This is especially important to avoid bouncing through a NotFound state
// when loading a document as that will cause any outstanding find commands
// to fail even if the document loads successfully in this turn (as it might
// if we finish loading after receiving a sync message with the document
// content).
self.outgoing_messages
.retain(|m| !matches!(m.0, DocToHubMsgPayload::DocumentStatusChanged { .. }));
self.outgoing_messages
.push(DocToHubMsg(DocToHubMsgPayload::DocumentStatusChanged {
new_status,
}));
}

pub(crate) fn put(&mut self, key: StorageKey, value: Vec<u8>) -> IoTaskId {
Expand Down
117 changes: 37 additions & 80 deletions samod-core/src/actors/document/doc_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@ use automerge::Automerge;
use crate::{
ConnectionId, DocumentId, StorageKey, UnixTimestamp,
actors::{
document::{DocActorResult, SyncDirection, SyncMessageStat},
messages::{Broadcast, DocMessage, DocToHubMsgPayload, SyncMessage},
document::{
DocActorResult, SyncDirection, SyncMessageStat,
phase::{
loading::Loading,
ready::Ready,
request::{Request, RequestState},
},
},
messages::{Broadcast, DocMessage, SyncMessage},
},
};

use super::{
DocumentStatus,
peer_doc_connection::{AnnouncePolicy, PeerDocConnection},
ready::Ready,
request::{Request, RequestState},
};

#[derive(Debug)]
Expand All @@ -32,9 +37,7 @@ pub(super) struct DocState {

#[derive(Debug)]
pub enum Phase {
Loading {
pending_sync_messages: HashMap<ConnectionId, Vec<SyncMessage>>,
},
Loading(Loading),
Requesting(Request),
Ready(Ready),
NotFound,
Expand All @@ -56,9 +59,7 @@ impl DocState {
any_dialer_connecting: bool,
) -> Self {
Self {
phase: Phase::Loading {
pending_sync_messages: HashMap::new(),
},
phase: Phase::Loading(Loading::new()),
document_id,
doc,
any_dialer_connecting,
Expand All @@ -79,43 +80,33 @@ impl DocState {
PhaseTransition::None => {}
PhaseTransition::ToReady => {
tracing::trace!("transitioning to ready");
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
new_status: DocumentStatus::Ready,
});
out.send_doc_status_update(DocumentStatus::Ready);
out.emit_doc_changed(self.doc.get_heads());
self.phase = Phase::Ready(Ready::new());
}
PhaseTransition::ToNotFound => {
tracing::trace!("transitioning to NotFound");
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
new_status: DocumentStatus::NotFound,
});
out.send_doc_status_update(DocumentStatus::NotFound);
if let Phase::Requesting(request) = &self.phase {
for peer in request.peers_waiting_for_us_to_respond() {
out.send_message(DocToHubMsgPayload::SendSyncMessage {
connection_id: peer,
document_id: self.document_id.clone(),
message: SyncMessage::DocUnavailable,
});
out.send_sync_message(
peer,
self.document_id.clone(),
SyncMessage::DocUnavailable,
);
}
}
self.phase = Phase::NotFound;
}
PhaseTransition::ToRequesting(request) => {
tracing::trace!("transitioning to requesting");
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
new_status: DocumentStatus::Requesting,
});
out.send_doc_status_update(DocumentStatus::Requesting);
self.phase = Phase::Requesting(request);
}
PhaseTransition::ToLoading => {
tracing::trace!("transitioning to loading");
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
new_status: DocumentStatus::Loading,
});
self.phase = Phase::Loading {
pending_sync_messages: HashMap::new(),
};
out.send_doc_status_update(DocumentStatus::Loading);
self.phase = Phase::Loading(Loading::new());
}
}
}
Expand Down Expand Up @@ -159,8 +150,9 @@ impl DocState {
}
// self.save_state
// .add_on_disk(snapshots.into_keys().chain(incrementals.into_keys()));
if matches!(self.phase, Phase::Loading { .. }) {
if self.doc.get_heads().is_empty() {
if let Phase::Loading(loading) = &mut self.phase {
let pending_sync_messages = loading.take_pending_sync_messages();
let phase_transition = if self.doc.get_heads().is_empty() {
let eligible_conns = peer_connections
.values()
.any(|p| p.announce_policy() != AnnouncePolicy::DontAnnounce);
Expand All @@ -170,47 +162,23 @@ impl DocState {
self.any_dialer_connecting,
"no data found on disk, requesting document"
);
let mut next_phase = Phase::Requesting(Request::new(
PhaseTransition::ToRequesting(Request::new(
self.document_id.clone(),
peer_connections.values(),
));
std::mem::swap(&mut self.phase, &mut next_phase);
let Phase::Loading {
pending_sync_messages,
} = next_phase
else {
unreachable!("we already checked");
};
for (conn_id, msgs) in pending_sync_messages {
for msg in msgs {
self.handle_sync_message(now, out, conn_id, peer_connections, msg, now);
}
}
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
new_status: DocumentStatus::Requesting,
});
))
} else {
tracing::debug!(
"no data found on disk and no connections available, transitioning to NotFound"
);
self.handle_phase_transition(out, PhaseTransition::ToNotFound);
PhaseTransition::ToNotFound
}
return;
}
} else {
tracing::trace!("load complete, transitioning to ready");
PhaseTransition::ToReady
};

tracing::trace!("load complete, transitioning to ready");
self.handle_phase_transition(out, phase_transition);

let mut next_phase = Phase::Ready(Ready::new());
std::mem::swap(&mut self.phase, &mut next_phase);
let Phase::Loading {
pending_sync_messages,
} = next_phase
else {
unreachable!("we already checked");
};
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
new_status: DocumentStatus::Ready,
});
for (conn_id, msgs) in pending_sync_messages {
for msg in msgs {
self.handle_sync_message(now, out, conn_id, peer_connections, msg, now);
Expand Down Expand Up @@ -254,10 +222,7 @@ impl DocState {
}
})
.collect();
out.send_message(DocToHubMsgPayload::Broadcast {
connections: targets,
msg: Broadcast::Gossip { msg },
});
out.send_broadcast(targets, Broadcast::Gossip { msg });
}
DocMessage::Sync(msg) => self.handle_sync_message(
now,
Expand Down Expand Up @@ -298,13 +263,8 @@ impl DocState {
};

let (transition, duration) = match &mut self.phase {
Phase::Loading {
pending_sync_messages,
} => {
pending_sync_messages
.entry(connection_id)
.or_default()
.push(msg);
Phase::Loading(loading) => {
loading.receive_sync_message(connection_id, msg);
(PhaseTransition::None, None)
}
Phase::Requesting(request) => {
Expand Down Expand Up @@ -380,11 +340,8 @@ impl DocState {
) -> HashMap<ConnectionId, Vec<SyncMessage>> {
let mut result: HashMap<ConnectionId, Vec<SyncMessage>> = HashMap::new();
for (conn_id, peer_conn) in peer_connections {
if let Phase::Loading {
pending_sync_messages,
} = &self.phase
{
out.pending_sync_messages = pending_sync_messages.values().map(|v| v.len()).sum();
if let Phase::Loading(loading) = &self.phase {
out.pending_sync_messages = loading.pending_msg_count();
continue;
}

Expand Down
14 changes: 4 additions & 10 deletions samod-core/src/actors/document/document_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ impl DocumentActor {

let state = if let Some(doc) = initial_content {
// Let the hub know this document is ready immediately if we already have content
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
new_status: DocumentStatus::Ready,
});
out.send_doc_status_update(DocumentStatus::Ready);
DocState::new_ready(document_id.clone(), doc, any_dialer_pending)
} else {
DocState::new_loading(document_id.clone(), Automerge::new(), any_dialer_pending)
Expand Down Expand Up @@ -400,7 +398,7 @@ impl DocumentActor {
if self.run_state == RunState::Stopping {
if self.on_disk_state.is_flushed() {
self.run_state = RunState::Stopped;
out.send_message(DocToHubMsgPayload::Terminated);
out.send_terminated();
out.stopped = true;
}
return;
Expand Down Expand Up @@ -458,11 +456,7 @@ impl DocumentActor {
.generate_sync_messages(now, out, &mut self.peer_connections)
{
for msg in msgs {
out.send_message(DocToHubMsgPayload::SendSyncMessage {
connection_id: conn_id,
document_id: doc_id.clone(),
message: msg,
});
out.send_sync_message(conn_id, doc_id.clone(), msg);
}
}
}
Expand All @@ -475,7 +469,7 @@ impl DocumentActor {
.collect::<HashMap<_, _>>();
if !states.is_empty() {
out.peer_state_changes = states.clone();
out.send_message(DocToHubMsgPayload::PeerStatesChanged { new_states: states })
out.send_peer_states_changes(states)
}
}

Expand Down
3 changes: 3 additions & 0 deletions samod-core/src/actors/document/phase.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub(crate) mod loading;
pub(crate) mod ready;
pub(crate) mod request;
Loading
Loading