From 6b0e42ee18560664c9970a24b7470df8ccb405c0 Mon Sep 17 00:00:00 2001 From: Michal Pokrywka Date: Wed, 5 Nov 2025 15:53:27 +0100 Subject: [PATCH] WStompEvent instead of Result --- README.md | 18 +++++++++--------- src/client.rs | 10 +++++----- src/connect.rs | 1 - src/lib.rs | 4 ++-- src/stomp_handler.rs | 25 +++++++++++-------------- src/{error.rs => wstomp_event.rs} | 24 ++++++++++++++++++++++-- 6 files changed, 49 insertions(+), 33 deletions(-) rename src/{error.rs => wstomp_event.rs} (77%) diff --git a/README.md b/README.md index 88500cb..080fb77 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ Here is a basic example of connecting, subscribing to a topic, and receiving mes use wstomp::{ connect_with_pass, stomp::{FromServer, Message, ToServer, client::Subscriber}, - WStompError, + WStompEvent, WStompError, }; #[actix_rt::main] @@ -72,10 +72,10 @@ async fn main() -> Result<(), Box> { println!("Subscribed! Waiting for messages..."); // 4. Listen for incoming messages - loop { - match client.recv().await { + while let Some(event) = client.recv().await { + match event { // Receive messages from the server - Some(Ok(msg)) => { + WStompEvent::Message(msg) => { match msg.content { FromServer::Message { destination, @@ -109,9 +109,10 @@ async fn main() -> Result<(), Box> { other => println!("Received other frame: {:?}", other), } } + WStompEvent::WebsocketClosed(reason) => break, // Handle errors - Some(Err(e)) => { - match e { + WStompEvent::Error(err) => { + match err { WStompError::IncompleteStompFrame => { // This is a warning, you can choose to ignore it or log it eprintln!("Warning: Dropped incomplete STOMP frame."); @@ -123,8 +124,6 @@ async fn main() -> Result<(), Box> { } } } - // Exit loop if channel closes - None => break, } } @@ -181,9 +180,10 @@ async fn main() { The connection functions (`connect`, `connect_ssl`, etc.) return a `Result`. -Once connected, the `WStompClient::rx` channel produces `Result, WStompError>` items. You should check for errors in your receive loop. +Once connected, the `WStompClient::rx` channel produces `WStompEvent` items, it may be a message, websocket closing, or `WStompError`. * **`WStompConnectError`**: An error that occurs during the initial WebSocket and STOMP `CONNECT` handshake. + * **`WStompError`**: An error that occurs *after* a successful connection. * `WsReceive` / `WsSend`: A WebSocket protocol error. * `StompDecoding` / `StompEncoding`: A STOMP frame decoding/encoding error. diff --git a/src/client.rs b/src/client.rs index 0d6487f..8dc2131 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,13 +1,13 @@ use actix_codec::Framed; use actix_http::Uri; -use async_stomp::{FromServer, Message, ToServer}; +use async_stomp::{Message, ToServer}; use awc::{BoxedSocket, error::HttpError, ws::Codec}; use tokio::sync::mpsc::{self, Receiver, Sender, error::SendError}; -use crate::{WStompConfig, WStompError, stomp_handler::stomp_handler_task}; +use crate::{WStompConfig, stomp_handler::stomp_handler_task, wstomp_event::WStompEvent}; pub type WStompSender = Sender>; -pub type WStompReceiver = Receiver, WStompError>>; +pub type WStompReceiver = Receiver; /// Your client which reads websocket and produces STOMP messages. Also takes STOMP messages from you and sends it through websocket pub struct WStompClient { @@ -36,7 +36,7 @@ impl WStompClient { let (app_tx, app_rx) = mpsc::channel::>(100); // Channel for the handler task to send STOMP frames back to you - let (stomp_tx, stomp_rx) = mpsc::channel::, WStompError>>(100); + let (stomp_tx, stomp_rx) = mpsc::channel::(100); // Spawn the task that handles all the low-level logic. actix_rt::spawn(stomp_handler_task(ws_framed, app_rx, stomp_tx)); @@ -47,7 +47,7 @@ impl WStompClient { } } - pub async fn recv(&mut self) -> Option, WStompError>> { + pub async fn recv(&mut self) -> Option { self.rx.recv().await } diff --git a/src/connect.rs b/src/connect.rs index bc6a40a..ccbaadb 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -123,7 +123,6 @@ impl WStompConfig { stomp_client .send(connect_msg) .await - .inspect_err(|err| println!("CONNECT error: {err}")) .map_err(WStompConnectError::ConnectMessageFailed)?; Ok(stomp_client) diff --git a/src/lib.rs b/src/lib.rs index 7cb30e2..e285886 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,8 +15,8 @@ mod stomp_handler; mod connect_ssl; pub use connect_ssl::{connect_ssl, connect_ssl_with_pass, connect_ssl_with_token}; -pub mod error; -pub use error::{WStompConnectError, WStompError}; +pub mod wstomp_event; +pub use wstomp_event::{WStompConnectError, WStompError, WStompEvent}; // # Re-export stomp structs pub mod stomp { diff --git a/src/stomp_handler.rs b/src/stomp_handler.rs index c2cb3c5..590f39c 100644 --- a/src/stomp_handler.rs +++ b/src/stomp_handler.rs @@ -1,6 +1,6 @@ use actix_codec::Framed; use actix_http::ws::{Codec, Frame as WsFrame, Item as WsItem, Message as WsMessage}; -use async_stomp::{FromServer, Message, ToServer, client::ClientCodec}; +use async_stomp::{Message, ToServer, client::ClientCodec}; use awc::BoxedSocket; use bytes::{Bytes, BytesMut}; use futures_util::{SinkExt, StreamExt}; @@ -11,7 +11,7 @@ use tokio::{ }; use tokio_util::codec::{Decoder, Encoder}; -use crate::WStompError; +use crate::{WStompError, wstomp_event::WStompEvent}; /// This is the internal task that manages the connection. /// It multiplexes between: @@ -21,7 +21,7 @@ use crate::WStompError; pub(crate) async fn stomp_handler_task( ws_framed: Framed, mut app_rx: Receiver>, - stomp_tx: Sender, WStompError>>, + stomp_tx: Sender, ) { let (mut ws_sink, mut ws_stream) = ws_framed.split(); let mut stomp_codec = ClientCodec; @@ -38,7 +38,7 @@ pub(crate) async fn stomp_handler_task( match ws_frame { WsFrame::Ping(bytes) => { if let Err(e) = ws_sink.send(WsMessage::Pong(bytes)).await { - let _ = stomp_tx.send(Err(WStompError::WsSend(e))).await; + let _ = stomp_tx.send(WStompError::WsSend(e).into()).await; break; } } @@ -51,7 +51,7 @@ pub(crate) async fn stomp_handler_task( finished_reading = true; } WsFrame::Close(reason) => { - println!("Server closed WebSocket: {:?}", reason); + let _ = stomp_tx.send(WStompEvent::WebsocketClosed(reason)).await; break; } WsFrame::Pong(_) => {} @@ -82,19 +82,19 @@ pub(crate) async fn stomp_handler_task( Ok(Some(stomp_frame)) => { read_buf.clear(); // Decoded a STOMP frame, send it to the app - if stomp_tx.send(Ok(stomp_frame)).await.is_err() { + if stomp_tx.send(WStompEvent::Message(stomp_frame)).await.is_err() { // Receiver was dropped, app is gone. break; } } Ok(None) => { // Not enough data for a full STOMP frame, wait for more. - let _ = stomp_tx.send(Err(WStompError::IncompleteStompFrame)).await; + let _ = stomp_tx.send(WStompError::IncompleteStompFrame.into()).await; break; } Err(e) => { // STOMP decoding error - let _ = stomp_tx.send(Err(WStompError::StompDecoding(e))).await; + let _ = stomp_tx.send(WStompError::StompDecoding(e).into()).await; break; } } @@ -108,14 +108,14 @@ pub(crate) async fn stomp_handler_task( Ok(_) => { // Send it as a WebSocket Binary message if let Err(e) = ws_sink.send(WsMessage::Binary(encode_buf.clone().freeze())).await { - let _ = stomp_tx.send(Err(WStompError::WsReceive(e))).await; + let _ = stomp_tx.send(WStompError::WsReceive(e).into()).await; break; } encode_buf.clear(); } Err(e) => { // STOMP encoding error - let _ = stomp_tx.send(Err(WStompError::StompEncoding(e))).await; + let _ = stomp_tx.send(WStompError::StompEncoding(e).into()).await; } } } @@ -125,10 +125,7 @@ pub(crate) async fn stomp_handler_task( } // 3. Both streams closed, exit loop - else => { - println!("STOMP client task shutting down."); - break; - } + else => break, } } } diff --git a/src/error.rs b/src/wstomp_event.rs similarity index 77% rename from src/error.rs rename to src/wstomp_event.rs index 6bcf25c..d87e60f 100644 --- a/src/error.rs +++ b/src/wstomp_event.rs @@ -1,5 +1,8 @@ -use async_stomp::{Message, ToServer}; -use awc::error::{WsClientError, WsProtocolError}; +use async_stomp::{FromServer, Message, ToServer}; +use awc::{ + error::{WsClientError, WsProtocolError}, + ws::CloseReason, +}; use tokio::sync::mpsc::error::SendError; #[derive(Debug)] @@ -8,6 +11,23 @@ pub enum WStompConnectError { ConnectMessageFailed(SendError>), } +/// Custom enum combine events in WebSocket and STOMP +#[derive(Debug)] +pub enum WStompEvent { + /// Regular message from STOMP protocol + Message(Message), + /// Websocket closed connection (with reason) + WebsocketClosed(Option), + /// WebSocket or STOMP error combined + Error(WStompError), +} + +impl From for WStompEvent { + fn from(err: WStompError) -> Self { + Self::Error(err) + } +} + /// Custom error type to combine WebSocket and STOMP errors. #[derive(Debug)] pub enum WStompError {