From 39b1d06f513f06e26d71c7e4785990e44b93f5c5 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Tue, 24 Mar 2026 16:21:04 -0700 Subject: [PATCH 1/2] homepage: add send-bulk-chat --- Cargo.lock | 119 ++++++++++++++- Cargo.toml | 2 +- hyperdrive/packages/homepage/Cargo.lock | 10 ++ hyperdrive/packages/homepage/Cargo.toml | 1 + hyperdrive/packages/homepage/pkg/scripts.json | 14 ++ .../homepage/send-bulk-chat/Cargo.toml | 17 +++ .../homepage/send-bulk-chat/src/lib.rs | 136 ++++++++++++++++++ 7 files changed, 295 insertions(+), 4 deletions(-) create mode 100644 hyperdrive/packages/homepage/pkg/scripts.json create mode 100644 hyperdrive/packages/homepage/send-bulk-chat/Cargo.toml create mode 100644 hyperdrive/packages/homepage/send-bulk-chat/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index a71b0995c..84c40ded3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -852,6 +852,15 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arc-swap" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a07d1f37ff60921c83bdfc7407723bdefe89b44b98a9b772f225c8f9d67141a6" +dependencies = [ + "rustversion", +] + [[package]] name = "argon2" version = "0.5.3" @@ -1058,6 +1067,12 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "atomic_refcell" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c" + [[package]] name = "auditable-serde" version = "0.8.0" @@ -1559,6 +1574,26 @@ dependencies = [ "zip 1.1.4", ] +[[package]] +name = "chat" +version = "0.1.0" +dependencies = [ + "anyhow", + "base64 0.21.7", + "flate2", + "futures", + "hyperapp_macro 0.2.0", + "hyperware-crdt", + "hyperware-pubsub-core", + "hyperware_process_lib 3.0.0", + "process_macros", + "rand 0.8.5", + "rmp-serde", + "serde", + "serde_json", + "wit-bindgen 0.42.1", +] + [[package]] name = "chrono" version = "0.4.41" @@ -2720,6 +2755,9 @@ name = "fastrand" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +dependencies = [ + "getrandom 0.2.16", +] [[package]] name = "fastrlp" @@ -3571,9 +3609,21 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "hyperapp_macro" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "061414cb1a535e9e2a9c98ac112c1ac770552ae53d6f6309133cdccf5c251659" +dependencies = [ + "hyperware_process_lib 3.0.0", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "hyperdrive" -version = "1.9.3" +version = "1.9.4" dependencies = [ "aes-gcm", "alloy", @@ -3633,7 +3683,7 @@ dependencies = [ [[package]] name = "hyperdrive_lib" -version = "1.9.3" +version = "1.9.4" dependencies = [ "lib", ] @@ -3670,6 +3720,20 @@ dependencies = [ "url", ] +[[package]] +name = "hyperware-crdt" +version = "0.1.0" +source = "git+https://github.com/hyperware-ai/hyperware-crdt?rev=a0aefa9#a0aefa95d7c55b7f848622119b269131e16efb71" +dependencies = [ + "base64 0.22.1", + "hyperware_process_lib 3.0.0", + "rmp-serde", + "serde", + "serde_json", + "thiserror 1.0.69", + "yrs", +] + [[package]] name = "hyperware-parse-wit" version = "0.1.0" @@ -3680,6 +3744,20 @@ dependencies = [ "zip 2.4.2", ] +[[package]] +name = "hyperware-pubsub-core" +version = "0.1.0" +source = "git+https://github.com/hyperware-ai/hyperware-pubsub?rev=abd30a6#abd30a63db2672ce8acb40095a769c2c1cd8d1f5" +dependencies = [ + "async-trait", + "bytes", + "futures-core", + "serde", + "thiserror 1.0.69", + "tracing", + "uuid 1.17.0", +] + [[package]] name = "hyperware_process_lib" version = "2.2.0" @@ -4401,7 +4479,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lib" -version = "1.9.3" +version = "1.9.4" dependencies = [ "alloy", "anyhow", @@ -6347,6 +6425,16 @@ dependencies = [ "pest", ] +[[package]] +name = "send-bulk-chat" +version = "0.1.0" +dependencies = [ + "hyperware_process_lib 3.0.0", + "serde", + "serde_json", + "wit-bindgen 0.41.0", +] + [[package]] name = "send_wrapper" version = "0.6.0" @@ -6574,6 +6662,15 @@ dependencies = [ "futures-io", ] +[[package]] +name = "smallstr" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862077b1e764f04c251fe82a2ef562fd78d7cadaeb072ca7c2bcaf7217b1ff3b" +dependencies = [ + "smallvec", +] + [[package]] name = "smallvec" version = "1.15.1" @@ -9113,6 +9210,22 @@ dependencies = [ "synstructure 0.13.2", ] +[[package]] +name = "yrs" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8ca5126331b9a5ef5bb10f3f1c3d01b05f298d348c66f8fb15497d83ee73176" +dependencies = [ + "arc-swap", + "atomic_refcell", + "fastrand 2.3.0", + "serde", + "serde_json", + "smallstr", + "smallvec", + "thiserror 1.0.69", +] + [[package]] name = "zerocopy" version = "0.8.26" diff --git a/Cargo.toml b/Cargo.toml index 3aa69fae9..30a035d08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ members = [ "hyperdrive/packages/app-store/download", "hyperdrive/packages/app-store/install", "hyperdrive/packages/app-store/uninstall", "hyperdrive/packages/app-store/reset-store", "hyperdrive/packages/contacts/contacts", "hyperdrive/packages/file-explorer/explorer", - "hyperdrive/packages/homepage/homepage", + "hyperdrive/packages/homepage/homepage", "hyperdrive/packages/homepage/chat", "hyperdrive/packages/homepage/send-bulk-chat", "hyperdrive/packages/hns-indexer/hns-indexer", "hyperdrive/packages/hns-indexer/get-block", "hyperdrive/packages/settings/settings", "hyperdrive/packages/hns-indexer/reset", "hyperdrive/packages/hns-indexer/node-info", "hyperdrive/packages/hns-indexer/state", "hyperdrive/packages/hypermap-cacher/binding-cacher", "hyperdrive/packages/hypermap-cacher/hypermap-cacher", "hyperdrive/packages/hypermap-cacher/reset-cache", diff --git a/hyperdrive/packages/homepage/Cargo.lock b/hyperdrive/packages/homepage/Cargo.lock index 69d09a426..77db2be69 100644 --- a/hyperdrive/packages/homepage/Cargo.lock +++ b/hyperdrive/packages/homepage/Cargo.lock @@ -2882,6 +2882,16 @@ dependencies = [ "pest", ] +[[package]] +name = "send-bulk-chat" +version = "0.1.0" +dependencies = [ + "hyperware_process_lib", + "serde", + "serde_json", + "wit-bindgen 0.41.0", +] + [[package]] name = "serde" version = "1.0.228" diff --git a/hyperdrive/packages/homepage/Cargo.toml b/hyperdrive/packages/homepage/Cargo.toml index bec2fb08b..8220c3ae7 100644 --- a/hyperdrive/packages/homepage/Cargo.toml +++ b/hyperdrive/packages/homepage/Cargo.toml @@ -7,5 +7,6 @@ panic = "abort" members = [ "homepage", "chat", + "send-bulk-chat", ] resolver = "2" diff --git a/hyperdrive/packages/homepage/pkg/scripts.json b/hyperdrive/packages/homepage/pkg/scripts.json new file mode 100644 index 000000000..7924ddca5 --- /dev/null +++ b/hyperdrive/packages/homepage/pkg/scripts.json @@ -0,0 +1,14 @@ +{ + "send-bulk-chat.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "chat:homepage:sys" + ], + "grant_capabilities": [ + "chat:homepage:sys" + ], + "wit_version": 1 + } +} diff --git a/hyperdrive/packages/homepage/send-bulk-chat/Cargo.toml b/hyperdrive/packages/homepage/send-bulk-chat/Cargo.toml new file mode 100644 index 000000000..c06c6cc5d --- /dev/null +++ b/hyperdrive/packages/homepage/send-bulk-chat/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "send-bulk-chat" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +hyperware_process_lib = { version = "3.0.0", features = ["hyperapp"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +wit-bindgen = "0.41.0" + +[profile.release] +opt-level = "s" +lto = true diff --git a/hyperdrive/packages/homepage/send-bulk-chat/src/lib.rs b/hyperdrive/packages/homepage/send-bulk-chat/src/lib.rs new file mode 100644 index 000000000..622e1d140 --- /dev/null +++ b/hyperdrive/packages/homepage/send-bulk-chat/src/lib.rs @@ -0,0 +1,136 @@ +use hyperware_process_lib::{println, script, Address, Request}; +use serde_json::Value; +use std::collections::HashMap; + +wit_bindgen::generate!({ + path: "../target/wit", + world: "process-v1", +}); + +const USAGE: &str = r#"\x1b[1mUsage:\x1b[0m send-bulk-chat '{"node1": "message1", "node2": "message2", ...}' + +Send messages to multiple nodes at once. Creates chats if they don't exist. + +Example: + send-bulk-chat '{"alice:hyper": "Hello Alice!", "bob:hyper": "Hey Bob!"}' +"#; + +const CHAT_PROCESS_ID: (&str, &str, &str) = ("chat", "homepage", "sys"); + +script!(init); +fn init(our: Address, args: String) -> String { + if args.is_empty() { + return USAGE.to_string(); + } + + // Parse the JSON argument + println!("{args}"); + let args_slice = if args.starts_with('\'') && args.ends_with('\'') && args.len() >= 2 { + &args[1..args.len() - 1] + } else { + &args + }; + let messages: HashMap = match serde_json::from_str(args_slice) { + Ok(j) => j, + Err(e) => return format!("Error parsing JSON: {}\n\n{}", e, USAGE), + }; + + if messages.is_empty() { + return "Error: No messages to send".to_string(); + } + + let mut results = Vec::new(); + let mut success_count = 0; + let mut error_count = 0; + + // Process each node-message pair + for (node, message) in messages { + // Create typed request for creating/getting chat + let create_chat_request = serde_json::json!({ + "CreateChat": { + "counterparty": node.clone() + } + }); + + // Send create chat request to our own node's chat:chat:hyper + let chat_address = Address::new(our.node(), CHAT_PROCESS_ID); + + match Request::to(&chat_address) + .body(serde_json::to_vec(&create_chat_request).unwrap_or_default()) + .send_and_await_response(5) + { + Ok(Ok(response_msg)) => { + // Parse the response to get the actual chat object with normalized ID + let response: Value = match serde_json::from_slice(response_msg.body()) { + Ok(v) => v, + Err(e) => { + results.push(format!("✗ {}: Failed to parse chat response - {}", node, e)); + error_count += 1; + continue; + } + }; + + // Extract the chat ID from the Ok response + let chat_id = match response + .get("Ok") + .and_then(|ok| ok.get("id")) + .and_then(|id| id.as_str()) + { + Some(id) => id.to_string(), + None => { + results.push(format!("✗ {}: Invalid chat response format", node)); + error_count += 1; + continue; + } + }; + + println!("Created/got chat with ID: {} for node: {}", chat_id, node); + + // Now send the message with typed request using the actual chat ID + let send_msg_request = serde_json::json!({ + "SendMessage": { + "chat_id": chat_id, + "content": message.clone(), + "reply_to": null, + "file_info": null + } + }); + + match Request::to(&chat_address) + .body(serde_json::to_vec(&send_msg_request).unwrap_or_default()) + .send_and_await_response(5) + { + Ok(Ok(_)) => { + results.push(format!("✓ {}: Message sent", node)); + success_count += 1; + } + Ok(Err(e)) => { + results.push(format!("✗ {}: Failed to send message - {:?}", node, e)); + error_count += 1; + } + Err(e) => { + results.push(format!("✗ {}: Failed to send message - {:?}", node, e)); + error_count += 1; + } + } + } + Ok(Err(e)) => { + results.push(format!("✗ {}: Failed to create/get chat - {:?}", node, e)); + error_count += 1; + } + Err(e) => { + results.push(format!("✗ {}: Failed to create/get chat - {:?}", node, e)); + error_count += 1; + } + } + } + + // Format output + let mut output = results.join("\n"); + output.push_str(&format!( + "\n\n\x1b[1mSummary:\x1b[0m {} sent, {} failed", + success_count, error_count + )); + + output +} From 9ea39a8627c52bf16b6b238dd018cadc5bc378be Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 04:04:09 +0000 Subject: [PATCH 2/2] Format Rust code using rustfmt --- .../packages/homepage/chat/src/groups.rs | 19 +- hyperdrive/packages/homepage/chat/src/lib.rs | 361 ++++++++++++------ .../packages/homepage/chat/src/replication.rs | 29 +- .../packages/homepage/chat/src/search.rs | 9 +- .../packages/homepage/chat/src/types/api.rs | 3 +- .../packages/homepage/chat/src/types/state.rs | 124 +++--- hyperdrive/packages/homepage/chat/src/ws.rs | 55 ++- 7 files changed, 382 insertions(+), 218 deletions(-) diff --git a/hyperdrive/packages/homepage/chat/src/groups.rs b/hyperdrive/packages/homepage/chat/src/groups.rs index 753c2fd44..1b5ae9e2c 100644 --- a/hyperdrive/packages/homepage/chat/src/groups.rs +++ b/hyperdrive/packages/homepage/chat/src/groups.rs @@ -552,17 +552,14 @@ impl ChatState { membership_proposal_key(group_id, &candidate, MembershipActionKind::Invite); group.membership_proposals.remove(&invite_proposal_id); - let entry = group - .members - .entry(candidate.clone()) - .or_insert_with(|| { - GroupMember::new( - candidate.clone(), - default_role_id.clone(), - MembershipStatus::Active, - now, - ) - }); + let entry = group.members.entry(candidate.clone()).or_insert_with(|| { + GroupMember::new( + candidate.clone(), + default_role_id.clone(), + MembershipStatus::Active, + now, + ) + }); entry.role_id = default_role_id; entry.status = MembershipStatus::Active; entry.last_activity = now; diff --git a/hyperdrive/packages/homepage/chat/src/lib.rs b/hyperdrive/packages/homepage/chat/src/lib.rs index 059085798..08a2fd87f 100644 --- a/hyperdrive/packages/homepage/chat/src/lib.rs +++ b/hyperdrive/packages/homepage/chat/src/lib.rs @@ -2,17 +2,19 @@ // A mobile-first chat application for the Hyperware platform // Supporting 1:1 DMs, Group chats (TODO), and Voice calls (TODO) +use base64::{engine::general_purpose, Engine as _}; use flate2::read::GzDecoder; use flate2::write::GzEncoder; use flate2::Compression; use futures::{channel::mpsc::UnboundedReceiver, pin_mut, select, FutureExt, StreamExt}; use hyperapp_macro::*; use hyperware_crdt::yrs::{Decode, Encode, StateVector}; -use base64::{engine::general_purpose, Engine as _}; use hyperware_process_lib::{ homepage::add_to_homepage, http::server::WsMessageType, - hyperapp::{self, get_path, send, set_response_status, sleep, source, spawn, AppSendError, SaveOptions}, + hyperapp::{ + self, get_path, send, set_response_status, sleep, source, spawn, AppSendError, SaveOptions, + }, our, vfs, Address, Capability, LazyLoadBlob, ProcessId, Request, Request as ProcessRequest, }; use std::cmp::Ordering; @@ -24,7 +26,6 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; // Import generated RPC functions from caller-utils -use homepage_caller_utils as chat_caller_utils; use chat_caller_utils::chat::{ receive_chat_creation_remote_rpc, receive_message_ack_remote_rpc, receive_message_deletion_remote_rpc, receive_message_edit_remote_rpc, @@ -33,6 +34,7 @@ use chat_caller_utils::chat::{ }; use chat_caller_utils::ChatMessage as CUChatMessage; use chat_caller_utils::UserProfile as CUUserProfile; +use homepage_caller_utils as chat_caller_utils; mod crdt; mod groups; @@ -64,7 +66,6 @@ const CONTACTS_FIELD_BASE_ADDRESS: &str = "base_address"; const REPL_RPC_TIMEOUT_SECS: u64 = 2; const ICON: &str = include_str!("./icon"); - // Helper function to enforce one-way status transitions fn safe_update_message_status(current: &MessageStatus, new: MessageStatus) -> MessageStatus { use MessageStatus::*; @@ -99,7 +100,8 @@ fn safe_update_message_status(current: &MessageStatus, new: MessageStatus) -> Me _ => { log_debug!( "WARNING: Attempted invalid status transition from {:?} to {:?}", - current, new + current, + new ); current.clone() } @@ -206,11 +208,16 @@ pub(crate) fn log_crdt_event( match update_len { Some(len) => log_debug!( "[CRDT][{}] context={} state_vector_len={} update_bytes={}", - doc_id, context, sv_len, len + doc_id, + context, + sv_len, + len ), None => log_debug!( "[CRDT][{}] context={} state_vector_len={}", - doc_id, context, sv_len + doc_id, + context, + sv_len ), } } @@ -302,7 +309,8 @@ async fn send_push_notification_for_message(sender: &str, content: &str, chat_id .expects_response(5); log_debug!( "[NOTIFY] removing invalid endpoint {} via {}", - endpoint, notifications_address + endpoint, + notifications_address ); // Fire and forget the removal request @@ -353,7 +361,9 @@ async fn send_push_notification_for_group_message( group_name: &str, ) { if cfg!(feature = "disable-notifications") { - log_debug!("[NOTIFY] skipping group push notification (disable-notifications feature enabled)"); + log_debug!( + "[NOTIFY] skipping group push notification (disable-notifications feature enabled)" + ); return; } let notify_started = Instant::now(); @@ -799,10 +809,7 @@ impl ChatState { #[local] #[http] - async fn update_chat_settings( - &mut self, - req: UpdateChatSettingsReq, - ) -> Result { + async fn update_chat_settings(&mut self, req: UpdateChatSettingsReq) -> Result { let chat = self .chats .get_mut(&req.chat_id) @@ -853,7 +860,11 @@ impl ChatState { member_count: group.members.len(), thread_count: group.threads.len(), unread_count: self.group_unread.get(&req.group_id).copied().unwrap_or(0), - notify: self.group_notify.get(&req.group_id).copied().unwrap_or(true), + notify: self + .group_notify + .get(&req.group_id) + .copied() + .unwrap_or(true), }) } @@ -1119,7 +1130,10 @@ impl ChatState { sender: our().node.clone(), content: format!( "{} sent {} {} to {}", - our().node, amount, coin_name, to_address + our().node, + amount, + coin_name, + to_address ), timestamp, sequence: None, @@ -1138,7 +1152,8 @@ impl ChatState { }), }; - let (counterparty, stored_message) = self.stage_outgoing_message(&req.chat_id, message, None); + let (counterparty, stored_message) = + self.stage_outgoing_message(&req.chat_id, message, None); self.dispatch_outgoing_message(counterparty, stored_message.clone()); Ok(self @@ -1198,7 +1213,8 @@ impl ChatState { Ok(Ok(())) => {} Ok(Err(err)) => log_debug!( "Counterparty {} rejected message edit: {}", - counterparty, err + counterparty, + err ), Err(err) => { log_debug!("Failed to send message edit to {}: {:?}", counterparty, err) @@ -1456,7 +1472,9 @@ impl ChatState { let target = Address::new(&target_node, OUR_PROCESS_ID.clone()); match receive_reaction_remove_remote_rpc(&target, msg_id, emoji, user).await { Ok(_) => log_debug!("Successfully sent reaction removal to counterparty"), - Err(e) => log_debug!("Failed to send reaction removal to counterparty: {:?}", e), + Err(e) => { + log_debug!("Failed to send reaction removal to counterparty: {:?}", e) + } } }); @@ -1539,26 +1557,19 @@ impl ChatState { async fn update_profile(&mut self, mut profile: UserProfile) -> Result { let nickname = profile.name.trim(); if nickname.is_empty() { - profile.name = our() - .node - .split('.') - .next() - .unwrap_or("User") - .to_string(); + profile.name = our().node.split('.').next().unwrap_or("User").to_string(); } else { profile.name = nickname.to_string(); } - profile.base_address = profile - .base_address - .and_then(|addr| { - let trimmed = addr.trim().to_string(); - if trimmed.is_empty() { - None - } else { - Some(trimmed) - } - }); + profile.base_address = profile.base_address.and_then(|addr| { + let trimmed = addr.trim().to_string(); + if trimmed.is_empty() { + None + } else { + Some(trimmed) + } + }); if let Some(address) = profile.base_address.as_ref() { if !Self::is_valid_evm_address(address) { @@ -1644,7 +1655,8 @@ impl ChatState { Ok(_) => log_debug!("Notified {} about profile pic update", counterparty), Err(e) => log_debug!( "Failed to notify {} about profile pic update: {:?}", - counterparty, e + counterparty, + e ), } }); @@ -1967,7 +1979,9 @@ impl ChatState { // #[remote] #[http(method = "POST", path = "/api/download-group-file")] async fn download_group_file(&mut self, req: DownloadGroupFileReq) -> Result, String> { - if req.group_id.contains('/') || req.group_id.contains("..") || req.attachment_id.contains('/') + if req.group_id.contains('/') + || req.group_id.contains("..") + || req.attachment_id.contains('/') { set_response_status(hyperware_process_lib::http::StatusCode::BAD_REQUEST); return Err("Invalid file path".to_string()); @@ -2047,7 +2061,10 @@ impl ChatState { self.require_subscriber_access(&req.group_id, &caller) .map_err(|err| format!("unauthorized: {}", err))?; - if req.group_id.contains('/') || req.group_id.contains("..") || req.attachment_id.contains('/') { + if req.group_id.contains('/') + || req.group_id.contains("..") + || req.attachment_id.contains('/') + { return Err("Invalid file path".to_string()); } @@ -2066,11 +2083,10 @@ impl ChatState { // #[remote] #[http] async fn send_voice_note(&mut self, req: SendVoiceNoteReq) -> Result { - let audio_bytes = base64_decode(&req.audio_data) - .map_err(|e| { - set_response_status(hyperware_process_lib::http::StatusCode::BAD_REQUEST); - format!("Failed to decode base64: {}", e) - })?; + let audio_bytes = base64_decode(&req.audio_data).map_err(|e| { + set_response_status(hyperware_process_lib::http::StatusCode::BAD_REQUEST); + format!("Failed to decode base64: {}", e) + })?; let audio_size_mb = (audio_bytes.len() as u64) / (1024 * 1024); if audio_size_mb > self.settings.max_file_size_mb { set_response_status(hyperware_process_lib::http::StatusCode::PAYLOAD_TOO_LARGE); @@ -2128,7 +2144,8 @@ impl ChatState { if counterparty != caller_node { log_debug!( "[SEC] receive_chat_creation rejected spoofed counterparty={} source={}", - counterparty, caller_node + counterparty, + caller_node ); return Err("receive_chat_creation rejected spoofed counterparty".to_string()); } @@ -2212,7 +2229,8 @@ impl ChatState { if message.sender != caller_node { log_debug!( "[SEC] receive_message rejected spoofed sender={} source={}", - message.sender, caller_node + message.sender, + caller_node ); return Err("receive_message rejected spoofed sender".to_string()); } @@ -2303,7 +2321,8 @@ impl ChatState { let _ = file.write(&file_data); log_debug!( "Saved received file {} to VFS at {}", - file_info.filename, vfs_path + file_info.filename, + vfs_path ); // For images, keep the data URL for inline display @@ -2447,7 +2466,8 @@ impl ChatState { if user != caller_node { log_debug!( "[SEC] receive_reaction rejected spoofed user={} source={}", - user, caller_node + user, + caller_node ); return Err("receive_reaction rejected spoofed user".to_string()); } @@ -2455,7 +2475,9 @@ impl ChatState { } log_debug!( "Received reaction {} from {} for message {}", - emoji, user, message_id + emoji, + user, + message_id ); let timestamp = std::time::SystemTime::now() @@ -2525,7 +2547,8 @@ impl ChatState { if user != caller_node { log_debug!( "[SEC] receive_reaction_remove rejected spoofed user={} source={}", - user, caller_node + user, + caller_node ); return Err("receive_reaction_remove rejected spoofed user".to_string()); } @@ -2533,7 +2556,9 @@ impl ChatState { } log_debug!( "Received reaction removal {} from {} for message {}", - emoji, user, message_id + emoji, + user, + message_id ); let mut update: Option = None; @@ -2588,7 +2613,9 @@ impl ChatState { if chat_id != expected_chat_id { log_debug!( "[SEC] receive_message_edit rejected spoofed chat_id={} expected={} source={}", - chat_id, expected_chat_id, caller_node + chat_id, + expected_chat_id, + caller_node ); return Err("receive_message_edit rejected spoofed chat_id".to_string()); } @@ -2601,7 +2628,8 @@ impl ChatState { if !is_local_call && message.sender != caller_node { log_debug!( "[SEC] receive_message_edit rejected edit from {} for message sent by {}", - caller_node, message.sender + caller_node, + message.sender ); return Err("receive_message_edit rejected unauthorized edit".to_string()); } @@ -2623,7 +2651,8 @@ impl ChatState { } else { log_debug!( "receive_message_edit: message {} in chat {} not found; dropping edit", - message_id, chat_id + message_id, + chat_id ); } @@ -2699,7 +2728,8 @@ impl ChatState { } log_debug!( "Received deletion request for message {} in chat {}", - message_id, chat_id + message_id, + chat_id ); let mut chat_update: Option = None; @@ -2715,7 +2745,9 @@ impl ChatState { return Err("receive_message_deletion rejected unauthorized delete".to_string()); } if chat.messages[pos].message_type == MessageType::Payment { - return Err("receive_message_deletion rejected payment message delete".to_string()); + return Err( + "receive_message_deletion rejected payment message delete".to_string() + ); } chat.messages.remove(pos); needs_rebuild = true; @@ -2747,7 +2779,8 @@ impl ChatState { if node != caller_node { log_debug!( "[SEC] receive_profile_update rejected spoofed node={} source={}", - node, caller_node + node, + caller_node ); return Err("receive_profile_update rejected spoofed node".to_string()); } @@ -2877,9 +2910,7 @@ impl ChatState { #[http] async fn search_index(&self, req: SearchIndexReq) -> Result { - let results = self - .search_index - .search(&req.query, req.scope, req.limit); + let results = self.search_index.search(&req.query, req.scope, req.limit); Ok(SearchIndexRes { results }) } @@ -3128,7 +3159,8 @@ impl ChatState { async fn push_snapshot_to_peer(&mut self, req: PushSnapshotToPeerReq) -> Result<(), String> { log_debug!( "[REPL][{}] push_snapshot_to_peer invoked peer={}", - req.group_id, req.peer + req.group_id, + req.peer ); let task = ReplicationTask { group_id: req.group_id, @@ -3284,32 +3316,47 @@ impl ChatState { // SPIDER INTEGRATION #[http] - async fn spider_connect(&mut self, force_new: Option) -> Result { + async fn spider_connect( + &mut self, + force_new: Option, + ) -> Result { const SPIDER_PROCESS_ID: (&str, &str, &str) = ("spider", "spider", "sys"); let should_force = force_new.unwrap_or(false); - log_debug!("[SPIDER] spider_connect called, force_new={:?}, should_force={}", force_new, should_force); - log_debug!("[SPIDER] cached key exists: {}", self.spider_api_key.is_some()); + log_debug!( + "[SPIDER] spider_connect called, force_new={:?}, should_force={}", + force_new, + should_force + ); + log_debug!( + "[SPIDER] cached key exists: {}", + self.spider_api_key.is_some() + ); if !should_force { if let Some(existing) = self.spider_api_key.clone() { - log_debug!("[SPIDER] Validating cached key: {}...", &existing[..8.min(existing.len())]); + log_debug!( + "[SPIDER] Validating cached key: {}...", + &existing[..8.min(existing.len())] + ); // Validate the cached key before returning it if self.validate_spider_key(&existing).await { log_debug!("[SPIDER] Cached key is valid, returning it"); - return Ok(SpiderConnectResult { - api_key: existing, - }); + return Ok(SpiderConnectResult { api_key: existing }); } log_debug!("[SPIDER] cached spider API key is invalid, creating new one"); } } // Always use a unique name to ensure Spider creates a fresh key - let key_name = format!("homepage-{}-{}", our().node.clone(), std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis()); + let key_name = format!( + "homepage-{}-{}", + our().node.clone(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + ); log_debug!("[SPIDER] Creating new key with name: {}", key_name); @@ -3328,16 +3375,18 @@ impl ChatState { ) .expects_response(5); - let parsed: Result = hyperapp::send(request) - .await - .map_err(|err| { + let parsed: Result = + hyperapp::send(request).await.map_err(|err| { log_debug!("[SPIDER] Failed to contact spider: {}", err); format!("failed to contact spider: {err}") })?; match parsed { Ok(key) => { - log_debug!("[SPIDER] Successfully created key: {}...", &key.key[..8.min(key.key.len())]); + log_debug!( + "[SPIDER] Successfully created key: {}...", + &key.key[..8.min(key.key.len())] + ); self.spider_api_key = Some(key.key.clone()); Ok(SpiderConnectResult { api_key: key.key }) } @@ -3362,15 +3411,23 @@ impl ChatState { .expects_response(2); let ping_result = hyperapp::send::(request).await; let available = ping_result.is_ok(); - log_debug!("[SPIDER] Ping result: {:?}, available: {}", ping_result, available); + log_debug!( + "[SPIDER] Ping result: {:?}, available: {}", + ping_result, + available + ); let status = SpiderStatusInfo { connected: self.spider_api_key.is_some() && available, has_api_key: self.spider_api_key.is_some(), spider_available: available, }; - log_debug!("[SPIDER] Returning status: connected={}, has_api_key={}, spider_available={}", - status.connected, status.has_api_key, status.spider_available); + log_debug!( + "[SPIDER] Returning status: connected={}, has_api_key={}, spider_available={}", + status.connected, + status.has_api_key, + status.spider_available + ); Ok(status) } @@ -3416,7 +3473,8 @@ impl ChatState { Ok(msg) => { log_debug!( "WebSocket: Received message from channel {}: {:?}", - channel_id, msg + channel_id, + msg ); // Initialize connection if not already present if !self.ws_connections.contains_key(&channel_id) @@ -3644,14 +3702,16 @@ impl ChatState { Ok(_) => { log_debug!( "Message {} sent successfully to {}", - message_id_clone, counterparty + message_id_clone, + counterparty ); // Counterparty will send ACK on success. } Err(_) => { log_debug!( "Failed to send message {} to {}, adding to delivery queue", - message_id_clone, counterparty + message_id_clone, + counterparty ); ChatState::enqueue_delivery_message_inner( &delivery_tx, @@ -3776,7 +3836,9 @@ impl ChatState { if let Err(err) = self.require_hub_access(&task.group_id, &our().node) { log_debug!( "[REPL][{}] skip push to {} (local hub publish denied): {}", - task.group_id, task.peer, err + task.group_id, + task.peer, + err ); self.replication_metrics.acl_skips = self.replication_metrics.acl_skips.saturating_add(1); @@ -3787,7 +3849,9 @@ impl ChatState { { log_debug!( "[REPL][{}] skip push to subscriber {} (local publish denied): {}", - task.group_id, task.peer, err + task.group_id, + task.peer, + err ); self.replication_metrics.acl_skips = self.replication_metrics.acl_skips.saturating_add(1); @@ -3807,7 +3871,9 @@ impl ChatState { Err(err) => { log_debug!( "[REPL][{}] cannot load doc for {}: {:?}", - task.group_id, task.peer, err + task.group_id, + task.peer, + err ); self.schedule_backoff(task, now); return; @@ -3859,7 +3925,10 @@ impl ChatState { .expects_response(REPL_RPC_TIMEOUT_SECS); log_debug!( "[REPL][{}] push kind={:?} peer={} target={:?}", - task.group_id, task.kind, task.peer, target + task.group_id, + task.kind, + task.peer, + target ); let rpc_started = Instant::now(); match send::(req).await { @@ -3907,7 +3976,9 @@ impl ChatState { } else { log_debug!( "[REPL][{}] failed to decode apply response from {}: {:?}", - task.group_id, task.peer, val + task.group_id, + task.peer, + val ); self.schedule_backoff(task, now); } @@ -3915,7 +3986,11 @@ impl ChatState { Err(AppSendError::SendError(send_err)) => { log_debug!( "[REPL][{}] push to {} send error: {:?} (kind={:?} target={:?})", - task.group_id, task.peer, send_err, task.kind, target + task.group_id, + task.peer, + send_err, + task.kind, + target ); log_debug!( "[REPL_DIAG][{}] push send_err after_ms={} kind={:?} peer={}", @@ -3929,7 +4004,11 @@ impl ChatState { Err(AppSendError::BuildError(build_err)) => { log_debug!( "[REPL][{}] push to {} build error: {:?} (kind={:?} target={:?})", - task.group_id, task.peer, build_err, task.kind, target + task.group_id, + task.peer, + build_err, + task.kind, + target ); log_debug!( "[REPL_DIAG][{}] push build_err after_ms={} kind={:?} peer={}", @@ -3956,7 +4035,9 @@ impl ChatState { ) { log_debug!( "[REPL][{}] failed to apply snapshot from {}: {}", - task.group_id, task.peer, err + task.group_id, + task.peer, + err ); self.schedule_backoff(task, now); } else if self.local_group_acl_ready(&task.group_id) { @@ -3987,7 +4068,9 @@ impl ChatState { ) { log_debug!( "[REPL][{}] failed to apply delta from {}: {}", - task.group_id, task.peer, err + task.group_id, + task.peer, + err ); self.schedule_backoff(task, now); } @@ -4005,7 +4088,11 @@ impl ChatState { self.replication_metrics.retries = self.replication_metrics.retries.saturating_add(1); log_debug!( "[REPL][{}] backoff {:?} to {} (attempt {} delay={}s)", - task.group_id, task.kind, task.peer, task.attempt, delay + task.group_id, + task.kind, + task.peer, + task.attempt, + delay ); self.enqueue_replication_task(task); } @@ -4075,7 +4162,9 @@ impl ChatState { .expects_response(REPL_RPC_TIMEOUT_SECS); log_debug!( "[REPL][{}] fetch_update_from_peer peer={} target={}", - group_id, peer, target + group_id, + peer, + target ); let req_started = Instant::now(); @@ -4097,7 +4186,9 @@ impl ChatState { } else { log_debug!( "[REPL][{}] failed to decode delta from {} body={:?}", - group_id, peer, val + group_id, + peer, + val ); None } @@ -4105,7 +4196,9 @@ impl ChatState { Err(AppSendError::SendError(err)) => { log_debug!( "[REPL][{}] failed to fetch delta from {} send_err={:?}", - group_id, peer, err + group_id, + peer, + err ); log_debug!( "[REPL_DIAG][{}] fetch_update_from_peer err peer={} elapsed_ms={}", @@ -4118,7 +4211,9 @@ impl ChatState { Err(AppSendError::BuildError(build_err)) => { log_debug!( "[REPL][{}] failed to build delta request to {}: {:?}", - group_id, peer, build_err + group_id, + peer, + build_err ); None } @@ -4142,7 +4237,9 @@ impl ChatState { .expects_response(REPL_RPC_TIMEOUT_SECS); log_debug!( "[REPL][{}] fetch_snapshot_from_peer peer={} target={}", - group_id, peer, target + group_id, + peer, + target ); let req_started = Instant::now(); @@ -4164,7 +4261,9 @@ impl ChatState { } else { log_debug!( "[REPL][{}] failed to decode snapshot from {} body={:?}", - group_id, peer, val + group_id, + peer, + val ); None } @@ -4172,7 +4271,9 @@ impl ChatState { Err(AppSendError::SendError(err)) => { log_debug!( "[REPL][{}] failed to fetch snapshot from {} send_err={:?}", - group_id, peer, err + group_id, + peer, + err ); log_debug!( "[REPL_DIAG][{}] fetch_snapshot_from_peer err peer={} elapsed_ms={}", @@ -4185,7 +4286,9 @@ impl ChatState { Err(AppSendError::BuildError(build_err)) => { log_debug!( "[REPL][{}] failed to build snapshot request to {}: {:?}", - group_id, peer, build_err + group_id, + peer, + build_err ); None } @@ -4222,7 +4325,9 @@ impl ChatState { if in_acl != local_version { log_debug!( "[CRDT][{}] ACL version drift: incoming={} local={}", - group_id, in_acl, local_version + group_id, + in_acl, + local_version ); } } @@ -4233,7 +4338,8 @@ impl ChatState { if update_bytes.is_empty() { log_debug!( "[CRDT][{}] context={} received EMPTY update payload", - group_id, context + group_id, + context ); } let was_missing = !self.groups.contains_key(group_id); @@ -4248,7 +4354,8 @@ impl ChatState { if update_bytes.is_empty() && (was_missing || self.group_needs_bootstrap(group_id)) { log_debug!( "[CRDT][{}] context={} skipping empty update during bootstrap", - group_id, context + group_id, + context ); return Ok(()); } @@ -4263,7 +4370,9 @@ impl ChatState { // Allow membership bootstrap/update to proceed even if we're not yet whitelisted. log_debug!( "[CRDT][{}] bypassing ACL for local_status={:?} context={}", - group_id, local_status, context + group_id, + local_status, + context ); enforce_acl = false; } @@ -4403,10 +4512,7 @@ impl ChatState { global_notify_enabled, active_connection_count ); - if global_notify_enabled - && group_notify_enabled - && active_connection_count == 0 - { + if global_notify_enabled && group_notify_enabled && active_connection_count == 0 { // Only notify for the most recent message to avoid spam if let Some(latest) = new_messages.iter().max_by_key(|m| m.timestamp) { let sender = latest.sender.clone(); @@ -4414,8 +4520,10 @@ impl ChatState { let gid = group_id.clone(); let gname = group_name.clone(); spawn(async move { - send_push_notification_for_group_message(&sender, &content, &gid, &gname) - .await; + send_push_notification_for_group_message( + &sender, &content, &gid, &gname, + ) + .await; }); } } else { @@ -4509,8 +4617,10 @@ impl ChatState { let inferred_counterparty = Self::infer_counterparty_from_chat_id(chat_id, &our().node); - let has_messages_from_counterparty = - chat.messages.iter().any(|message| message.sender == counterparty_node); + let has_messages_from_counterparty = chat + .messages + .iter() + .any(|message| message.sender == counterparty_node); if chat_id == &canonical_chat_id || chat.counterparty == counterparty_node @@ -4607,7 +4717,11 @@ impl ChatState { merged_chat.messages.sort_by(|a, b| { a.timestamp .cmp(&b.timestamp) - .then_with(|| a.sequence.unwrap_or(u64::MAX).cmp(&b.sequence.unwrap_or(u64::MAX))) + .then_with(|| { + a.sequence + .unwrap_or(u64::MAX) + .cmp(&b.sequence.unwrap_or(u64::MAX)) + }) .then_with(|| a.id.cmp(&b.id)) }); @@ -4922,7 +5036,8 @@ impl ChatState { { log_debug!( "Failed to enqueue message during flush for {}: {:?}", - queued.node, err + queued.node, + err ); break; } @@ -4950,7 +5065,9 @@ impl ChatState { Err(err) => { log_debug!( "Failed to deliver message {} to {}: {:?}", - message.id, node, err + message.id, + node, + err ); let retry_tx = delivery_tx.clone(); let retry_pending = pending_deliveries.clone(); @@ -4966,7 +5083,9 @@ impl ChatState { )) { log_debug!( "Failed to requeue message {} for {}: {:?}", - message.id, retry_node, send_err + message.id, + retry_node, + send_err ); } } @@ -4991,7 +5110,10 @@ impl ChatState { async fn validate_spider_key(&self, api_key: &str) -> bool { const SPIDER_PROCESS_ID: (&str, &str, &str) = ("spider", "spider", "sys"); - log_debug!("[SPIDER] validate_spider_key called for key: {}...", &api_key[..8.min(api_key.len())]); + log_debug!( + "[SPIDER] validate_spider_key called for key: {}...", + &api_key[..8.min(api_key.len())] + ); let body = serde_json::json!({ "ListMcpServers": { @@ -5017,8 +5139,13 @@ impl ChatState { // Check if response is an error if let Some(err) = json_body.get("Err") { let err_str = err.as_str().unwrap_or(""); - let is_valid = !err_str.contains("Unauthorized") && !err_str.contains("Invalid API key"); - log_debug!("[SPIDER] Validation response has Err: {}, is_valid: {}", err_str, is_valid); + let is_valid = + !err_str.contains("Unauthorized") && !err_str.contains("Invalid API key"); + log_debug!( + "[SPIDER] Validation response has Err: {}, is_valid: {}", + err_str, + is_valid + ); // If unauthorized or invalid key, return false is_valid } else { diff --git a/hyperdrive/packages/homepage/chat/src/replication.rs b/hyperdrive/packages/homepage/chat/src/replication.rs index c19809ed6..20a3f3533 100644 --- a/hyperdrive/packages/homepage/chat/src/replication.rs +++ b/hyperdrive/packages/homepage/chat/src/replication.rs @@ -351,7 +351,9 @@ impl ChatState { if let Err(err) = self.apply_broker_envelope(&topic, &env, now) { crate::log_debug!( "[BROKER] topic={} offset={} apply error: {}", - topic, env.offset, err + topic, + env.offset, + err ); continue; } @@ -489,7 +491,10 @@ impl ChatState { if age > SUBSCRIBER_ACK_DEADLINE_SECS { crate::log_debug!( "[BROKER][{}] delivery lag {}s topic={} offset={}", - group_id, age, topic, env.offset + group_id, + age, + topic, + env.offset ); } if is_subscriber_topic { @@ -498,7 +503,10 @@ impl ChatState { self.replication_metrics.drops = self.replication_metrics.drops.saturating_add(1); crate::log_debug!( "[BROKER][{}] drop stale subscriber envelope topic={} offset={} age={}s", - group_id, topic, env.offset, age + group_id, + topic, + env.offset, + age ); return Ok(()); } @@ -506,7 +514,9 @@ impl ChatState { self.replication_metrics.drops = self.replication_metrics.drops.saturating_add(1); crate::log_debug!( "[BROKER][{}] drop duplicate subscriber envelope topic={} offset={}", - group_id, topic, env.offset + group_id, + topic, + env.offset ); return Ok(()); } @@ -521,7 +531,10 @@ impl ChatState { if local != in_acl { crate::log_debug!( "[BROKER][{}] ACL drift topic {} incoming={} local={}", - group_id, topic, in_acl, local + group_id, + topic, + in_acl, + local ); self.replication_metrics.acl_drifts = self.replication_metrics.acl_drifts.saturating_add(1); @@ -648,7 +661,11 @@ mod tests { state.enqueue_stale_subscriber_replays_for_node(now, "local.node"); - let peers: Vec = state.replication_queue.iter().map(|t| t.peer.clone()).collect(); + let peers: Vec = state + .replication_queue + .iter() + .map(|t| t.peer.clone()) + .collect(); assert!(peers.contains(&sub_node)); assert!(!peers.contains(&hub_node)); } diff --git a/hyperdrive/packages/homepage/chat/src/search.rs b/hyperdrive/packages/homepage/chat/src/search.rs index ae99ae064..14edfef83 100644 --- a/hyperdrive/packages/homepage/chat/src/search.rs +++ b/hyperdrive/packages/homepage/chat/src/search.rs @@ -161,8 +161,7 @@ impl SearchIndex { } if !doc_ids.is_empty() { - self.group_doc_ids - .insert(group_id.to_string(), doc_ids); + self.group_doc_ids.insert(group_id.to_string(), doc_ids); } } @@ -368,7 +367,11 @@ fn build_group_summary_doc(group_id: &GroupId, group: &Group) -> Option Deserialize<'de> for ChatState { group_unread, group_notify, node_profiles, - ) = - match ChatStateCompat::deserialize(deserializer)? { - ChatStateCompat::V2(data) => ( - data.profile, - data.chats, - data.chat_keys, - data.group_join_keys, - data.settings, - data.spider_api_key, - data.spider_history, - data.message_sequence_counters, - data.groups, - data.group_unread, - data.group_notify, - data.node_profiles, - ), - ChatStateCompat::V2Legacy(data) => ( - data.profile, - data.chats, - data.chat_keys, - HashMap::new(), - data.settings, - None, - Vec::new(), - data.message_sequence_counters, - data.groups, - data.group_unread, - data.group_notify, - data.node_profiles, - ), - ChatStateCompat::V1(data) => ( - data.profile, - data.chats, - data.chat_keys, - HashMap::new(), - data.settings, - None, - Vec::new(), - HashMap::new(), - HashMap::new(), - HashMap::new(), - HashMap::new(), - data.node_profiles, - ), - }; + ) = match ChatStateCompat::deserialize(deserializer)? { + ChatStateCompat::V2(data) => ( + data.profile, + data.chats, + data.chat_keys, + data.group_join_keys, + data.settings, + data.spider_api_key, + data.spider_history, + data.message_sequence_counters, + data.groups, + data.group_unread, + data.group_notify, + data.node_profiles, + ), + ChatStateCompat::V2Legacy(data) => ( + data.profile, + data.chats, + data.chat_keys, + HashMap::new(), + data.settings, + None, + Vec::new(), + data.message_sequence_counters, + data.groups, + data.group_unread, + data.group_notify, + data.node_profiles, + ), + ChatStateCompat::V1(data) => ( + data.profile, + data.chats, + data.chat_keys, + HashMap::new(), + data.settings, + None, + Vec::new(), + HashMap::new(), + HashMap::new(), + HashMap::new(), + HashMap::new(), + data.node_profiles, + ), + }; let (delivery_tx, delivery_rx) = DeliveryTx::new(); let (replication_tx, replication_rx) = ReplicationTx::new(); let (replication_wake_tx, replication_wake_rx) = ReplicationWakeTx::new(); @@ -734,8 +737,7 @@ impl ChatState { pub fn rebuild_group_search(&mut self, group_id: &GroupId) { let our_node = our().node.clone(); if let Some(group) = self.groups.get(group_id) { - self.search_index - .rebuild_group(group_id, group, &our_node); + self.search_index.rebuild_group(group_id, group, &our_node); } else { self.search_index.remove_group(group_id); } @@ -880,7 +882,9 @@ impl ChatState { if let Err(err) = self.require_hub_access(group_id, &local_node) { crate::log_debug!( "[CRDT][{}] skip publish: node {} lacks hub access ({})", - group_id, local_node, err + group_id, + local_node, + err ); self.replication_metrics.acl_skips = self.replication_metrics.acl_skips.saturating_add(1); @@ -1071,7 +1075,9 @@ impl ChatState { if let Err(err) = self.commit_group_crdt(group_id) { crate::log_debug!( "Failed to commit group CRDT state (group={} context={}): {:?}", - group_id, context, err + group_id, + context, + err ); } } diff --git a/hyperdrive/packages/homepage/chat/src/ws.rs b/hyperdrive/packages/homepage/chat/src/ws.rs index dcaba638a..a77c2ad8b 100644 --- a/hyperdrive/packages/homepage/chat/src/ws.rs +++ b/hyperdrive/packages/homepage/chat/src/ws.rs @@ -180,21 +180,18 @@ impl ChatState { } }; - let request = Request::to(Address::new( - "our", - ("http-server", "distro", "sys"), - )) - .body( - serde_json::to_vec(&HttpServerRequest::WebSocketPush { - channel_id, - message_type: WsMessageType::Text, - }) - .unwrap(), - ) - .blob(LazyLoadBlob { - mime: Some("application/json".to_string()), - bytes, - }); + let request = Request::to(Address::new("our", ("http-server", "distro", "sys"))) + .body( + serde_json::to_vec(&HttpServerRequest::WebSocketPush { + channel_id, + message_type: WsMessageType::Text, + }) + .unwrap(), + ) + .blob(LazyLoadBlob { + mime: Some("application/json".to_string()), + bytes, + }); // Send and await response to detect stale channels match request.send_and_await_response(2) { @@ -202,18 +199,29 @@ impl ChatState { // Check if response body contains "WsChannelNotFound" if let Ok(body_str) = String::from_utf8(response.body().to_vec()) { if body_str.contains("WsChannelNotFound") { - crate::log_debug!("[WS_DEBUG] Channel {} not found, marking for removal", channel_id); + crate::log_debug!( + "[WS_DEBUG] Channel {} not found, marking for removal", + channel_id + ); return false; } } true } Ok(Err(send_err)) => { - crate::log_debug!("[WS_DEBUG] Send error for channel {}: {:?}", channel_id, send_err); + crate::log_debug!( + "[WS_DEBUG] Send error for channel {}: {:?}", + channel_id, + send_err + ); false } Err(err) => { - crate::log_debug!("[WS_DEBUG] Failed to push to channel {}: {:?}", channel_id, err); + crate::log_debug!( + "[WS_DEBUG] Failed to push to channel {}: {:?}", + channel_id, + err + ); false } } @@ -222,7 +230,11 @@ impl ChatState { /// Broadcast a message to all WebSocket connections, removing stale channels. pub(crate) fn broadcast_ws_message(&mut self, message: &WsServerMessage) { let channels: Vec = self.ws_connections.keys().cloned().collect(); - crate::log_debug!("[WS_DEBUG] broadcast_ws_message: ws_connections has {} channels: {:?}", channels.len(), channels); + crate::log_debug!( + "[WS_DEBUG] broadcast_ws_message: ws_connections has {} channels: {:?}", + channels.len(), + channels + ); let mut stale_channels = Vec::new(); for channel_id in channels { @@ -233,7 +245,10 @@ impl ChatState { // Clean up stale channels for channel_id in stale_channels { - crate::log_debug!("[WS_DEBUG] Removing stale channel {} from ws_connections", channel_id); + crate::log_debug!( + "[WS_DEBUG] Removing stale channel {} from ws_connections", + channel_id + ); self.ws_connections.remove(&channel_id); self.browser_connections.retain(|_, &mut v| v != channel_id); self.active_connections.remove(&channel_id);