From 0da66d3bb31f010556cb2454894db6f803e957f5 Mon Sep 17 00:00:00 2001 From: DuKuangjin Date: Wed, 10 Sep 2025 15:43:07 +0800 Subject: [PATCH 1/2] docs(keck): add redis usage guide --- .env-template | 1 + Cargo.lock | 44 ++++++ apps/keck/Cargo.toml | 3 + apps/keck/README.md | 30 ++++ apps/keck/src/server/api/mod.rs | 161 ++++++++++++++++++++- apps/keck/src/server/sync/collaboration.rs | 77 ++++++++++ libs/jwst-rpc/src/lib.rs | 4 +- 7 files changed, 315 insertions(+), 5 deletions(-) create mode 100644 apps/keck/README.md diff --git a/.env-template b/.env-template index d2195cba9..1016ff7cc 100644 --- a/.env-template +++ b/.env-template @@ -29,3 +29,4 @@ SIGN_KEY = #BUCKET_ENDPOINT= #BUCKET_NAME= #BUCKET_ROOT= +REDIS_URL= diff --git a/Cargo.lock b/Cargo.lock index df1c4709c..0f2d81ea5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -888,6 +888,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2535,11 +2549,13 @@ name = "keck" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "axum", "axum-test-helper", "cfg-if 1.0.0", "dotenvy", "futures", + "jwst-codec", "jwst-core", "jwst-logger", "jwst-rpc", @@ -2549,6 +2565,7 @@ dependencies = [ "mimalloc", "nanoid", "rand 0.8.5", + "redis", "reqwest", "serde", "serde_json", @@ -3657,6 +3674,27 @@ dependencies = [ "yasna", ] +[[package]] +name = "redis" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba" +dependencies = [ + 
"async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -4464,6 +4502,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" diff --git a/apps/keck/Cargo.toml b/apps/keck/Cargo.toml index f219f5235..ad29aa4aa 100644 --- a/apps/keck/Cargo.toml +++ b/apps/keck/Cargo.toml @@ -42,6 +42,7 @@ tokio = { version = "=1.28.0", features = [ "rt-multi-thread", "signal", ] } +async-trait = "0.1" utoipa = { version = "3.5.0", features = ["axum_extras"], optional = true } utoipa-swagger-ui = { version = "3.1.5", optional = true } libc = "0.2.147" @@ -49,6 +50,7 @@ reqwest = { version = "0.11.19", default-features = false, features = [ "json", "rustls-tls", ] } +redis = { version = "0.23", features = ["tokio-comp"] } # ======= workspace dependencies ======= anyhow = { workspace = true } @@ -58,6 +60,7 @@ rand = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } thiserror = { workspace = true } +jwst-codec = { workspace = true } jwst-core = { workspace = true, features = ["large_refs"] } jwst-logger = { workspace = true } diff --git a/apps/keck/README.md b/apps/keck/README.md new file mode 100644 index 000000000..4022542e6 --- /dev/null +++ b/apps/keck/README.md @@ -0,0 +1,30 @@ +# Keck + +This service provides collaborative Yjs document sync with optional Redis Pub/Sub +to coordinate multiple instances. + +## Multi-node synchronization + +1. Start a Redis server. +2. For each `keck` instance set the `REDIS_URL` environment variable pointing to +the Redis server, e.g. 
+ ```bash + REDIS_URL=redis://127.0.0.1:6379 cargo run -p keck + ``` +3. Run multiple `keck` processes. Clients can connect to any instance using + `ws://<host>:<port>/collaboration/{roomid}` and will share the same Yjs data. + +## Environment variables + +- `DATABASE_URL` – database for persistence. +- `REDIS_URL` – enables Redis Pub/Sub for cross-node updates. When unset the + instance behaves as a single node. + +## Tests + +A Redis server is required to run the multi-node integration test: + +```bash +redis-server --daemonize yes +cargo test -p keck -- --ignored --test-threads=1 +``` diff --git a/apps/keck/src/server/api/mod.rs b/apps/keck/src/server/api/mod.rs index 54d520434..2dd3e5a72 100644 --- a/apps/keck/src/server/api/mod.rs +++ b/apps/keck/src/server/api/mod.rs @@ -4,8 +4,12 @@ mod blobs; mod blocks; mod doc; -use std::collections::HashMap; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + sync::Arc, +}; +use async_trait::async_trait; use axum::Router; #[cfg(feature = "api")] use axum::{ @@ -15,9 +19,15 @@ use axum::{ routing::{delete, get, head, post}, }; use doc::doc_apis; -use jwst_rpc::{BroadcastChannels, RpcContextImpl}; +use futures::StreamExt; +use jwst_codec::encode_update_as_message; +use jwst_rpc::{broadcast::subscribe, decode_update_with_guid, BroadcastChannels, BroadcastType, RpcContextImpl}; use jwst_storage::{BlobStorageType, JwstStorage, JwstStorageResult}; -use tokio::sync::RwLock; +use redis::AsyncCommands; +use tokio::sync::{ + broadcast::{channel as broadcast, Sender as BroadcastSender}, + Mutex, RwLock, +}; use super::*; @@ -44,6 +54,9 @@ pub struct Context { channel: BroadcastChannels, storage: JwstStorage, webhook: Arc>, + redis: Option, + ignore_updates: Arc>>>, + redis_workspaces: RwLock>, } impl Context { @@ -71,6 +84,11 @@ impl Context { webhook: Arc::new(std::sync::RwLock::new( dotenvy::var("HOOK_ENDPOINT").unwrap_or_default(), )), + redis: dotenvy::var("REDIS_URL") + .ok() + .and_then(|url| redis::Client::open(url).ok()), 
+ ignore_updates: Arc::new(Mutex::new(HashSet::new())), + redis_workspaces: RwLock::new(HashSet::new()), } } @@ -120,6 +138,112 @@ impl Context { *write_guard = endpoint; } + async fn spawn_redis(&self, id: String, broadcast_tx: BroadcastSender, workspace: Workspace) { + { + let mut guard = self.redis_workspaces.write().await; + if !guard.insert(id.clone()) { + return; + } + } + + let Some(client) = &self.redis else { + return; + }; + let client_pub = client.clone(); + let client_sub = client.clone(); + let channel = format!("keck:{id}"); + let ignore = self.ignore_updates.clone(); + let tx_pub = broadcast_tx.clone(); + let channel_pub = channel.clone(); + + // publisher task + tokio::spawn(async move { + let mut conn = match client_pub.get_async_connection().await { + Ok(c) => c, + Err(e) => { + error!("redis publisher connect error: {:?}", e); + return; + } + }; + let mut rx = tx_pub.subscribe(); + while let Ok(msg) = rx.recv().await { + match msg { + BroadcastType::BroadcastRawContent(update) => { + if ignore.lock().await.remove(&update) { + continue; + } + let mut payload = Vec::with_capacity(1 + update.len()); + payload.push(0); + payload.extend_from_slice(&update); + let _: Result<(), _> = conn.publish(channel_pub.clone(), payload).await; + } + BroadcastType::BroadcastAwareness(data) => { + let mut payload = Vec::with_capacity(1 + data.len()); + payload.push(1); + payload.extend_from_slice(&data); + let _: Result<(), _> = conn.publish(channel_pub.clone(), payload).await; + } + _ => {} + } + } + }); + + // subscriber task + let ignore = self.ignore_updates.clone(); + let channel_sub = channel; + let tx_sub = broadcast_tx.clone(); + tokio::spawn(async move { + let mut conn = match client_sub.get_async_connection().await { + Ok(c) => c, + Err(e) => { + error!("redis subscriber connect error: {:?}", e); + return; + } + }; + let mut pubsub = conn.into_pubsub(); + if pubsub.subscribe(channel_sub.clone()).await.is_err() { + error!("redis subscribe failed"); + 
return; + } + let mut stream = pubsub.on_message(); + while let Some(msg) = stream.next().await { + let payload: Vec = match msg.get_payload() { + Ok(p) => p, + Err(e) => { + error!("redis payload error: {:?}", e); + continue; + } + }; + if payload.is_empty() { + continue; + } + match payload[0] { + 0 => { + let data = payload[1..].to_vec(); + ignore.lock().await.insert(data.clone()); + if let Ok((_, update)) = decode_update_with_guid(&data) { + let mut ws = workspace.clone(); + let update_vec = update.to_vec(); + let _ = tokio::task::spawn_blocking(move || { + ws.sync_messages(vec![update_vec]); + }) + .await; + if let Ok(msg) = encode_update_as_message(update.to_vec()) { + let _ = tx_sub.send(BroadcastType::BroadcastContent(msg)); + } + let _ = tx_sub.send(BroadcastType::BroadcastRawContent(data)); + } + } + 1 => { + let data = payload[1..].to_vec(); + let _ = tx_sub.send(BroadcastType::BroadcastAwareness(data)); + } + _ => {} + } + } + }); + } + pub async fn get_workspace(&self, workspace_id: S) -> JwstStorageResult where S: AsRef, @@ -155,6 +279,7 @@ impl Context { } } +#[async_trait] impl RpcContextImpl<'_> for Context { fn get_storage(&self) -> &JwstStorage { &self.storage @@ -163,6 +288,36 @@ impl RpcContextImpl<'_> for Context { fn get_channel(&self) -> &BroadcastChannels { &self.channel } + + async fn join_broadcast( + &self, + workspace: &mut Workspace, + identifier: String, + last_synced: tokio::sync::mpsc::Sender, + ) -> BroadcastSender { + let id = workspace.id(); + let broadcast_tx = { + let mut channels = self.channel.write().await; + match channels.entry(id.clone()) { + Entry::Occupied(tx) => tx.get().clone(), + Entry::Vacant(v) => { + let (tx, _) = broadcast(10240); + v.insert(tx.clone()); + tx.clone() + } + } + }; + + subscribe(workspace, identifier.clone(), broadcast_tx.clone()).await; + self.save_update(&id, identifier, broadcast_tx.subscribe(), last_synced) + .await; + + if self.redis.is_some() { + self.spawn_redis(id, broadcast_tx.clone(), 
workspace.clone()).await; + } + + broadcast_tx + } } pub fn api_handler(router: Router) -> Router { diff --git a/apps/keck/src/server/sync/collaboration.rs b/apps/keck/src/server/sync/collaboration.rs index 10504667b..60c82931a 100644 --- a/apps/keck/src/server/sync/collaboration.rs +++ b/apps/keck/src/server/sync/collaboration.rs @@ -234,6 +234,54 @@ mod test { close_collaboration_server(child); } + #[test] + #[ignore = "requires redis"] + fn multi_node_sync_with_redis() { + let redis_port = thread_rng().gen_range(31000..=32000); + let mut redis_child = Command::new("redis-server") + .args(["--save", "", "--appendonly", "no", "--port", &redis_port.to_string()]) + .stdout(Stdio::null()) + .spawn() + .expect("start redis"); + sleep(Duration::from_millis(500)); + + let server_a = thread_rng().gen_range(10000..=20000); + let server_b = thread_rng().gen_range(20001..=30000); + let child_a = start_collaboration_server_with_redis(server_a, redis_port); + let child_b = start_collaboration_server_with_redis(server_b, redis_port); + + let rt = Arc::new(Runtime::new().unwrap()); + let workspace_id = "redis"; + { + let context = rt.block_on(async move { + Arc::new(TestContext::new(Arc::new( + JwstStorage::new_with_migration("sqlite::memory:", BlobStorageType::DB) + .await + .expect("get storage: memory sqlite failed"), + ))) + }); + let remote = format!("ws://localhost:{server_a}/collaboration/{workspace_id}"); + start_websocket_client_sync( + rt.clone(), + context.clone(), + Arc::default(), + remote, + workspace_id.to_owned(), + ); + let mut workspace = rt.block_on(async move { context.get_workspace(workspace_id).await.unwrap() }); + create_block(&mut workspace, "b1".to_string(), "list".to_string()); + } + + sleep(Duration::from_secs(1)); + let ret = get_block_from_server(workspace_id.to_string(), "b1".to_string(), server_b); + assert!(!ret.is_empty()); + + close_collaboration_server(child_a); + close_collaboration_server(child_b); + let _ = redis_child.kill(); + let _ = 
redis_child.wait(); + } + fn get_block_from_server(workspace_id: String, block_id: String, server_port: u16) -> String { let rt = Runtime::new().unwrap(); rt.block_on(async { @@ -282,6 +330,35 @@ mod test { child } + fn start_collaboration_server_with_redis(port: u16, redis_port: u16) -> Child { + let url = format!("redis://127.0.0.1:{redis_port}"); + let mut child = Command::new("cargo") + .args(&["run", "-p", "keck"]) + .env("KECK_PORT", port.to_string()) + .env("USE_MEMORY_SQLITE", "true") + .env("KECK_LOG", "debug") + .env("REDIS_URL", url) + .stdout(Stdio::piped()) + .spawn() + .expect("Failed to run command"); + + if let Some(ref mut stdout) = child.stdout { + let reader = BufReader::new(stdout); + + for line in reader.lines() { + let line = line.expect("Failed to read line"); + info!("{}", line); + + if line.contains("listening on 0.0.0.0:") { + info!("Keck server started"); + break; + } + } + } + + child + } + fn close_collaboration_server(child: Child) { unsafe { kill(child.id() as c_int, SIGTERM) }; } diff --git a/libs/jwst-rpc/src/lib.rs b/libs/jwst-rpc/src/lib.rs index 031a3af7d..f545404eb 100644 --- a/libs/jwst-rpc/src/lib.rs +++ b/libs/jwst-rpc/src/lib.rs @@ -1,5 +1,5 @@ #[forbid(unsafe_code)] -mod broadcast; +pub mod broadcast; mod client; mod connector; mod context; @@ -32,7 +32,7 @@ use tokio::{ time::{sleep, Duration}, }; pub use utils::{connect_memory_workspace, MinimumServerContext}; -use utils::{decode_update_with_guid, encode_update_with_guid}; +pub use utils::{decode_update_with_guid, encode_update_with_guid}; #[cfg(feature = "webrtc")] pub use webrtcrs::peer_connection::sdp::session_description::RTCSessionDescription; From 2c3cdb57cd457b9dfb1ce5d65380a8793b4b6ef6 Mon Sep 17 00:00:00 2001 From: DuKuangjin Date: Wed, 10 Sep 2025 16:50:33 +0800 Subject: [PATCH 2/2] fix: avoid duplicate workspace docs --- .../src/migration/src/m20230626_023319_doc_guid.rs | 1 + libs/jwst-storage/src/storage/docs/database.rs | 13 +++++++++++-- 2 files changed, 12 
insertions(+), 2 deletions(-) diff --git a/libs/jwst-storage/src/migration/src/m20230626_023319_doc_guid.rs b/libs/jwst-storage/src/migration/src/m20230626_023319_doc_guid.rs index ccaf1f49a..338c689f5 100644 --- a/libs/jwst-storage/src/migration/src/m20230626_023319_doc_guid.rs +++ b/libs/jwst-storage/src/migration/src/m20230626_023319_doc_guid.rs @@ -30,6 +30,7 @@ impl MigrationTrait for Migration { .table(Docs::Table) .name("docs_guid") .col(Docs::Guid) + .unique() .to_owned(), ) .await diff --git a/libs/jwst-storage/src/storage/docs/database.rs b/libs/jwst-storage/src/storage/docs/database.rs index 12325d4f0..23600a8ea 100644 --- a/libs/jwst-storage/src/storage/docs/database.rs +++ b/libs/jwst-storage/src/storage/docs/database.rs @@ -2,7 +2,7 @@ use std::collections::hash_map::Entry; use jwst_codec::{encode_update_as_message, CrdtReader, Doc, DocOptions, RawDecoder, StateVector}; use jwst_core::{DocStorage, Workspace}; -use sea_orm::Condition; +use sea_orm::{sea_query::OnConflict, Condition}; use tokio::task::spawn_blocking; use super::{entities::prelude::*, *}; @@ -144,7 +144,6 @@ impl DocDBStorage { let is_workspace = Self::is_workspace(conn, guid).await?; - Docs::delete_many().filter(DocsColumn::Guid.eq(guid)).exec(conn).await?; Docs::insert(DocsActiveModel { workspace_id: Set(workspace.into()), guid: Set(guid.into()), @@ -153,6 +152,16 @@ impl DocDBStorage { created_at: Set(Utc::now().into()), ..Default::default() }) + .on_conflict( + OnConflict::column(DocsColumn::Guid) + .update_columns([ + DocsColumn::WorkspaceId, + DocsColumn::Blob, + DocsColumn::IsWorkspace, + DocsColumn::CreatedAt, + ]) + .to_owned(), + ) .exec(conn) .await?; trace!("end replace: {workspace}");