Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions src/api/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
/// `host`, `port` and `tls` affect all connections to all gRPC services; the
/// resulting endpoint is composed like this:
/// http{tls?'s':''}://{host}:{port}
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(feature = "py", pyo3::pyclass(get_all, set_all))]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
pub struct Config {
/// User identifier used to register, possibly your email.
pub username: String,
/// User password chosen upon registration.
pub password: String,
pub password: String, // must not leak this!
/// Address of server to connect to, default api.code.mp.
pub host: Option<String>,
/// Port to connect to, default 50053.
Expand Down Expand Up @@ -61,3 +61,25 @@ impl Config {
)
}
}

// manual impl: we want to obfuscate the password field!!
// TODO: can we just tag password to be obfuscated in debug print?
// reimplementing the whole Debug thing is pretty lame
impl std::fmt::Debug for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if f.alternate() {
write!(f,
r#"""Config {{
username: {},
password: ********,
host: {:#?},
port: {:#?},
tls: {:#?}
}}"""#,
self.username, self.host, self.port, self.tls
)
} else {
write!(f, "Config {{ username: {}, password: ********, host: {:?}, port: {:?}, tls: {:?} }}", self.username, self.host, self.port, self.tls)
}
}
}
21 changes: 17 additions & 4 deletions src/buffer/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::controller::{BufferController, BufferControllerInner};
struct BufferWorker {
agent_id: u32,
path: String,
workspace_id: String,
latest_version: watch::Sender<diamond_types::LocalVersion>,
local_version: watch::Sender<diamond_types::LocalVersion>,
ack_rx: mpsc::UnboundedReceiver<LocalVersion>,
Expand Down Expand Up @@ -75,6 +76,7 @@ impl BufferController {
let worker = BufferWorker {
agent_id,
path: path.to_string(),
workspace_id: workspace_id.to_string(),
latest_version: latest_version_tx,
local_version: my_version_tx,
ack_rx,
Expand All @@ -95,15 +97,16 @@ impl BufferController {
BufferController(controller)
}

#[tracing::instrument(skip(worker, tx, rx), fields(ws = worker.workspace_id, path = worker.path))]
async fn work(
mut worker: BufferWorker,
tx: mpsc::Sender<Operation>,
mut rx: Streaming<BufferEvent>,
) {
tracing::debug!("controller worker started");
tracing::debug!("buffer worker started");
loop {
if worker.controller.upgrade().is_none() {
break;
break tracing::debug!("buffer worker clean exit");
};

// block until one of these is ready
Expand All @@ -114,6 +117,7 @@ impl BufferController {
res = worker.ack_rx.recv() => match res {
None => break tracing::error!("ack channel closed"),
Some(v) => {
tracing::debug!("client acked change");
worker.branch.merge(&worker.oplog, &v);
worker.local_version.send(worker.branch.local_version())
.unwrap_or_warn("could not ack local version");
Expand Down Expand Up @@ -160,11 +164,12 @@ impl BufferController {
}
}

tracing::debug!("controller worker stopped");
tracing::debug!("buffer worker stopped");
}
}

impl BufferWorker {
#[tracing::instrument(skip(self, tx))]
async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender<Operation>) {
let last_ver = self.oplog.local_version();
// clip to buffer extents
Expand Down Expand Up @@ -205,11 +210,16 @@ impl BufferWorker {
}
}

#[tracing::instrument(skip(self))]
async fn handle_server_change(&mut self, change: BufferEvent) -> bool {
match self.controller.upgrade() {
None => true, // clean exit actually, just weird we caught it here
None => { // clean exit actually, just weird we caught it here
tracing::debug!("clean exit while handling server change");
true
},
Some(controller) => match self.oplog.decode_and_add(&change.op.data) {
Ok(local_version) => {
tracing::debug!("updating local version: {local_version:?}");
self.latest_version
.send(local_version)
.unwrap_or_warn("failed to update latest version!");
Expand All @@ -229,6 +239,7 @@ impl BufferWorker {
}
}

#[tracing::instrument(skip(self, tx))]
async fn handle_delta_request(&mut self, tx: oneshot::Sender<Option<BufferUpdate>>) {
let last_ver = self.branch.local_version();
if let Some((lv, Some(dtop))) = self
Expand Down Expand Up @@ -285,9 +296,11 @@ impl BufferWorker {
},
},
};
tracing::debug!("sending update {tc:?}");
tx.send(Some(tc))
.unwrap_or_warn("could not update ops channel -- is controller dead?");
} else {
tracing::debug!("no enqueued changes");
tx.send(None)
.unwrap_or_warn("could not update ops channel -- is controller dead?");
}
Expand Down
2 changes: 2 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct ClientInner {

impl Client {
/// Connect to the server, authenticate and instantiate a new [`Client`].
#[tracing::instrument]
pub async fn connect(config: crate::api::Config) -> ConnectionResult<Self> {
// TODO move these two into network.rs
let channel = Endpoint::from_shared(config.endpoint())?.connect().await?;
Expand Down Expand Up @@ -157,6 +158,7 @@ impl Client {
}

/// Join and return a [`Workspace`].
#[tracing::instrument(skip(self, workspace), fields(ws = workspace.as_ref()))]
pub async fn attach_workspace(
&self,
workspace: impl AsRef<str>,
Expand Down
15 changes: 10 additions & 5 deletions src/cursor/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition};
use super::controller::{CursorController, CursorControllerInner};

struct CursorWorker {
workspace_id: String,
op: mpsc::UnboundedReceiver<CursorPosition>,
map: Arc<dashmap::DashMap<Uuid, User>>,
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
Expand All @@ -24,6 +25,7 @@ struct CursorWorker {
}

impl CursorWorker {
#[tracing::instrument(skip(self, tx))]
fn handle_recv(&mut self, tx: oneshot::Sender<Option<Cursor>>) {
tx.send(
self.store.pop_front().and_then(|event| {
Expand Down Expand Up @@ -71,6 +73,7 @@ impl CursorController {
let weak = Arc::downgrade(&controller);

let worker = CursorWorker {
workspace_id: workspace_id.to_string(),
op: op_rx,
map: user_map,
stream: stream_rx,
Expand All @@ -86,16 +89,17 @@ impl CursorController {
CursorController(controller)
}

#[tracing::instrument(skip(worker, tx, rx), fields(ws = worker.workspace_id))]
async fn work(
mut worker: CursorWorker,
tx: mpsc::Sender<CursorPosition>,
mut rx: Streaming<CursorEvent>,
) {
tracing::debug!("starting cursor worker");
loop {
tracing::debug!("cursor worker polling");
if worker.controller.upgrade().is_none() {
break;
}; // clean exit: all controllers dropped
break tracing::debug!("cursor worker clean exit");
};
tokio::select! {
biased;

Expand All @@ -110,7 +114,7 @@ impl CursorController {

// server sents us a cursor
Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() {
None => break, // clean exit, just weird that we got it here
None => break tracing::debug!("cursor worker clean (late) exit"), // clean exit, just weird that we got it here
Some(controller) => {
tracing::debug!("received cursor from server");
worker.store.push_back(cur);
Expand All @@ -127,8 +131,9 @@ impl CursorController {
// client wants to get next cursor event
Some(tx) = worker.stream.recv() => worker.handle_recv(tx),

else => break,
else => break tracing::debug!("cursor worker clean-ish exit"),
}
}
tracing::debug!("stopping cursor worker");
}
}
20 changes: 13 additions & 7 deletions src/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl AsyncReceiver<Event> for Workspace {
}

impl Workspace {
#[tracing::instrument(skip(name, user, token, claims), fields(ws = name))]
pub(crate) async fn connect(
name: String,
user: Arc<User>,
Expand Down Expand Up @@ -165,6 +166,7 @@ impl Workspace {
}

/// Attach to a buffer and return a handle to it.
#[tracing::instrument(skip(self))]
pub async fn attach_buffer(&self, path: &str) -> ConnectionResult<buffer::Controller> {
let mut worskspace_client = self.0.services.ws();
let request = tonic::Request::new(BufferNode {
Expand Down Expand Up @@ -326,7 +328,7 @@ impl Workspace {
.0
.filetree
.iter()
.filter(|f| filter.map_or(true, |flt| f.starts_with(flt)))
.filter(|f| filter.is_none_or(|flt| f.starts_with(flt)))
.map(|f| f.clone())
.collect::<Vec<String>>();
tree.sort();
Expand All @@ -342,7 +344,8 @@ struct WorkspaceWorker {
}

impl WorkspaceWorker {
pub(crate) async fn work(mut self, name: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) {
#[tracing::instrument(skip(self, stream, weak))]
pub(crate) async fn work(mut self, ws: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) {
tracing::debug!("workspace worker starting");
loop {
tokio::select! {
Expand All @@ -352,13 +355,16 @@ impl WorkspaceWorker {
},

res = stream.message() => match res {
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
Ok(None) => break tracing::info!("leaving workspace {}", name),
Err(e) => break tracing::error!("workspace '{ws}' stream closed: {e}"),
Ok(None) => break tracing::info!("leaving workspace {ws}"),
Ok(Some(WorkspaceEvent { event: None })) => {
tracing::warn!("workspace {} received empty event", name)
tracing::warn!("workspace {ws} received empty event")
}
Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
let Some(inner) = weak.upgrade() else { break };
let Some(inner) = weak.upgrade() else {
break tracing::debug!("workspace worker clean exit");
};
tracing::debug!("received workspace event: {ev:?}");
let update = crate::api::Event::from(&ev);
match ev {
// user
Expand Down Expand Up @@ -391,7 +397,7 @@ impl WorkspaceWorker {
if let Some(ws) = weak.upgrade() {
cb.call(Workspace(ws));
} else {
break tracing::debug!("workspace worker clean exit");
break tracing::debug!("workspace worker clean (late) exit");
}
}
}
Expand Down