diff --git a/Cargo.lock b/Cargo.lock index df1c4709c..77a5d26eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -888,6 +888,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2538,8 +2552,10 @@ dependencies = [ "axum", "axum-test-helper", "cfg-if 1.0.0", + "chrono", "dotenvy", "futures", + "jwst-codec", "jwst-core", "jwst-logger", "jwst-rpc", @@ -2549,6 +2565,7 @@ dependencies = [ "mimalloc", "nanoid", "rand 0.8.5", + "redis", "reqwest", "serde", "serde_json", @@ -3657,6 +3674,30 @@ dependencies = [ "yasna", ] +[[package]] +name = "redis" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2", + "tokio", + "tokio-retry", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -4464,6 +4505,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" @@ -5201,6 +5248,17 @@ dependencies = [ "syn 2.0.99", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -5671,23 +5729,25 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.87" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" +checksum = "7e14915cadd45b529bb8d1f343c4ed0ac1de926144b746e2710f9cd05df6603b" dependencies = [ "cfg-if 1.0.0", + "once_cell", + "rustversion", "wasm-bindgen-macro", + "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.87" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" +checksum = "e28d1ba982ca7923fd01448d5c30c6864d0a14109560296a162f80f305fb93bb" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", "syn 2.0.99", @@ -5708,9 +5768,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.87" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" +checksum = "7c3d463ae3eff775b0c45df9da45d68837702ac35af998361e2c84e7c5ec1b0d" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -5718,9 +5778,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.87" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" +checksum = "7bb4ce89b08211f923caf51d527662b75bdc9c9c7aab40f86dcb9fb85ac552aa" dependencies = [ "proc-macro2", "quote", @@ -5731,9 +5791,12 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.87" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +checksum = "f143854a3b13752c6950862c906306adb27c7e839f7414cec8fea35beab624c1" +dependencies = [ + "unicode-ident", +] [[package]] name = "wasm-streams" diff --git a/Cargo.toml b/Cargo.toml index d41957113..1a2af06f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,12 +29,11 @@ serde_json = "1.0.107" thiserror = "1.0.50" tokio = "1" -jwst = { workspace = true, path = "libs/jwst" } -jwst-codec = { workspace = true, path = "libs/jwst-codec" } -jwst-core = { workspace = true, path = "libs/jwst-core" } -jwst-logger = { workspace = true, path = "libs/jwst-logger" } -jwst-rpc = { workspace = true, path = "libs/jwst-rpc" } -jwst-storage = { workspace = true, path = "libs/jwst-storage" } +jwst-codec = { path = "libs/jwst-codec" } +jwst-core = { path = "libs/jwst-core" } +jwst-logger = { path = "libs/jwst-logger" } +jwst-rpc = { path = "libs/jwst-rpc" } +jwst-storage = { path = "libs/jwst-storage" } [profile.release] lto = true diff --git a/apps/keck/.env.template b/apps/keck/.env.template index e3cca72ae..13a6c871d 100644 --- a/apps/keck/.env.template +++ b/apps/keck/.env.template @@ -1,2 +1,8 @@ # The block observation webhook endpoint # HOOK_ENDPOINT= + +# Redis URL for multi-node synchronization +# REDIS_URL=redis://127.0.0.1:6379 + +# Node ID for multi-node setup (optional, auto-generated if not provided) +# NODE_ID= diff --git a/apps/keck/Cargo.toml b/apps/keck/Cargo.toml index f219f5235..29af814b7 100644 --- a/apps/keck/Cargo.toml +++ b/apps/keck/Cargo.toml @@ -49,9 +49,11 @@ reqwest = { version = "0.11.19", default-features = false, features = [ "json", "rustls-tls", ] } +redis = { version = "0.23.3", features = ["tokio-comp", "connection-manager"] } # ======= workspace dependencies ======= anyhow = { workspace = true } +chrono = { workspace = true } futures = { workspace = true } nanoid = { workspace = true } rand = { workspace = true } @@ -60,6 +62,7 @@ serde_json = { workspace = true } thiserror = { workspace = true } jwst-core = { workspace = true, features = ["large_refs"] } +jwst-codec = { workspace = true } jwst-logger = { workspace = true } jwst-rpc = { workspace = true } jwst-storage = { workspace = true } diff --git a/apps/keck/Dockerfile b/apps/keck/Dockerfile new file mode 100644 index 000000000..0ca2be50f --- /dev/null +++ b/apps/keck/Dockerfile @@ -0,0 +1,41 @@ +# Build stage +FROM rust:1.75-bullseye as builder + +WORKDIR /app + +# Copy the workspace configuration +COPY Cargo.toml Cargo.lock ./ +COPY apps/keck/Cargo.toml ./apps/keck/ +COPY libs/ ./libs/ + +# Copy source code +COPY apps/keck/src/ ./apps/keck/src/ + +# Build the application +RUN cargo build --release -p keck + +# Runtime stage +FROM debian:bullseye-slim + +# Install runtime dependencies +RUN apt-get update && apt-get install -y \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +# Create app user +RUN useradd -m -u 1001 keck + +# Copy the binary +COPY --from=builder /app/target/release/keck /usr/local/bin/keck + +# Set ownership +RUN chown keck:keck /usr/local/bin/keck + +# Switch to app user +USER keck + +# Expose port +EXPOSE 3000 + +# Run the application +CMD ["keck"] \ No newline at end of file diff --git a/apps/keck/README-multinode.md b/apps/keck/README-multinode.md new file mode 100644 index 000000000..68b1fc5eb --- /dev/null +++ b/apps/keck/README-multinode.md @@ -0,0 +1,211 @@ +# Keck Multi-Node Setup + +This guide explains how to set up Keck in a multi-node configuration with Redis synchronization for high availability and scalability. + +## Architecture + +``` +Client Apps + | + v + Nginx (Port 3000) + | + v +Load Balancer + | + +-- Keck Node 1 (Port 3001) + +-- Keck Node 2 (Port 3002) + +-- Keck Node 3 (Port 3003) + | + v + Redis (Sync) + PostgreSQL (Storage) +``` + +## Features + +- **Multi-node collaboration**: Multiple Keck instances can run simultaneously +- **Redis synchronization**: Real-time sync of CRDT operations between nodes +- **Load balancing**: Nginx distributes WebSocket connections across nodes +- **Session consistency**: Same roomid gets consistent data across all nodes +- **High availability**: If one node fails, others continue to serve requests + +## Quick Start with Docker Compose + +1. **Start all services**: + ```bash + docker-compose up -d + ``` + +2. **Access the application**: + - Main endpoint: `ws://localhost:3000/collaboration/{roomid}` + - Individual nodes: `ws://localhost:3001/collaboration/{roomid}`, etc. + +3. **Monitor logs**: + ```bash + docker-compose logs -f keck-node-1 + ``` + +4. **Scale up/down**: + ```bash + docker-compose up -d --scale keck-node-1=2 + ``` + +## Manual Setup + +### Prerequisites + +- Redis server +- PostgreSQL database +- Rust 1.75+ + +### Environment Variables + +Set these variables for each Keck node: + +```bash +# Required +DATABASE_URL=postgres://user:password@host:5432/database +REDIS_URL=redis://localhost:6379 + +# Optional +KECK_PORT=3000 # Port for this node +NODE_ID=keck-node-1 # Unique identifier for this node +HOOK_ENDPOINT= # Webhook URL for block changes +``` + +### Running Multiple Nodes + +1. **Start Redis**: + ```bash + redis-server + ``` + +2. **Start PostgreSQL**: + ```bash + # Make sure PostgreSQL is running and database is created + createdb keck + ``` + +3. **Start Keck nodes**: + ```bash + # Node 1 + KECK_PORT=3001 NODE_ID=node1 REDIS_URL=redis://localhost:6379 \ + DATABASE_URL=postgres://user:pass@localhost/keck cargo run + + # Node 2 + KECK_PORT=3002 NODE_ID=node2 REDIS_URL=redis://localhost:6379 \ + DATABASE_URL=postgres://user:pass@localhost/keck cargo run + + # Node 3 + KECK_PORT=3003 NODE_ID=node3 REDIS_URL=redis://localhost:6379 \ + DATABASE_URL=postgres://user:pass@localhost/keck cargo run + ``` + +4. **Start Nginx**: + ```bash + nginx -c /path/to/nginx.conf + ``` + +## Configuration + +### Nginx Load Balancer + +The included `nginx.conf` uses `ip_hash` for session affinity. Edit the upstream block to add/remove nodes: + +```nginx +upstream keck_nodes { + ip_hash; + server 127.0.0.1:3001 weight=1; + server 127.0.0.1:3002 weight=1; + server 127.0.0.1:3003 weight=1; +} +``` + +### Redis Configuration + +For production, consider Redis clustering or Redis Sentinel for high availability: + +```bash +# Redis with persistence +redis-server --appendonly yes --save 60 1000 + +# Redis with password +redis-server --requirepass your-password +``` + +### Database Migrations + +Keck automatically runs database migrations on startup. For production, you may want to run migrations separately: + +```bash +# Run migrations before starting nodes +DATABASE_URL=postgres://... cargo run --bin migrations +``` + +## Testing Multi-Node Setup + +1. **Connect to the load balancer**: + ```javascript + const ws = new WebSocket('ws://localhost:3000/collaboration/test-room'); + ``` + +2. **Verify synchronization**: + - Connect multiple clients to the same room + - Make changes and verify they appear on all clients + - Stop one node and verify others continue working + +3. **Monitor Redis activity**: + ```bash + redis-cli monitor + ``` + +## Troubleshooting + +### Connection Issues + +- Check if all services are running: `docker-compose ps` +- Verify Redis connectivity: `redis-cli ping` +- Check PostgreSQL connectivity: `psql -h localhost -p 5432 -U keck -d keck` + +### Synchronization Issues + +- Check Redis logs for pub/sub activity +- Verify NODE_ID is unique for each instance +- Ensure all nodes use the same Redis instance + +### Performance Issues + +- Monitor Redis memory usage: `redis-cli info memory` +- Check PostgreSQL connection pool settings +- Adjust Nginx worker processes and connections + +## Production Considerations + +1. **Security**: + - Use TLS for Redis connections + - Enable PostgreSQL SSL + - Configure Nginx with SSL/TLS + +2. **Monitoring**: + - Add health checks for each service + - Monitor Redis pub/sub lag + - Track PostgreSQL connection counts + +3. **Scaling**: + - Consider Redis Cluster for large deployments + - Use read replicas for PostgreSQL + - Implement proper session affinity strategies + +4. **Backup**: + - Regular PostgreSQL backups + - Redis AOF/RDB persistence + - Monitor backup integrity + +## API Endpoints + +- `GET /api/workspace/:id/blob/:name` - Get blob +- `PUT /api/workspace/:id/blob` - Upload blob +- `POST /collaboration/:workspace` - Auth WebSocket +- `GET /collaboration/:workspace` - Upgrade to WebSocket + +All endpoints are automatically load balanced across available nodes. \ No newline at end of file diff --git a/apps/keck/docker-compose.yml b/apps/keck/docker-compose.yml new file mode 100644 index 000000000..625f16f28 --- /dev/null +++ b/apps/keck/docker-compose.yml @@ -0,0 +1,101 @@ +version: '3.8' + +services: + # Redis for multi-node synchronization + redis: + image: redis:7-alpine + ports: + - "6379:6379" + command: redis-server --appendonly yes + volumes: + - redis_data:/data + networks: + - keck_network + + # PostgreSQL database (shared by all Keck nodes) + postgres: + image: postgres:15-alpine + environment: + POSTGRES_DB: keck + POSTGRES_USER: keck + POSTGRES_PASSWORD: keck_password + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + networks: + - keck_network + + # Keck Node 1 + keck-node-1: + build: . + environment: + - KECK_PORT=3001 + - DATABASE_URL=postgres://keck:keck_password@postgres:5432/keck + - REDIS_URL=redis://redis:6379 + - NODE_ID=keck-node-1 + ports: + - "3001:3001" + depends_on: + - postgres + - redis + networks: + - keck_network + restart: unless-stopped + + # Keck Node 2 + keck-node-2: + build: . + environment: + - KECK_PORT=3002 + - DATABASE_URL=postgres://keck:keck_password@postgres:5432/keck + - REDIS_URL=redis://redis:6379 + - NODE_ID=keck-node-2 + ports: + - "3002:3002" + depends_on: + - postgres + - redis + networks: + - keck_network + restart: unless-stopped + + # Keck Node 3 + keck-node-3: + build: . + environment: + - KECK_PORT=3003 + - DATABASE_URL=postgres://keck:keck_password@postgres:5432/keck + - REDIS_URL=redis://redis:6379 + - NODE_ID=keck-node-3 + ports: + - "3003:3003" + depends_on: + - postgres + - redis + networks: + - keck_network + restart: unless-stopped + + # Nginx Load Balancer + nginx: + image: nginx:alpine + ports: + - "3000:3000" + volumes: + - ./nginx.conf:/etc/nginx/conf.d/default.conf + depends_on: + - keck-node-1 + - keck-node-2 + - keck-node-3 + networks: + - keck_network + restart: unless-stopped + +volumes: + redis_data: + postgres_data: + +networks: + keck_network: + driver: bridge \ No newline at end of file diff --git a/apps/keck/nginx.conf b/apps/keck/nginx.conf new file mode 100644 index 000000000..3ba905397 --- /dev/null +++ b/apps/keck/nginx.conf @@ -0,0 +1,114 @@ +# Nginx configuration for multi-node Keck load balancing +# This configuration distributes WebSocket connections across multiple Keck instances +# while ensuring session affinity for the same roomid + +upstream keck_nodes { + # Use IP hash to ensure sticky sessions for the same client/room + ip_hash; + + # Add your Keck instances here + server 127.0.0.1:3001 weight=1 max_fails=3 fail_timeout=30s; + server 127.0.0.1:3002 weight=1 max_fails=3 fail_timeout=30s; + server 127.0.0.1:3003 weight=1 max_fails=3 fail_timeout=30s; + + # Add more instances as needed + # server 127.0.0.1:3004 weight=1 max_fails=3 fail_timeout=30s; + + # Health check + keepalive 32; +} + +server { + listen 3000; + server_name localhost; + + # Logging + access_log /var/log/nginx/keck_access.log; + error_log /var/log/nginx/keck_error.log; + + # WebSocket support + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Disable buffering for real-time WebSocket communication + proxy_buffering off; + proxy_cache off; + + # Timeout settings + proxy_connect_timeout 60s; + proxy_send_timeout 60s; + proxy_read_timeout 60s; + + # Handle WebSocket collaboration endpoints + location /collaboration/ { + proxy_pass http://keck_nodes; + + # For WebSocket connections, these headers are important + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + + # Pass the original host information + proxy_set_header Host $http_host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + + # Handle API endpoints + location /api/ { + proxy_pass http://keck_nodes; + + # Standard headers for API requests + proxy_set_header Host $http_host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + + # Health check endpoint for load balancer + location /health { + proxy_pass http://keck_nodes; + proxy_set_header Host $http_host; + } + + # Static files (if any) + location / { + proxy_pass http://keck_nodes; + proxy_set_header Host $http_host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } +} + +# Alternative configuration with path-based routing +# Uncomment this section if you want to route different rooms to specific nodes +# server { +# listen 3000; +# server_name localhost; +# +# # Route specific room patterns to specific nodes +# location ~ ^/collaboration/room-(.*)$ { +# set $room_id $1; +# +# # Use consistent hashing based on room ID +# set $backend_server ""; +# set_by_lua $backend_server ' +# local crc32 = ngx.crc32_long(ngx.var.room_id) +# local node_count = 3 -- Number of Keck nodes +# local node_index = crc32 % node_count + 1 +# local ports = {3001, 3002, 3003} +# return "127.0.0.1:" .. ports[node_index] +# '; +# +# proxy_pass http://$backend_server; +# proxy_set_header Upgrade $http_upgrade; +# proxy_set_header Connection "upgrade"; +# proxy_set_header Host $host; +# } +# } \ No newline at end of file diff --git a/apps/keck/src/server/api/mod.rs b/apps/keck/src/server/api/mod.rs index 54d520434..311a7fa87 100644 --- a/apps/keck/src/server/api/mod.rs +++ b/apps/keck/src/server/api/mod.rs @@ -19,7 +19,7 @@ use jwst_rpc::{BroadcastChannels, RpcContextImpl}; use jwst_storage::{BlobStorageType, JwstStorage, JwstStorageResult}; use tokio::sync::RwLock; -use super::*; +use super::{redis_sync::RedisSync, *}; #[derive(Deserialize)] #[cfg_attr(feature = "api", derive(utoipa::IntoParams))] @@ -44,6 +44,7 @@ pub struct Context { channel: BroadcastChannels, storage: JwstStorage, webhook: Arc>, + redis_sync: Option>, } impl Context { @@ -65,12 +66,35 @@ impl Context { } .expect("Cannot create database"); + // Initialize Redis sync if Redis URL is provided + let redis_sync = if let Ok(redis_url) = dotenvy::var("REDIS_URL") { + match RedisSync::new(&redis_url).await { + Ok(sync) => { + if sync.is_connected().await { + info!("Redis sync enabled: {}", redis_url); + Some(Arc::new(sync)) + } else { + warn!("Redis sync initialization failed, falling back to single-node mode"); + None + } + } + Err(e) => { + warn!("Failed to initialize Redis sync: {}, falling back to single-node mode", e); + None + } + } + } else { + info!("Redis URL not provided, running in single-node mode"); + None + }; + Context { channel: RwLock::new(HashMap::new()), storage, webhook: Arc::new(std::sync::RwLock::new( dotenvy::var("HOOK_ENDPOINT").unwrap_or_default(), )), + redis_sync, } } @@ -153,9 +177,13 @@ impl Context { .await .map(|w| self.register_webhook(w)) } + + pub fn get_redis_sync(&self) -> Option> { + self.redis_sync.clone() + } } -impl RpcContextImpl<'_> for Context { +impl<'a> RpcContextImpl<'a> for Context { fn get_storage(&self) -> &JwstStorage { &self.storage } diff --git a/apps/keck/src/server/mod.rs b/apps/keck/src/server/mod.rs index 8804be4b6..7a8e7c4b4 100644 --- a/apps/keck/src/server/mod.rs +++ b/apps/keck/src/server/mod.rs @@ -1,4 +1,6 @@ mod api; +mod redis_broadcast; +mod redis_sync; mod sync; mod utils; diff --git a/apps/keck/src/server/redis_broadcast.rs b/apps/keck/src/server/redis_broadcast.rs new file mode 100644 index 000000000..56d2a60c3 --- /dev/null +++ b/apps/keck/src/server/redis_broadcast.rs @@ -0,0 +1,151 @@ +use std::sync::Arc; + +use jwst_core::Workspace; +use jwst_rpc::BroadcastType; +use tokio::sync::broadcast; + +use super::{redis_sync::{OperationType, RedisSync}, *}; + +/// Redis-aware broadcast implementation that sends messages to both local clients and other nodes via Redis +pub async fn setup_redis_broadcast( + workspace: &Workspace, + identifier: String, + sender: broadcast::Sender, + redis_sync: Option>, +) { + let workspace_id = workspace.id(); + + // Subscribe to Redis for this workspace if Redis sync is available + if let Some(redis_sync) = redis_sync.clone() { + if let Err(e) = redis_sync.subscribe_workspace(&workspace_id, sender.clone()).await { + error!("Failed to setup Redis subscription for workspace {}: {}", workspace_id, e); + } + } + + // Awareness subscription with Redis publishing + { + let sender = sender.clone(); + let workspace_id = workspace.id(); + let redis_sync = redis_sync.clone(); + + workspace + .subscribe_awareness(move |awareness, e| { + use jwst_codec::encode_awareness_as_message; + + let buffer = match encode_awareness_as_message(e.get_updated(awareness.get_states())) { + Ok(data) => data, + Err(e) => { + error!("failed to write awareness update: {}", e); + return; + } + }; + + // Publish to Redis first (if available) + if let Some(redis_sync) = redis_sync.clone() { + let workspace_id = workspace_id.clone(); + let buffer = buffer.clone(); + tokio::spawn(async move { + if let Err(e) = redis_sync.publish_operation(&workspace_id, OperationType::Awareness, buffer).await { + debug!("Failed to publish awareness to Redis: {}", e); + } + }); + } + + // Then send to local subscribers + if sender.send(BroadcastType::BroadcastAwareness(buffer)).is_err() { + debug!("broadcast channel {workspace_id} has been closed") + } + }) + .await; + } + + // Doc content subscription with Redis publishing + { + let sender = sender.clone(); + let workspace_id = workspace.id(); + let redis_sync = redis_sync.clone(); + + workspace.subscribe_doc(move |update, history| { + use jwst_codec::encode_update_as_message; + use super::utils::encode_update_with_guid; + + debug!( + "workspace {} changed: {}bytes, {} histories", + workspace_id, + update.len(), + history.len() + ); + + match encode_update_with_guid(update, workspace_id.clone()) + .and_then(|update_with_guid| encode_update_as_message(update.to_vec()).map(|u| (update_with_guid, u))) + { + Ok((broadcast_update, sendable_update)) => { + // Publish to Redis first (if available) + if let Some(redis_sync) = redis_sync.clone() { + let workspace_id = workspace_id.clone(); + let broadcast_update_clone = broadcast_update.clone(); + let sendable_update_clone = sendable_update.clone(); + + tokio::spawn(async move { + // Publish raw content + if let Err(e) = redis_sync.publish_operation(&workspace_id, OperationType::RawContent, broadcast_update_clone).await { + debug!("Failed to publish raw content to Redis: {}", e); + } + + // Publish sendable content + if let Err(e) = redis_sync.publish_operation(&workspace_id, OperationType::Content, sendable_update_clone).await { + debug!("Failed to publish content to Redis: {}", e); + } + }); + } + + // Send to local subscribers + if sender + .send(BroadcastType::BroadcastRawContent(broadcast_update)) + .is_err() + { + debug!("broadcast channel {workspace_id} has been closed") + } + + if sender.send(BroadcastType::BroadcastContent(sendable_update)).is_err() { + debug!("broadcast channel {workspace_id} has been closed") + } + } + Err(e) => { + debug!("failed to encode update: {}", e); + } + } + }); + } + + // Cleanup task + let workspace_id = workspace.id(); + let redis_sync_cleanup = redis_sync.clone(); + tokio::spawn(async move { + let mut rx = sender.subscribe(); + loop { + tokio::select! { + Ok(msg) = rx.recv() => { + match msg { + BroadcastType::CloseUser(user) if user == identifier => break, + BroadcastType::CloseAll => break, + _ => {} + } + }, + _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => { + let count = sender.receiver_count(); + if count < 1 { + break; + } + } + } + } + + // Cleanup Redis subscription + if let Some(redis_sync) = redis_sync_cleanup { + redis_sync.unsubscribe_workspace(&workspace_id).await; + } + + debug!("broadcast channel {workspace_id} has been closed"); + }); +} \ No newline at end of file diff --git a/apps/keck/src/server/redis_sync.rs b/apps/keck/src/server/redis_sync.rs new file mode 100644 index 000000000..94b164739 --- /dev/null +++ b/apps/keck/src/server/redis_sync.rs @@ -0,0 +1,153 @@ +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Result; +use chrono::Utc; +use futures::StreamExt; +use jwst_rpc::BroadcastType; +use redis::{aio::ConnectionManager, AsyncCommands, Client}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{broadcast, RwLock}; + +use super::*; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SyncMessage { + pub node_id: String, + pub workspace_id: String, + pub operation_type: OperationType, + pub data: Vec, + pub timestamp: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum OperationType { + Awareness, + Content, + RawContent, +} + +pub struct RedisSync { + client: Client, + connection_manager: Option, + node_id: String, + subscribers: Arc>>>, +} + +impl RedisSync { + pub async fn new(redis_url: &str) -> Result { + let client = Client::open(redis_url)?; + let connection_manager = ConnectionManager::new(client.clone()).await.ok(); + + let node_id = dotenvy::var("NODE_ID").unwrap_or_else(|_| nanoid!()); + info!("Redis sync initialized with node_id: {}", node_id); + + Ok(Self { + client, + connection_manager, + node_id, + subscribers: Arc::new(RwLock::new(HashMap::new())), + }) + } + + pub async fn is_connected(&self) -> bool { + self.connection_manager.is_some() + } + + pub async fn publish_operation( + &self, + workspace_id: &str, + operation_type: OperationType, + data: Vec, + ) -> Result<()> { + if let Some(mut conn) = self.connection_manager.clone() { + let message = SyncMessage { + node_id: self.node_id.clone(), + workspace_id: workspace_id.to_string(), + operation_type, + data, + timestamp: Utc::now().timestamp(), + }; + + let channel = format!("keck:sync:{}", workspace_id); + let serialized = serde_json::to_string(&message)?; + + conn.publish::<_, _, ()>(&channel, serialized).await?; + debug!("Published sync message to Redis channel: {}", channel); + } + Ok(()) + } + + pub async fn subscribe_workspace( + &self, + workspace_id: &str, + sender: broadcast::Sender, + ) -> Result<()> { + self.subscribers + .write() + .await + .insert(workspace_id.to_string(), sender); + + if self.connection_manager.is_some() { + let channel = format!("keck:sync:{}", workspace_id); + let subscribers = self.subscribers.clone(); + let node_id = self.node_id.clone(); + let client = self.client.clone(); + + tokio::spawn(async move { + let conn = match client.get_async_connection().await { + Ok(conn) => conn, + Err(e) => { + error!("Failed to get Redis connection for subscription: {}", e); + return; + } + }; + + let mut pubsub = conn.into_pubsub(); + if let Err(e) = pubsub.subscribe(&channel).await { + error!("Failed to subscribe to Redis channel {}: {}", channel, e); + return; + } + + loop { + match pubsub.on_message().next().await { + Some(msg) => { + if let Ok(payload) = msg.get_payload::() { + if let Ok(sync_message) = serde_json::from_str::(&payload) { + // Skip messages from the same node to avoid loops + if sync_message.node_id == node_id { + continue; + } + + let subscribers_read = subscribers.read().await; + if let Some(sender) = subscribers_read.get(&sync_message.workspace_id) { + let broadcast_type = match sync_message.operation_type { + OperationType::Awareness => BroadcastType::BroadcastAwareness(sync_message.data), + OperationType::Content => BroadcastType::BroadcastContent(sync_message.data), + OperationType::RawContent => BroadcastType::BroadcastRawContent(sync_message.data), + }; + + if let Err(e) = sender.send(broadcast_type) { + debug!("Failed to broadcast Redis sync message: {}", e); + } else { + debug!("Broadcast Redis sync message from node {}", sync_message.node_id); + } + } + } + } + } + None => { + debug!("Redis pubsub connection closed for channel: {}", channel); + break; + } + } + } + }); + } + + Ok(()) + } + + pub async fn unsubscribe_workspace(&self, workspace_id: &str) { + self.subscribers.write().await.remove(workspace_id); + } +} \ No newline at end of file diff --git a/apps/keck/src/server/sync/collaboration.rs b/apps/keck/src/server/sync/collaboration.rs index 10504667b..df313597e 100644 --- a/apps/keck/src/server/sync/collaboration.rs +++ b/apps/keck/src/server/sync/collaboration.rs @@ -32,8 +32,10 @@ pub async fn upgrade_handler( if let Err(e) = context.create_workspace(workspace.clone()).await { error!("create workspace failed: {:?}", e); } + + // Context now implements RPC with Redis support ws.protocols(["AFFiNE"]).on_upgrade(move |socket| { - handle_connector(context.clone(), workspace.clone(), identifier, move || { + handle_connector(context, workspace.clone(), identifier, move || { axum_socket_connector(socket, &workspace) }) .map(|_| ()) diff --git a/apps/keck/src/server/utils.rs b/apps/keck/src/server/utils.rs index 596fa35a0..dfc3c6ec8 100644 --- a/apps/keck/src/server/utils.rs +++ b/apps/keck/src/server/utils.rs @@ -1,3 +1,19 @@ +use std::io::Write; + +use jwst_codec::{CrdtWriter, JwstCodecError, JwstCodecResult, RawEncoder}; + pub use jwst_logger::{debug, error, info, warn}; pub use nanoid::nanoid; pub use serde::{Deserialize, Serialize}; + +pub fn encode_update_with_guid>(update: &[u8], guid: S) -> JwstCodecResult> { + let mut encoder = RawEncoder::default(); + encoder.write_var_string(guid)?; + let mut buffer = encoder.into_inner(); + + buffer + .write_all(update) + .map_err(|e| JwstCodecError::InvalidWriteBuffer(e.to_string()))?; + + Ok(buffer) +} diff --git a/apps/keck/start-multinode.sh b/apps/keck/start-multinode.sh new file mode 100755 index 000000000..4237c32ec --- /dev/null +++ b/apps/keck/start-multinode.sh @@ -0,0 +1,87 @@ +#!/bin/bash + +# Local development startup script for multi-node Keck +# This script starts multiple Keck instances for testing + +set -e + +echo "๐Ÿš€ Starting Multi-Node Keck for Local Development..." + +# Default values +REDIS_URL="${REDIS_URL:-redis://localhost:6379}" +DATABASE_URL="${DATABASE_URL:-postgres://localhost:5432/keck}" +NODE_COUNT="${NODE_COUNT:-3}" +BASE_PORT="${BASE_PORT:-3001}" + +echo "๐Ÿ“‹ Configuration:" +echo " Redis URL: $REDIS_URL" +echo " Database URL: $DATABASE_URL" +echo " Node Count: $NODE_COUNT" +echo " Base Port: $BASE_PORT" + +# Check if Redis is running +if ! redis-cli ping > /dev/null 2>&1; then + echo "โš ๏ธ Redis not found. Starting with Docker..." + docker run -d --name keck-redis -p 6379:6379 redis:7-alpine + sleep 2 +fi + +# Array to store PIDs +PIDS=() + +# Function to cleanup on exit +cleanup() { + echo "๐Ÿ›‘ Stopping all nodes..." + for pid in "${PIDS[@]}"; do + kill "$pid" 2>/dev/null || true + done + wait +} + +# Set up cleanup trap +trap cleanup EXIT INT TERM + +# Start multiple Keck nodes +for i in $(seq 1 $NODE_COUNT); do + PORT=$((BASE_PORT + i - 1)) + NODE_ID="keck-node-$i" + + echo "๐Ÿ”„ Starting $NODE_ID on port $PORT..." + + KECK_PORT=$PORT \ + NODE_ID=$NODE_ID \ + REDIS_URL=$REDIS_URL \ + DATABASE_URL=$DATABASE_URL \ + RUST_LOG=info \ + cargo run --release -p keck & + + PID=$! + PIDS+=($PID) + echo "โœ… Started $NODE_ID (PID: $PID)" + + # Small delay between starts + sleep 2 +done + +echo "" +echo "๐ŸŽ‰ All nodes started successfully!" +echo "" +echo "๐Ÿ“ก Active nodes:" +for i in $(seq 1 $NODE_COUNT); do + PORT=$((BASE_PORT + i - 1)) + echo " - Node $i: http://localhost:$PORT (ws://localhost:$PORT/collaboration/{roomid})" +done + +echo "" +echo "๐Ÿ“š Testing commands:" +echo " curl http://localhost:$BASE_PORT/api/workspace/test/blob/test" +echo " websocat ws://localhost:$BASE_PORT/collaboration/test-room" +echo "" +echo "๐Ÿ” Monitoring:" +echo " redis-cli monitor" +echo " ./test-multinode.sh" +echo "" +echo "Press Ctrl+C to stop all nodes..." + +# Wait for all background jobs +wait \ No newline at end of file diff --git a/apps/keck/test-multinode.sh b/apps/keck/test-multinode.sh new file mode 100755 index 000000000..62c06d04a --- /dev/null +++ b/apps/keck/test-multinode.sh @@ -0,0 +1,97 @@ +#!/bin/bash + +# Test script for multi-node Keck setup +# This script tests that multiple nodes are working and synchronized + +set -e + +echo "๐Ÿงช Testing Multi-Node Keck Setup..." + +# Check if required services are running +echo "๐Ÿ“‹ Checking services..." + +# Check Redis +if ! redis-cli ping > /dev/null 2>&1; then + echo "โŒ Redis is not running. Please start Redis first." + exit 1 +fi +echo "โœ… Redis is running" + +# Check if Keck nodes are running +NODES=("3001" "3002" "3003") +RUNNING_NODES=() + +for port in "${NODES[@]}"; do + if curl -s "http://localhost:${port}/api/workspace/test/blob/test" > /dev/null 2>&1; then + RUNNING_NODES+=($port) + echo "โœ… Keck node on port $port is running" + else + echo "โš ๏ธ Keck node on port $port is not responding" + fi +done + +if [ ${#RUNNING_NODES[@]} -eq 0 ]; then + echo "โŒ No Keck nodes are running. Please start at least one node." + exit 1 +fi + +echo "๐Ÿ“Š Found ${#RUNNING_NODES[@]} running node(s): ${RUNNING_NODES[*]}" + +# Check Nginx load balancer +if curl -s "http://localhost:3000/api/workspace/test/blob/test" > /dev/null 2>&1; then + echo "โœ… Nginx load balancer is working" +else + echo "โš ๏ธ Nginx load balancer is not responding (this is OK if not using nginx)" +fi + +# Test Redis pub/sub functionality +echo "๐Ÿ”„ Testing Redis pub/sub..." +REDIS_TEST_CHANNEL="keck:sync:test-workspace" + +# Subscribe to Redis channel in background +redis-cli subscribe "$REDIS_TEST_CHANNEL" > /tmp/redis_test.log & +REDIS_PID=$! + +# Give it a moment to subscribe +sleep 1 + +# Publish a test message +redis-cli publish "$REDIS_TEST_CHANNEL" '{"node_id":"test","workspace_id":"test-workspace","operation_type":"Content","data":[1,2,3],"timestamp":1234567890}' + +# Wait a moment and check +sleep 1 +kill $REDIS_PID 2>/dev/null || true + +if grep -q "test-workspace" /tmp/redis_test.log; then + echo "โœ… Redis pub/sub is working" +else + echo "โš ๏ธ Redis pub/sub test inconclusive" +fi + +rm -f /tmp/redis_test.log + +# Test WebSocket connections if websocat is available +if command -v websocat > /dev/null 2>&1; then + echo "๐Ÿ”Œ Testing WebSocket connections..." + + for port in "${RUNNING_NODES[@]}"; do + echo "Testing WebSocket on port $port..." + timeout 5 websocat "ws://localhost:${port}/collaboration/test-room" <<< '{"type":"ping"}' > /dev/null 2>&1 && \ + echo "โœ… WebSocket on port $port is working" || \ + echo "โš ๏ธ WebSocket on port $port connection failed" + done +else + echo "โš ๏ธ websocat not found, skipping WebSocket tests" + echo " Install with: cargo install websocat" +fi + +echo "" +echo "๐ŸŽ‰ Multi-node test completed!" +echo "" +echo "๐Ÿ“š Usage:" +echo " - Connect clients to: ws://localhost:3000/collaboration/{roomid}" +echo " - Individual nodes: ws://localhost:3001/collaboration/{roomid}" +echo " - Monitor Redis: redis-cli monitor" +echo " - Check logs: docker-compose logs -f" +echo "" +echo "๐Ÿš€ Your multi-node Keck setup is ready!" \ No newline at end of file diff --git a/libs/jwst-binding/jwst-wasm/Cargo.toml b/libs/jwst-binding/jwst-wasm/Cargo.toml index d4e1aac61..28f0654bc 100644 --- a/libs/jwst-binding/jwst-wasm/Cargo.toml +++ b/libs/jwst-binding/jwst-wasm/Cargo.toml @@ -14,7 +14,7 @@ cfg-if = "1.0.0" console_error_panic_hook = { version = "0.1.7", optional = true } getrandom = { version = "0.2", features = ["js"] } js-sys = "0.3.64" -wasm-bindgen = "0.2.87" +wasm-bindgen = "0.2.88" # ======= workspace dependencies ======= jwst-core = { workspace = true } diff --git a/libs/jwst-codec/src/doc/codec/update.rs b/libs/jwst-codec/src/doc/codec/update.rs index d107666bb..f2f32159a 100644 --- a/libs/jwst-codec/src/doc/codec/update.rs +++ b/libs/jwst-codec/src/doc/codec/update.rs @@ -97,11 +97,11 @@ impl Update { Ok(encoder.into_inner()) } - pub(crate) fn iter(&mut self, state: StateVector) -> UpdateIterator { + pub(crate) fn iter(&mut self, state: StateVector) -> UpdateIterator<'_> { UpdateIterator::new(self, state) } - pub fn delete_set_iter(&mut self, state: StateVector) -> DeleteSetIterator { + pub fn delete_set_iter(&mut self, state: StateVector) -> DeleteSetIterator<'_> { DeleteSetIterator::new(self, state) } diff --git a/libs/jwst-codec/src/doc/common/somr.rs b/libs/jwst-codec/src/doc/common/somr.rs index 71a158bbc..999ddb918 100644 --- a/libs/jwst-codec/src/doc/common/somr.rs +++ b/libs/jwst-codec/src/doc/common/somr.rs @@ -94,7 +94,7 @@ impl SomrInner { self.data.as_ref().map(|x| unsafe { &*x.get() }) } - fn data_mut(&self) -> Option> { + fn data_mut(&self) -> Option> { self.data.as_ref().map(|x| InnerRefMut { inner: unsafe { NonNull::new_unchecked(x.get()) }, _marker: PhantomData, @@ -145,7 +145,7 @@ impl Somr { } #[allow(unused)] - pub unsafe fn get_mut_from_ref(&self) -> Option> { + pub unsafe fn get_mut_from_ref(&self) -> Option> { if !self.is_owned() || self.dangling() { return None; } diff --git a/libs/jwst-codec/src/doc/types/array.rs b/libs/jwst-codec/src/doc/types/array.rs index 176f70702..f5644fd62 100644 --- a/libs/jwst-codec/src/doc/types/array.rs +++ b/libs/jwst-codec/src/doc/types/array.rs @@ -47,7 +47,7 @@ impl Array { None } - pub fn iter(&self) -> ArrayIter { + pub fn iter(&self) -> ArrayIter<'_> { ArrayIter(self.iter_item()) } diff --git a/libs/jwst-codec/src/doc/types/list/mod.rs b/libs/jwst-codec/src/doc/types/list/mod.rs index 864fa3d04..96555bd45 100644 --- a/libs/jwst-codec/src/doc/types/list/mod.rs +++ b/libs/jwst-codec/src/doc/types/list/mod.rs @@ -60,7 +60,7 @@ pub(crate) trait ListType: AsInner { self.as_inner().ty().unwrap().len } - fn iter_item(&self) -> ListIterator { + fn iter_item(&self) -> ListIterator<'_> { let inner = self.as_inner().ty().unwrap(); ListIterator { cur: inner.start.clone(), diff --git a/libs/jwst-codec/src/doc/types/map.rs b/libs/jwst-codec/src/doc/types/map.rs index 765d0ed88..39e990c1b 100644 --- a/libs/jwst-codec/src/doc/types/map.rs +++ b/libs/jwst-codec/src/doc/types/map.rs @@ -67,7 +67,7 @@ pub(crate) trait MapType: AsInner { self._keys().count() as u64 } - fn _iter(&self) -> EntriesInnerIterator { + fn _iter(&self) -> EntriesInnerIterator<'_> { let ty = self.as_inner().ty(); if let Some(ty) = ty { @@ -85,15 +85,15 @@ pub(crate) trait MapType: AsInner { } } - fn _keys(&self) -> KeysIterator { + fn _keys(&self) -> KeysIterator<'_> { KeysIterator(self._iter()) } - fn _values(&self) -> ValuesIterator { + fn _values(&self) -> ValuesIterator<'_> { ValuesIterator(self._iter()) } - fn _entries(&self) -> EntriesIterator { + fn _entries(&self) -> EntriesIterator<'_> { EntriesIterator(self._iter()) } } @@ -185,22 +185,22 @@ impl Map { } #[inline(always)] - pub fn iter(&self) -> EntriesIterator { + pub fn iter(&self) -> EntriesIterator<'_> { self._entries() } #[inline(always)] - pub fn entries(&self) -> EntriesIterator { + pub fn entries(&self) -> EntriesIterator<'_> { self._entries() } #[inline(always)] - pub fn keys(&self) -> KeysIterator { + pub fn keys(&self) -> KeysIterator<'_> { self._keys() } #[inline(always)] - pub fn values(&self) -> ValuesIterator { + pub fn values(&self) -> ValuesIterator<'_> { self._values() } } diff --git a/libs/jwst-codec/src/doc/types/mod.rs b/libs/jwst-codec/src/doc/types/mod.rs index 4934690c3..19feb8c29 100644 --- a/libs/jwst-codec/src/doc/types/mod.rs +++ b/libs/jwst-codec/src/doc/types/mod.rs @@ -98,11 +98,11 @@ impl YTypeRef { } } - pub fn ty(&self) -> Option> { + pub fn ty(&self) -> Option> { self.inner.get().and_then(|ty| ty.read().ok()) } - pub fn ty_mut(&self) -> Option> { + pub fn ty_mut(&self) -> Option> { self.inner.get().and_then(|ty| ty.write().ok()) } @@ -128,11 +128,11 @@ impl YTypeRef { } #[allow(dead_code)] - pub fn read(&self) -> Option<(RwLockReadGuard, RwLockReadGuard)> { + pub fn read(&self) -> Option<(RwLockReadGuard<'_, DocStore>, RwLockReadGuard<'_, YType>)> { self.store().and_then(|store| self.ty().map(|ty| (store, ty))) } - pub fn write(&self) -> Option<(RwLockWriteGuard, RwLockWriteGuard)> { + pub fn write(&self) -> Option<(RwLockWriteGuard<'_, DocStore>, RwLockWriteGuard<'_, YType>)> { self.store_mut().and_then(|store| self.ty_mut().map(|ty| (store, ty))) } } diff --git a/libs/jwst-codec/src/protocol/scanner.rs b/libs/jwst-codec/src/protocol/scanner.rs index 1d9bda9c3..089318238 100644 --- a/libs/jwst-codec/src/protocol/scanner.rs +++ b/libs/jwst-codec/src/protocol/scanner.rs @@ -5,7 +5,7 @@ pub struct SyncMessageScanner<'a> { } impl SyncMessageScanner<'_> { - pub fn new(buffer: &[u8]) -> SyncMessageScanner { + pub fn new(buffer: &[u8]) -> SyncMessageScanner<'_> { SyncMessageScanner { buffer } } } diff --git a/libs/jwst-logger/src/formatter.rs b/libs/jwst-logger/src/formatter.rs index 09387f925..259973aa9 100644 --- a/libs/jwst-logger/src/formatter.rs +++ b/libs/jwst-logger/src/formatter.rs @@ -31,7 +31,7 @@ pub struct JWSTFormatter { } impl JWSTFormatter { - fn format_level(level: &Level) -> AnsiGenericString { + fn format_level(level: &Level) -> AnsiGenericString<'_, str> { match *level { Level::ERROR => Color::Red.paint("ERROR"), Level::WARN => Color::Yellow.paint(" WARN"), diff --git a/libs/jwst-storage/src/rate_limiter.rs b/libs/jwst-storage/src/rate_limiter.rs index 2ba158ff4..c8522345a 100644 --- a/libs/jwst-storage/src/rate_limiter.rs +++ b/libs/jwst-storage/src/rate_limiter.rs @@ -42,7 +42,7 @@ impl Bucket { } } - pub async fn read(&self) -> BucketLocker { + pub async fn read(&self) -> BucketLocker<'_> { self.bucket.until_ready().await; match &self.lock { BucketLock::RwLock(lock) => BucketLocker::ReadLock(lock.read().await), @@ -52,7 +52,7 @@ impl Bucket { } } - pub async fn write(&self) -> BucketLocker { + pub async fn write(&self) -> BucketLocker<'_> { self.bucket.until_ready().await; match &self.lock { BucketLock::RwLock(lock) => BucketLocker::WriteLock(lock.write().await),