Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env-template
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ SIGN_KEY =
#BUCKET_ENDPOINT=
#BUCKET_NAME=
#BUCKET_ROOT=
REDIS_URL=
44 changes: 44 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions apps/keck/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ tokio = { version = "=1.28.0", features = [
"rt-multi-thread",
"signal",
] }
async-trait = "0.1"
utoipa = { version = "3.5.0", features = ["axum_extras"], optional = true }
utoipa-swagger-ui = { version = "3.1.5", optional = true }
libc = "0.2.147"
reqwest = { version = "0.11.19", default-features = false, features = [
"json",
"rustls-tls",
] }
redis = { version = "0.23", features = ["tokio-comp"] }

# ======= workspace dependencies =======
anyhow = { workspace = true }
Expand All @@ -58,6 +60,7 @@ rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
jwst-codec = { workspace = true }

jwst-core = { workspace = true, features = ["large_refs"] }
jwst-logger = { workspace = true }
Expand Down
30 changes: 30 additions & 0 deletions apps/keck/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Keck

This service provides collaborative Yjs document sync with optional Redis Pub/Sub
to coordinate multiple instances.

## Multi-node synchronization

1. Start a Redis server.
2. For each `keck` instance set the `REDIS_URL` environment variable pointing to
the Redis server, e.g.
```bash
REDIS_URL=redis://127.0.0.1:6379 cargo run -p keck
```
3. Run multiple `keck` processes. Clients can connect to any instance using
`ws://<host>:<port>/collaboration/{roomid}` and will share the same Yjs data.

## Environment variables

- `DATABASE_URL` – database for persistence.
- `REDIS_URL` – enables Redis Pub/Sub for cross-node updates. When unset the
instance behaves as a single node.

## Tests

A Redis server is required to run the multi-node integration test:

```bash
redis-server --daemonize yes
cargo test -p keck -- --ignored --test-threads=1
```
161 changes: 158 additions & 3 deletions apps/keck/src/server/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ mod blobs;
mod blocks;
mod doc;

use std::collections::HashMap;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::Arc,
};

use async_trait::async_trait;
use axum::Router;
#[cfg(feature = "api")]
use axum::{
Expand All @@ -15,9 +19,15 @@ use axum::{
routing::{delete, get, head, post},
};
use doc::doc_apis;
use jwst_rpc::{BroadcastChannels, RpcContextImpl};
use futures::StreamExt;
use jwst_codec::encode_update_as_message;
use jwst_rpc::{broadcast::subscribe, decode_update_with_guid, BroadcastChannels, BroadcastType, RpcContextImpl};
use jwst_storage::{BlobStorageType, JwstStorage, JwstStorageResult};
use tokio::sync::RwLock;
use redis::AsyncCommands;
use tokio::sync::{
broadcast::{channel as broadcast, Sender as BroadcastSender},
Mutex, RwLock,
};

use super::*;

Expand All @@ -44,6 +54,9 @@ pub struct Context {
channel: BroadcastChannels,
storage: JwstStorage,
webhook: Arc<std::sync::RwLock<String>>,
redis: Option<redis::Client>,
ignore_updates: Arc<Mutex<HashSet<Vec<u8>>>>,
redis_workspaces: RwLock<HashSet<String>>,
}

impl Context {
Expand Down Expand Up @@ -71,6 +84,11 @@ impl Context {
webhook: Arc::new(std::sync::RwLock::new(
dotenvy::var("HOOK_ENDPOINT").unwrap_or_default(),
)),
redis: dotenvy::var("REDIS_URL")
.ok()
.and_then(|url| redis::Client::open(url).ok()),
ignore_updates: Arc::new(Mutex::new(HashSet::new())),
redis_workspaces: RwLock::new(HashSet::new()),
}
}

Expand Down Expand Up @@ -120,6 +138,112 @@ impl Context {
*write_guard = endpoint;
}

async fn spawn_redis(&self, id: String, broadcast_tx: BroadcastSender<BroadcastType>, workspace: Workspace) {
{
let mut guard = self.redis_workspaces.write().await;
if !guard.insert(id.clone()) {
return;
}
}

let Some(client) = &self.redis else {
return;
};
let client_pub = client.clone();
let client_sub = client.clone();
let channel = format!("keck:{id}");
let ignore = self.ignore_updates.clone();
let tx_pub = broadcast_tx.clone();
let channel_pub = channel.clone();

// publisher task
tokio::spawn(async move {
let mut conn = match client_pub.get_async_connection().await {
Ok(c) => c,
Err(e) => {
error!("redis publisher connect error: {:?}", e);
return;
}
};
let mut rx = tx_pub.subscribe();
while let Ok(msg) = rx.recv().await {
match msg {
BroadcastType::BroadcastRawContent(update) => {
if ignore.lock().await.remove(&update) {
continue;
}
let mut payload = Vec::with_capacity(1 + update.len());
payload.push(0);
payload.extend_from_slice(&update);
let _: Result<(), _> = conn.publish(channel_pub.clone(), payload).await;
}
BroadcastType::BroadcastAwareness(data) => {
let mut payload = Vec::with_capacity(1 + data.len());
payload.push(1);
payload.extend_from_slice(&data);
let _: Result<(), _> = conn.publish(channel_pub.clone(), payload).await;
}
_ => {}
}
}
});

// subscriber task
let ignore = self.ignore_updates.clone();
let channel_sub = channel;
let tx_sub = broadcast_tx.clone();
tokio::spawn(async move {
let mut conn = match client_sub.get_async_connection().await {
Ok(c) => c,
Err(e) => {
error!("redis subscriber connect error: {:?}", e);
return;
}
};
let mut pubsub = conn.into_pubsub();
if pubsub.subscribe(channel_sub.clone()).await.is_err() {
error!("redis subscribe failed");
return;
}
let mut stream = pubsub.on_message();
while let Some(msg) = stream.next().await {
let payload: Vec<u8> = match msg.get_payload() {
Ok(p) => p,
Err(e) => {
error!("redis payload error: {:?}", e);
continue;
}
};
if payload.is_empty() {
continue;
}
match payload[0] {
0 => {
let data = payload[1..].to_vec();
ignore.lock().await.insert(data.clone());
if let Ok((_, update)) = decode_update_with_guid(&data) {
let mut ws = workspace.clone();
let update_vec = update.to_vec();
let _ = tokio::task::spawn_blocking(move || {
ws.sync_messages(vec![update_vec]);
})
.await;
if let Ok(msg) = encode_update_as_message(update.to_vec()) {
let _ = tx_sub.send(BroadcastType::BroadcastContent(msg));
}
let _ = tx_sub.send(BroadcastType::BroadcastRawContent(data));
}
}
1 => {
let data = payload[1..].to_vec();
let _ = tx_sub.send(BroadcastType::BroadcastAwareness(data));
}
_ => {}
}
}
});
}

pub async fn get_workspace<S>(&self, workspace_id: S) -> JwstStorageResult<Workspace>
where
S: AsRef<str>,
Expand Down Expand Up @@ -155,6 +279,7 @@ impl Context {
}
}

#[async_trait]
impl RpcContextImpl<'_> for Context {
fn get_storage(&self) -> &JwstStorage {
&self.storage
Expand All @@ -163,6 +288,36 @@ impl RpcContextImpl<'_> for Context {
fn get_channel(&self) -> &BroadcastChannels {
&self.channel
}

/// Returns the broadcast sender for `workspace`, wiring up the workspace on
/// the way.
///
/// The sender registered under the workspace id in `self.channel` is reused
/// when present; otherwise a new channel with a capacity of 10240 messages is
/// created and registered. The workspace is then subscribed to that sender,
/// `save_update` is started on a fresh receiver, and, when a Redis client is
/// configured, `spawn_redis` is invoked for the workspace.
async fn join_broadcast(
    &self,
    workspace: &mut Workspace,
    identifier: String,
    last_synced: tokio::sync::mpsc::Sender<i64>,
) -> BroadcastSender<BroadcastType> {
    let workspace_id = workspace.id();

    // The write guard lives only inside this block, so the channel registry
    // is unlocked before any of the awaits below.
    let sender = {
        let mut registry = self.channel.write().await;
        match registry.entry(workspace_id.clone()) {
            Entry::Occupied(existing) => existing.get().clone(),
            Entry::Vacant(slot) => {
                let (created, _) = broadcast(10240);
                slot.insert(created.clone());
                created
            }
        }
    };

    subscribe(workspace, identifier.clone(), sender.clone()).await;

    let updates = sender.subscribe();
    self.save_update(&workspace_id, identifier, updates, last_synced)
        .await;

    // Without a Redis client the instance runs as a single node.
    if self.redis.is_none() {
        return sender;
    }

    self.spawn_redis(workspace_id, sender.clone(), workspace.clone()).await;
    sender
}
}

pub fn api_handler(router: Router) -> Router {
Expand Down
Loading