Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Here is a basic example of connecting, subscribing to a topic, and receiving mes
use wstomp::{
connect_with_pass,
stomp::{FromServer, Message, ToServer, client::Subscriber},
WStompError,
WStompEvent, WStompError,
};

#[actix_rt::main]
Expand Down Expand Up @@ -72,10 +72,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Subscribed! Waiting for messages...");

// 4. Listen for incoming messages
loop {
match client.recv().await {
while let Some(event) = client.recv().await {
match event {
// Receive messages from the server
Some(Ok(msg)) => {
WStompEvent::Message(msg) => {
match msg.content {
FromServer::Message {
destination,
Expand Down Expand Up @@ -109,9 +109,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
other => println!("Received other frame: {:?}", other),
}
}
WStompEvent::WebsocketClosed(reason) => break,
// Handle errors
Some(Err(e)) => {
match e {
WStompEvent::Error(err) => {
match err {
WStompError::IncompleteStompFrame => {
// This is a warning, you can choose to ignore it or log it
eprintln!("Warning: Dropped incomplete STOMP frame.");
Expand All @@ -123,8 +124,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
}
// Exit loop if channel closes
None => break,
}
}

Expand Down Expand Up @@ -181,9 +180,10 @@ async fn main() {

The connection functions (`connect`, `connect_ssl`, etc.) return a `Result<WStompClient, WStompConnectError>`.

Once connected, the `WStompClient::rx` channel produces `Result<Message<FromServer>, WStompError>` items. You should check for errors in your receive loop.
Once connected, the `WStompClient::rx` channel produces `WStompEvent` items, it may be a message, websocket closing, or `WStompError`.

* **`WStompConnectError`**: An error that occurs during the initial WebSocket and STOMP `CONNECT` handshake.

* **`WStompError`**: An error that occurs *after* a successful connection.
* `WsReceive` / `WsSend`: A WebSocket protocol error.
* `StompDecoding` / `StompEncoding`: A STOMP frame decoding/encoding error.
Expand Down
10 changes: 5 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use actix_codec::Framed;
use actix_http::Uri;
use async_stomp::{FromServer, Message, ToServer};
use async_stomp::{Message, ToServer};
use awc::{BoxedSocket, error::HttpError, ws::Codec};
use tokio::sync::mpsc::{self, Receiver, Sender, error::SendError};

use crate::{WStompConfig, WStompError, stomp_handler::stomp_handler_task};
use crate::{WStompConfig, stomp_handler::stomp_handler_task, wstomp_event::WStompEvent};

pub type WStompSender = Sender<Message<ToServer>>;
pub type WStompReceiver = Receiver<Result<Message<FromServer>, WStompError>>;
pub type WStompReceiver = Receiver<WStompEvent>;

/// Your client which reads websocket and produces STOMP messages. Also takes STOMP messages from you and sends it through websocket
pub struct WStompClient {
Expand Down Expand Up @@ -36,7 +36,7 @@ impl WStompClient {
let (app_tx, app_rx) = mpsc::channel::<Message<ToServer>>(100);

// Channel for the handler task to send STOMP frames back to you
let (stomp_tx, stomp_rx) = mpsc::channel::<Result<Message<FromServer>, WStompError>>(100);
let (stomp_tx, stomp_rx) = mpsc::channel::<WStompEvent>(100);

// Spawn the task that handles all the low-level logic.
actix_rt::spawn(stomp_handler_task(ws_framed, app_rx, stomp_tx));
Expand All @@ -47,7 +47,7 @@ impl WStompClient {
}
}

pub async fn recv(&mut self) -> Option<Result<Message<FromServer>, WStompError>> {
pub async fn recv(&mut self) -> Option<WStompEvent> {
self.rx.recv().await
}

Expand Down
1 change: 0 additions & 1 deletion src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ impl<U> WStompConfig<U> {
stomp_client
.send(connect_msg)
.await
.inspect_err(|err| println!("CONNECT error: {err}"))
.map_err(WStompConnectError::ConnectMessageFailed)?;

Ok(stomp_client)
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ mod stomp_handler;
mod connect_ssl;
pub use connect_ssl::{connect_ssl, connect_ssl_with_pass, connect_ssl_with_token};

pub mod error;
pub use error::{WStompConnectError, WStompError};
pub mod wstomp_event;
pub use wstomp_event::{WStompConnectError, WStompError, WStompEvent};

// # Re-export stomp structs
pub mod stomp {
Expand Down
25 changes: 11 additions & 14 deletions src/stomp_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use actix_codec::Framed;
use actix_http::ws::{Codec, Frame as WsFrame, Item as WsItem, Message as WsMessage};
use async_stomp::{FromServer, Message, ToServer, client::ClientCodec};
use async_stomp::{Message, ToServer, client::ClientCodec};
use awc::BoxedSocket;
use bytes::{Bytes, BytesMut};
use futures_util::{SinkExt, StreamExt};
Expand All @@ -11,7 +11,7 @@ use tokio::{
};
use tokio_util::codec::{Decoder, Encoder};

use crate::WStompError;
use crate::{WStompError, wstomp_event::WStompEvent};

/// This is the internal task that manages the connection.
/// It multiplexes between:
Expand All @@ -21,7 +21,7 @@ use crate::WStompError;
pub(crate) async fn stomp_handler_task(
ws_framed: Framed<BoxedSocket, Codec>,
mut app_rx: Receiver<Message<ToServer>>,
stomp_tx: Sender<Result<Message<FromServer>, WStompError>>,
stomp_tx: Sender<WStompEvent>,
) {
let (mut ws_sink, mut ws_stream) = ws_framed.split();
let mut stomp_codec = ClientCodec;
Expand All @@ -38,7 +38,7 @@ pub(crate) async fn stomp_handler_task(
match ws_frame {
WsFrame::Ping(bytes) => {
if let Err(e) = ws_sink.send(WsMessage::Pong(bytes)).await {
let _ = stomp_tx.send(Err(WStompError::WsSend(e))).await;
let _ = stomp_tx.send(WStompError::WsSend(e).into()).await;
break;
}
}
Expand All @@ -51,7 +51,7 @@ pub(crate) async fn stomp_handler_task(
finished_reading = true;
}
WsFrame::Close(reason) => {
println!("Server closed WebSocket: {:?}", reason);
let _ = stomp_tx.send(WStompEvent::WebsocketClosed(reason)).await;
break;
}
WsFrame::Pong(_) => {}
Expand Down Expand Up @@ -82,19 +82,19 @@ pub(crate) async fn stomp_handler_task(
Ok(Some(stomp_frame)) => {
read_buf.clear();
// Decoded a STOMP frame, send it to the app
if stomp_tx.send(Ok(stomp_frame)).await.is_err() {
if stomp_tx.send(WStompEvent::Message(stomp_frame)).await.is_err() {
// Receiver was dropped, app is gone.
break;
}
}
Ok(None) => {
// Not enough data for a full STOMP frame, wait for more.
let _ = stomp_tx.send(Err(WStompError::IncompleteStompFrame)).await;
let _ = stomp_tx.send(WStompError::IncompleteStompFrame.into()).await;
break;
}
Err(e) => {
// STOMP decoding error
let _ = stomp_tx.send(Err(WStompError::StompDecoding(e))).await;
let _ = stomp_tx.send(WStompError::StompDecoding(e).into()).await;
break;
}
}
Expand All @@ -108,14 +108,14 @@ pub(crate) async fn stomp_handler_task(
Ok(_) => {
// Send it as a WebSocket Binary message
if let Err(e) = ws_sink.send(WsMessage::Binary(encode_buf.clone().freeze())).await {
let _ = stomp_tx.send(Err(WStompError::WsReceive(e))).await;
let _ = stomp_tx.send(WStompError::WsReceive(e).into()).await;
break;
}
encode_buf.clear();
}
Err(e) => {
// STOMP encoding error
let _ = stomp_tx.send(Err(WStompError::StompEncoding(e))).await;
let _ = stomp_tx.send(WStompError::StompEncoding(e).into()).await;
}
}
}
Expand All @@ -125,10 +125,7 @@ pub(crate) async fn stomp_handler_task(
}

// 3. Both streams closed, exit loop
else => {
println!("STOMP client task shutting down.");
break;
}
else => break,
}
}
}
24 changes: 22 additions & 2 deletions src/error.rs → src/wstomp_event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use async_stomp::{Message, ToServer};
use awc::error::{WsClientError, WsProtocolError};
use async_stomp::{FromServer, Message, ToServer};
use awc::{
error::{WsClientError, WsProtocolError},
ws::CloseReason,
};
use tokio::sync::mpsc::error::SendError;

#[derive(Debug)]
Expand All @@ -8,6 +11,23 @@ pub enum WStompConnectError {
ConnectMessageFailed(SendError<Message<ToServer>>),
}

/// Custom enum combine events in WebSocket and STOMP
#[derive(Debug)]
pub enum WStompEvent {
/// Regular message from STOMP protocol
Message(Message<FromServer>),
/// Websocket closed connection (with reason)
WebsocketClosed(Option<CloseReason>),
/// WebSocket or STOMP error combined
Error(WStompError),
}

impl From<WStompError> for WStompEvent {
fn from(err: WStompError) -> Self {
Self::Error(err)
}
}

/// Custom error type to combine WebSocket and STOMP errors.
#[derive(Debug)]
pub enum WStompError {
Expand Down
Loading