Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 59 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ actix-http = "3"
anyhow = "1"
async-stomp = "0.6"
awc = "3"
backoff = { version = "0.4", features = ["tokio"] }
bytes = "1"
futures-util = "0.3"
pretty-readme = "0.1"
Expand Down
48 changes: 39 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ This crate provides a simple client to connect to a STOMP-enabled WebSocket serv
* Connects to STOMP servers over WebSocket using [`awc`](https://crates.io/crates/awc).
* Handles all STOMP protocol encoding and decoding via [`async-stomp`](https://crates.io/crates/async-stomp).
* Manages WebSocket ping/pong heartbeats automatically in a background task.
* Provides a simple `tokio::mpsc` channel-based API (`WStompClient`) for sending and receiving STOMP frames.
* Provides a simple `tokio::mpsc` channel-based API ([`WStompClient`]) for sending and receiving STOMP frames.
* Connection helpers for various authentication methods:
* `connect`: Anonymous connection.
* `connect_with_pass`: Login and passcode authentication.
* `connect_with_token`: Authentication using an authorization token header.
* [`connect`]: Anonymous connection.
* [`connect_with_pass`]: Login and passcode authentication.
* [`connect_with_token`]: Authentication using an authorization token header.
* Optional `rustls` feature for SSL connections, with helpers that force HTTP/1.1 for compatibility with servers like SockJS.

## Installation
Expand All @@ -24,8 +24,6 @@ Add this to your `Cargo.toml`:
[dependencies]
wstomp = "0.1.0" # Replace with the actual version
actix-rt = "2.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures-util = "0.3"
```

For SSL support, enable the `rustls` feature:
Expand Down Expand Up @@ -109,7 +107,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
other => println!("Received other frame: {:?}", other),
}
}
WStompEvent::WebsocketClosed(reason) => break,
// Handle errors
WStompEvent::Error(err) => {
match err {
Expand Down Expand Up @@ -176,18 +173,51 @@ async fn main() {
}
```

### Auto-reconnect

Use [`WStompConfig::build_and_connect_with_reconnection_cb`] method to automatically perform a full reconnect upon errors.

```rust,no_run
use wstomp::{WStompClient, WStompConfig, WStompConnectError};

#[actix_rt::main]
async fn main() {
let url = "wss://secure-server.com/ws";
let session_token = "session_token";

let cb = {
move |wstomp_client_res: Result<WStompClient, WStompConnectError>| {
async move {
// Unwrap wstomp client here or react to an error.
// Upon an error you can return from the callback to make wstomp library a re-connection attempt
}
}
};

let res = WStompConfig::new(url)
.ssl()
.auth_token(session_token)
.build_and_connect_with_reconnection_cb(cb);

// ... do different stuff here, but don't exit immediately as this will terminate wstomp loop.
}
```

## Error Handling

The connection functions (`connect`, `connect_ssl`, etc.) return a `Result<WStompClient, WStompConnectError>`.
The connection functions ([`connect`], [`connect_ssl`], etc.) return a `Result<WStompClient, WStompConnectError>`.

Once connected, the `WStompClient::rx` channel produces `WStompEvent` items, it may be a message, websocket closing, or `WStompError`.
Once connected, the `WStompClient::rx` channel produces [`WStompEvent`] items, it may be a message or [`WStompError`].

* **`WStompConnectError`**: An error that occurs during the initial WebSocket and STOMP `CONNECT` handshake.

* **`WStompError`**: An error that occurs *after* a successful connection.
* `WsReceive` / `WsSend`: A WebSocket protocol error.
* `StompDecoding` / `StompEncoding`: A STOMP frame decoding/encoding error.
* `IncompleteStompFrame`: A warning indicating that data was received but was not enough to form a complete STOMP frame. The client has dropped this data. This is often safe to ignore or log as a warning.
* `WebsocketClosed`: WebSocket was closed, possibly a reason from `awc` library is inside.
* `PingFailed`: Couldn't send ping through the WebSocket protocol.
* `PingTimeout`: There was no pong for last ping.

## License

Expand Down
3 changes: 3 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ impl WStompClient {
/// You can use this struct directly by passing the `Framed` object you get from `awc` into this constructor.
/// This will create a background worker in actix system (on current thread), which will encode and decode STOMP messages for you.
/// It also manages websocket ping-pong heartbeat.
///
/// NOTE: This method does not perform automatic reconnection.
/// Use [WStompConfig::build_and_connect_with_reconnection_cb] to auto-reconnect.
pub fn from_framed(ws_framed: Framed<BoxedSocket, Codec>) -> Self {
// Channel for you to send STOMP frames to the handler task
let (app_tx, app_rx) = mpsc::channel::<Message<ToServer>>(100);
Expand Down
83 changes: 81 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub struct WStompConfig<U> {
opts: WStompConfigOpts,
}

#[derive(Default)]
#[derive(Clone)]
pub struct WStompConfigOpts {
#[cfg(feature = "rustls")]
pub ssl: bool,
Expand All @@ -12,6 +12,30 @@ pub struct WStompConfigOpts {
pub passcode: Option<String>,
pub additional_headers: Vec<(String, String)>,
pub client: Option<awc::Client>,

// Reconnection opts in seconds
pub retry_initial_interval: u64,
pub retry_max_interval: u64,
pub retry_multiplier: f64,
pub retry_max_elapsed_time: Option<u64>,
}

impl Default for WStompConfigOpts {
fn default() -> Self {
Self {
ssl: Default::default(),
auth_token: Default::default(),
login: Default::default(),
passcode: Default::default(),
additional_headers: Default::default(),
client: Default::default(),

retry_initial_interval: 3,
retry_max_interval: 60,
retry_multiplier: 1.2,
retry_max_elapsed_time: None,
}
}
}

impl<U> WStompConfig<U> {
Expand All @@ -22,47 +46,102 @@ impl<U> WStompConfig<U> {
}
}

/// Get url to which this config is assigned to use.
pub fn get_url(&self) -> &U {
&self.url
}

/// Get options for this config.
pub fn get_opts(&self) -> &WStompConfigOpts {
&self.opts
}

pub fn into_inner(self) -> (U, WStompConfigOpts) {
/// De-couple url and options in this config.
pub fn into_parts(self) -> (U, WStompConfigOpts) {
(self.url, self.opts)
}

// Setters

/// Enables TLS/SSL encryption for the connection.
///
/// When set, the client will attempt to perform a secure handshake
/// (typically for `wss://` schemes).
pub fn ssl(mut self) -> Self {
self.opts.ssl = true;
self
}

/// Sets the authentication token for the connection.
pub fn auth_token(mut self, auth_token: impl Into<String>) -> Self {
self.opts.auth_token = Some(auth_token.into());
self
}

/// Sets the `login` header for STOMP authentication.
pub fn login(mut self, login: impl Into<String>) -> Self {
self.opts.login = Some(login.into());
self
}

/// Sets the `passcode` header for STOMP authentication.
pub fn passcode(mut self, passcode: impl Into<String>) -> Self {
self.opts.passcode = Some(passcode.into());
self
}

/// Appends a list of custom headers to the connection configuration.
///
/// These headers will be included in the STOMP `CONNECT` frame.
/// This method does not replace existing headers; it extends the list.
pub fn add_headers(mut self, additional_headers: Vec<(String, String)>) -> Self {
self.opts.additional_headers.extend(additional_headers);
self
}

/// Sets a custom `awc::Client` instance.
///
/// Use this if you need to provide a pre-configured HTTP client (e.g.,
/// with custom timeouts, proxy settings, or connector configurations)
/// instead of letting the library create a default one.
pub fn client(mut self, client: awc::Client) -> Self {
self.opts.client = Some(client);
self
}

/// If [Self::build_and_connect_with_reconnection_cb] method is used,
/// sets the initial retry interval in seconds.
///
/// Example: Start retrying after 3 seconds.
pub fn retry_initial_interval(mut self, seconds: u64) -> Self {
self.opts.retry_initial_interval = seconds;
self
}

/// If [Self::build_and_connect_with_reconnection_cb] method is used,
/// sets the maximum retry interval in seconds.
///
/// Example: Cap the wait time at 30 seconds.
pub fn retry_max_interval(mut self, seconds: u64) -> Self {
self.opts.retry_max_interval = seconds;
self
}

/// If [Self::build_and_connect_with_reconnection_cb] method is used,
/// sets the multiplier for the backoff.
///
/// Example: 2.0 doubles the wait time after every failure.
pub fn retry_multiplier(mut self, multiplier: f64) -> Self {
self.opts.retry_multiplier = multiplier;
self
}

/// If [Self::build_and_connect_with_reconnection_cb] method is used,
/// sets a maximum total time to try reconnecting before giving up.
///
/// Defaults to no limit if method not invoked.
pub fn retry_max_elapsed_time(mut self, seconds: u64) -> Self {
self.opts.retry_max_elapsed_time = Some(seconds);
self
}
}
Loading
Loading