Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d5621e4
use NonZero for topic alias
julian-graf Apr 17, 2026
6a249c3
add shared subscription topic filters
julian-graf Apr 17, 2026
9e403c0
prevent protocol error on unavailable shared subscription use
julian-graf Apr 17, 2026
3ffea5b
add explicit wildcard check to TopicFilter
julian-graf Apr 17, 2026
db1e53f
prevent protocol error on unavailable wildcard subscription use
julian-graf Apr 17, 2026
3a4a4d8
prevent protocol error on unavailable subscription identifier use
julian-graf Apr 17, 2026
49ab577
prevent protocol error on server maximum qos exceedance
julian-graf Apr 17, 2026
a533f8c
prevent protocol error on unavailable retain use
julian-graf Apr 17, 2026
eae3c53
replace InvalidTopicAlias error variant with UnsupportedByServer
julian-graf Apr 17, 2026
9faaedd
improve QoS documentation
julian-graf Apr 17, 2026
a8188e6
add shared & wildcard subscriptions to supported features
julian-graf Apr 17, 2026
03a1d9d
fix readme api example
julian-graf Apr 17, 2026
d8881b7
document changes
julian-graf Apr 17, 2026
cd3d68a
add more documentation for new preconditions
julian-graf Apr 18, 2026
efbaf16
fix fmt and clippy
julian-graf Apr 18, 2026
78ae71e
move ConnectInfo into event and rename to Connected
julian-graf Apr 19, 2026
7e7dc7b
inline ReceiveMaximum as NonZero and use private property newtype
julian-graf Apr 19, 2026
f2c28e4
improve documentation of *Config structs
julian-graf Apr 19, 2026
a0d0894
improve documentation of Connected
julian-graf Apr 19, 2026
2f79694
fix spelling of server-side
julian-graf Apr 19, 2026
ea26331
inline doc-only imports
julian-graf Apr 19, 2026
873a26d
document changes
julian-graf Apr 19, 2026
b1b0bc3
improve DisconnectOptions documentation
julian-graf Apr 19, 2026
947c31c
improve Event documentation
julian-graf Apr 20, 2026
d3c19dc
improve Client documentation
julian-graf Apr 21, 2026
ef970dd
fix message delivery retry doc url
julian-graf Apr 21, 2026
820b39e
relativize doc item paths
julian-graf Apr 21, 2026
514ae27
fix QoS references in options
julian-graf Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add reason string to incoming PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK
- Detect protocol error when server sends user properties (only when `MAX_USER_PROPERTIES` > 0) or a reason string in the PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK and UNSUBACK packets
- Fix client not dropping network connection on receival of a DISCONNECT packet leading to a panic on subsequent call to `Client::abort` in debug builds
- Use `NonZero<u16>` for topic alias values in `TopicReference`
- Remove `From<TopicName>` for `TopicFilter` (a correct topic name can be an incorrect shared subscription filter)
- Add explicit support for shared subscriptions
- Prevent protocol error on shared subscriptions when shared subscriptions are not available
- Prevent protocol error on wildcard subscriptions when wildcard subscriptions are not available
- Prevent protocol error on subscription identifier being specified when subscription identifiers are not available
- Prevent protocol error on exceedance of the server's maximum quality of service
- Prevent protocol error on retained publications when retain is not available
- Replace usage of `MqttError::InvalidTopicAlias` with `MqttError::UnsupportedByServer`
- Move `ConnectInfo` into `crate::client::event` and rename to `Connected`
- Replace `ReceiveMaximum` newtype with inlined `NonZero<u16>`

## 0.5.1 - 2026-04-10

Expand Down
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ The design goal is a strict yet flexible and explicit API that leverages Rust's
- Flow control
- Configuration & session tracking
- Session recovery
- Client- & serverside maximum packet size
- Client- & server-side maximum packet size
- Subscription identifiers
- Shared & wildcard subscriptions
- Message expiry interval
- Topic alias in outgoing publications
- Request/Response
Expand Down Expand Up @@ -78,23 +79,25 @@ async fn main() {
.clean_start()
.session_expiry_interval(SessionExpiryInterval::NeverEnd)
.user_name(MqttString::from_str("user").unwrap())
.password(MqttBinary::from_slice("pass").unwrap());
.password(MqttBinary::from_slice(b"pass").unwrap());

client.connect(
transport,
&connect_options,
Some(MqttString::from_str("rust-mqtt-demo").unwrap()),
).await.unwrap();

let topic = TopicName::new(MqttString::from_str("demo/topic").unwrap()).unwrap();
let topic = MqttString::from_str("demo/topic").unwrap();

client.subscribe(
topic.as_borrowed().into(),
SubscriptionOptions::new().exactly_once(),
TopicFilter::new(topic.as_borrowed()).unwrap(),
&SubscriptionOptions::new().exactly_once(),
).await.unwrap();

let topic_reference = TopicReference::Name(TopicName::new(topic).unwrap());

let packet_identifier = client.publish(
&PublicationOptions::new(topic.as_borrowed().into()).exactly_once(),
&PublicationOptions::new(topic_reference.as_borrowed()).exactly_once(),
"Hello World!".into(),
).await.unwrap().unwrap();

Expand Down Expand Up @@ -124,7 +127,7 @@ async fn main() {
// - Republish if PUBLISH / PUBREC may have been lost
Some(CPublishFlightState::AwaitingPubrec) => client.republish(
packet_identifier,
&PublicationOptions::new(topic.into()).exactly_once(),
&PublicationOptions::new(topic_reference).exactly_once(),
"Hello World!".into(),
).await.unwrap(),
// - Re-release if PUBREL / PUBCOMP may have been lost
Expand Down
25 changes: 17 additions & 8 deletions examples/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use rust_mqtt::{
},
},
config::{KeepAlive, SessionExpiryInterval},
types::{MqttBinary, MqttString, TopicName, VarByteInt},
types::{MqttBinary, MqttString, TopicFilter, TopicName, VarByteInt},
};
use tokio::{net::TcpStream, select, time::sleep};

Expand Down Expand Up @@ -90,9 +90,14 @@ async fn main() {
sub_options.subscription_identifier = Some(VarByteInt::from(42u16));
}

let topic = TopicName::new(MqttString::from_str("rust-mqtt/is/great").unwrap()).unwrap();
let topic_string = MqttString::from_str("rust-mqtt/is/great").unwrap();
let topic_filter = TopicFilter::new(topic_string.as_borrowed()).unwrap();
let topic_name = TopicName::new(topic_string.as_borrowed()).unwrap();

match client.subscribe(topic.clone().into(), &sub_options).await {
match client
.subscribe(topic_filter.as_borrowed(), &sub_options)
.await
{
Ok(_) => info!("Sent Subscribe"),
Err(e) => {
error!("Failed to subscribe: {e:?}");
Expand All @@ -119,8 +124,11 @@ async fn main() {
}
}

let pub_options =
PublicationOptions::new(TopicReference::Mapping(topic.clone(), 1)).exactly_once();
let pub_options = PublicationOptions::new(TopicReference::Mapping(
topic_name.as_borrowed(),
NonZero::new(1).unwrap(),
))
.exactly_once();

match client
.publish(&pub_options, Bytes::from("anything".as_bytes()))
Expand Down Expand Up @@ -195,7 +203,7 @@ async fn main() {
}

match client
.unsubscribe(topic.clone().into(), &UnsubscriptionOptions::new())
.unsubscribe(topic_filter.as_borrowed(), &UnsubscriptionOptions::new())
.await
{
Ok(_) => info!("Sent Unsubscribe"),
Expand Down Expand Up @@ -225,7 +233,8 @@ async fn main() {
}

// Start a Quality of Service 2 publish flow
let pub_options = PublicationOptions::new(TopicReference::Alias(1)).exactly_once();
let pub_options =
PublicationOptions::new(TopicReference::Alias(NonZero::new(1).unwrap())).exactly_once();

let incomplete_publish_packet_identifier = match client
.publish(&pub_options, Bytes::from("something".as_bytes()))
Expand Down Expand Up @@ -289,7 +298,7 @@ async fn main() {
}
}

let pub_options = PublicationOptions::new(TopicReference::Name(topic.clone())).exactly_once();
let pub_options = PublicationOptions::new(TopicReference::Name(topic_name)).exactly_once();

match client
.republish(
Expand Down
6 changes: 4 additions & 2 deletions src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ use alloc::boxed::Box;
use core::{borrow::Borrow, ops::Deref};

/// Contiguous bytes in memory. Is either a [`u8`] slice or (with crate feature "alloc") an owned
/// [`alloc::boxed::Box`]<[u8]>.
/// [`Box`]<[u8]>.
///
/// It is recommended to almost always use owned [`Bytes`] instead of a reference to [`Bytes`],
/// as it makes this type compatible for code designed for both owned and borrowed variants.
///
/// Important: Cloning this will clone the underlying Box if it is an owned variant.
/// You can however borrow another owned [`Bytes`] by calling [`Bytes::as_borrowed`].
/// The [`Bytes::as_borrowed`] method is passed on through wrapper types, for example
/// [`crate::types::MqttString`].
/// [`MqttString`].
///
/// [`MqttString`]: crate::types::MqttString
pub enum Bytes<'a> {
/// Owned variant, only available with the `alloc` feature enabled.
#[cfg(feature = "alloc")]
Expand Down
105 changes: 72 additions & 33 deletions src/client/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,61 @@ use crate::{
types::{MqttString, MqttStringPair, ReasonCode, TooLargeToEncode},
};

/// The main error returned by [`crate::client::Client`].
/// The main error returned by [`Client`].
///
/// Distincts between unrecoverable and recoverable errors.
/// Recoverability in this context refers to whether the current network connection can
/// be used for further communication after the error has occured.
///
/// # Recovery
/// - For unrecoverable errors, [`crate::client::Client::abort`] can be called to send an optional DISCONNECT
/// - For unrecoverable errors, [`Client::abort`] can be called to send an optional DISCONNECT
/// packet if allowed by specification. You can then try to recover the session by calling
/// [`crate::client::Client::connect`] again without clean start.
/// [`Client::connect`] again without clean start.
/// - For recoverable errors, follow the error-specific behaviour.
///
/// [`Client`]: crate::client::Client
/// [`Client::abort`]: crate::client::Client::abort
/// [`Client::connect`]: crate::client::Client::connect
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error<'e, const MAX_USER_PROPERTIES: usize> {
/// An underlying Read/Write method returned an error.
///
/// Unrecoverable error. [`crate::client::Client::abort`] should be called.
/// Unrecoverable error. [`Client::abort`] should be called.
///
/// [`Client::abort`]: crate::client::Client::abort
Network(ErrorKind),

/// The remote server did something the client does not understand / does not match the specification.
///
/// Unrecoverable error. [`crate::client::Client::abort`] should be called.
/// Unrecoverable error. [`Client::abort`] should be called.
///
/// [`Client::abort`]: crate::client::Client::abort
Server,

/// A buffer provision by the [`crate::buffer::BufferProvider`] failed. Therefore a packet
/// could not be received correctly.
/// A buffer provision by the [`BufferProvider`] failed. Therefore a packet could not be received
/// correctly.
///
/// Unrecoverable error. [`crate::client::Client::abort`] should be called.
/// Unrecoverable error. [`Client::abort`] should be called.
///
/// [`BufferProvider`]: crate::buffer::BufferProvider
/// [`Client::abort`]: crate::client::Client::abort
Alloc,

/// An AUTH packet header has been received by the client. AUTH packets are not supported by the client.
/// The client has scheduled a DISCONNECT packet with [`ReasonCode::ImplementationSpecificError`].
/// The packet body has not been decoded.
///
/// Unrecoverable error. [`crate::client::Client::abort`] should be called.
/// Unrecoverable error. [`Client::abort`] should be called.
///
/// [`Client::abort`]: crate::client::Client::abort
AuthPacketReceived,

/// The client could not connect to the broker or the broker has sent a DISCONNECT packet.
///
/// Unrecoverable error. [`crate::client::Client::abort`] should be called.
/// Unrecoverable error. [`Client::abort`] should be called.
///
/// [`Client::abort`]: crate::client::Client::abort
Disconnect {
/// The [`ReasonCode`] of the causing CONNACK or DISCONNECT packet. If the disconnection is caused
/// by a CONNACK packet, the reason code ss always erroneous.
Expand All @@ -68,7 +83,9 @@ pub enum Error<'e, const MAX_USER_PROPERTIES: usize> {
/// Another unrecoverable error has been returned earlier. The underlying connection is in a state,
/// in which it refuses/is not able to perform regular communication.
///
/// Unrecoverable error. [`crate::client::Client::abort`] should be called.
/// Unrecoverable error. [`Client::abort`] should be called.
///
/// [`Client::abort`]: crate::client::Client::abort
RecoveryRequired,

/// A republish of a packet without an in flight entry was attempted.
Expand All @@ -90,51 +107,73 @@ pub enum Error<'e, const MAX_USER_PROPERTIES: usize> {

/// A packet was too long to encode its length with the variable byte integer.
///
/// This can currently only be returned from [`crate::client::Client::publish`] or
/// [`crate::client::Client::republish`].
/// This can currently only be returned from [`Client::publish`] or [`Client::republish`].
///
/// Recoverable error. No action has been taken by the client.
///
/// [`Client::publish`]: crate::client::Client::publish
/// [`Client::republish`]: crate::client::Client::republish
PacketMaximumLengthExceeded,

/// A packet is too long and would exceed the servers maximum packet size.
///
/// Recoverable error. No action has been taken by the client.
ServerMaximumPacketSizeExceeded,

/// The value of a topic alias in an outgoing PUBLISH packet was 0 or greater than the server's maximum allowed
/// value. Sending this PUBLISH packet would raise a protocol error.
///
/// Recoverable error. No action has been taken by the client.
InvalidTopicAlias,

/// An action was rejected because an internal buffer used for tracking session state is full.
///
/// Recoverable error. Try again after a [`crate::client::event::Event`] has been emitted that indicates
/// that buffer might be free again.
/// Recoverable error. Try again after a [`Event`] has been emitted that indicates that buffer
/// might be free again.
///
/// Example:
/// [`crate::client::Client::subscribe`] returns this error. Wait until a
/// [`crate::client::event::Event::Suback`] is received.
/// [`Client::subscribe`] returns this error. Wait until a [`Event::Suback`] is received.
/// This clears a spot in the subscribe packet identifiers.
///
/// [`Event`]: crate::client::event::Event
/// [`Client::subscribe`]: crate::client::Client::subscribe
/// [`Event::Suback`]: crate::client::event::Event::Suback
SessionBuffer,

/// A publish now would exceed the server's receive maximum and ultimately cause a protocol error.
///
/// Recoverable error. Try again after either [`crate::client::event::Event::PublishAcknowledged`] or
/// [`crate::client::event::Event::PublishComplete`] has been emitted that indicates that buffer might be
/// free again.
/// been emitted that indicates that buffer might be free again.
/// Recoverable error. Try again after either [`Event::PublishAcknowledged`] or
/// [`Event::PublishComplete`] has been emitted that indicates that buffer might be free again.
///
/// [`Event::PublishAcknowledged`]: crate::client::event::Event::PublishAcknowledged
/// [`Event::PublishComplete`]: crate::client::event::Event::PublishComplete
SendQuotaExceeded,

/// An operation was attempted which the server stated it does not support. If the requested operation
/// were executed as is, a protocol error would be caused.
///
/// This could be:
/// - a shared subscription (topic filter starts with "$share") being attempted despite shared
/// subscriptions not being available on the server
/// - a subscription identifier being specified despite subscription identifiers not being available on
/// the server
/// - a wildcard occuring in a topic filter despite wildcard subscriptions not being available on the
/// server
/// - a publication with a quality of service level greater than the server's maximum quality of service
/// being attempted
/// - a publication with retain set to true being attempted despite retain not being available on the
/// server
/// - a topic alias in an outgoing publication being greater than the server's maximum topic alias value
///
/// Recoverable error. No action has been taken by the client.
UnsupportedByServer,

/// A disconnect now with the given session expiry interval would cause a protocol error.
///
/// A disconnection was attempted with a session expiry interval change where the session expiry interval in the
/// CONNECT packet was zero ([`crate::config::SessionExpiryInterval::EndOnDisconnect`]) and was
/// greater than zero ([`crate::config::SessionExpiryInterval::NeverEnd`] or
/// [`crate::config::SessionExpiryInterval::Seconds`]) in the DISCONNECT packet.
/// CONNECT packet was zero ([`SessionExpiryInterval::EndOnDisconnect`]) and was greater than zero
/// ([`SessionExpiryInterval::NeverEnd`] or [`SessionExpiryInterval::Seconds`]) in the DISCONNECT packet.
///
/// Recoverable error. Try disconnecting again without an session expiry interval or with a
/// session expiry interval of zero ([`crate::config::SessionExpiryInterval::EndOnDisconnect`]).
/// session expiry interval of zero ([`SessionExpiryInterval::EndOnDisconnect`]).
///
/// [`SessionExpiryInterval::EndOnDisconnect`]: crate::config::SessionExpiryInterval::EndOnDisconnect
/// [`SessionExpiryInterval::NeverEnd`]: crate::config::SessionExpiryInterval::NeverEnd
/// [`SessionExpiryInterval::Seconds`]: crate::config::SessionExpiryInterval::Seconds
IllegalDisconnectSessionExpiryInterval,
}

Expand All @@ -149,9 +188,9 @@ impl<const MAX_USER_PROPERTIES: usize> Error<'_, MAX_USER_PROPERTIES> {
| Self::PacketIdentifierAwaitingPubcomp
| Self::PacketMaximumLengthExceeded
| Self::ServerMaximumPacketSizeExceeded
| Self::InvalidTopicAlias
| Self::SessionBuffer
| Self::SendQuotaExceeded
| Self::UnsupportedByServer
| Self::IllegalDisconnectSessionExpiryInterval
)
}
Expand Down Expand Up @@ -186,9 +225,9 @@ impl<'e> Error<'e, 0> {
Self::PacketIdentifierAwaitingPubcomp => Error::PacketIdentifierAwaitingPubcomp,
Self::PacketMaximumLengthExceeded => Error::PacketMaximumLengthExceeded,
Self::ServerMaximumPacketSizeExceeded => Error::ServerMaximumPacketSizeExceeded,
Self::InvalidTopicAlias => Error::InvalidTopicAlias,
Self::SessionBuffer => Error::SessionBuffer,
Self::SendQuotaExceeded => Error::SendQuotaExceeded,
Self::UnsupportedByServer => Error::UnsupportedByServer,
Self::IllegalDisconnectSessionExpiryInterval => {
Error::IllegalDisconnectSessionExpiryInterval
}
Expand Down
Loading