diff --git a/CHANGELOG.md b/CHANGELOG.md index 8225d8d..ecb76a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add reason string to incoming PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK - Detect protocol error when server sends user properties (only when `MAX_USER_PROPERTIES` > 0) or a reason string in the PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK and UNSUBACK packets - Fix client not dropping network connection on receival of a DISCONNECT packet leading to a panic on subsequent call to `Client::abort` in debug builds +- Use `NonZero` for topic alias values in `TopicReference` +- Remove `From` for `TopicFilter` (a correct topic name can be an incorrect shared subscription filter) +- Add explicit support for shared subscriptions +- Prevent protocol error on shared subscriptions when shared subscriptions are not available +- Prevent protocol error on wildcard subscriptions when wildcard subscriptions are not available +- Prevent protocol error on subscription identifier being specified when subscription identifiers are not available +- Prevent protocol error on exceedance of the server's maximum quality of service +- Prevent protocol error on retained publications when retain is not available +- Replace usage of `MqttError::InvalidTopicAlias` with `MqttError::UnsupportedByServer` +- Move `ConnectInfo` into `crate::client::event` and rename to `Connected` +- Replace `ReceiveMaximum` newtype with inlined `NonZero` ## 0.5.1 - 2026-04-10 diff --git a/README.md b/README.md index 3a4df8b..de31ed3 100644 --- a/README.md +++ b/README.md @@ -26,8 +26,9 @@ The design goal is a strict yet flexible and explicit API that leverages Rust's - Flow control - Configuration & session tracking - Session recovery -- Client- & serverside maximum packet size +- Client- & server-side maximum packet size - Subscription identifiers +- Shared & wildcard subscriptions - Message expiry interval - Topic alias in outgoing publications - Request/Response @@ -78,7 +79,7 @@ async fn main() { .clean_start() .session_expiry_interval(SessionExpiryInterval::NeverEnd) .user_name(MqttString::from_str("user").unwrap()) - .password(MqttBinary::from_slice("pass").unwrap()); + .password(MqttBinary::from_slice(b"pass").unwrap()); client.connect( transport, @@ -86,15 +87,17 @@ async fn main() { Some(MqttString::from_str("rust-mqtt-demo").unwrap()), ).await.unwrap(); - let topic = TopicName::new(MqttString::from_str("demo/topic").unwrap()).unwrap(); + let topic = MqttString::from_str("demo/topic").unwrap(); client.subscribe( - topic.as_borrowed().into(), - SubscriptionOptions::new().exactly_once(), + TopicFilter::new(topic.as_borrowed()).unwrap(), + &SubscriptionOptions::new().exactly_once(), ).await.unwrap(); + let topic_reference = TopicReference::Name(TopicName::new(topic).unwrap()); + let packet_identifier = client.publish( - &PublicationOptions::new(topic.as_borrowed().into()).exactly_once(), + &PublicationOptions::new(topic_reference.as_borrowed()).exactly_once(), "Hello World!".into(), ).await.unwrap().unwrap(); @@ -124,7 +127,7 @@ async fn main() { // - Republish if PUBLISH / PUBREC may have been lost Some(CPublishFlightState::AwaitingPubrec) => client.republish( packet_identifier, - &PublicationOptions::new(topic.into()).exactly_once(), + &PublicationOptions::new(topic_reference).exactly_once(), "Hello World!".into(), ).await.unwrap(), // - Re-release if PUBREL / PUBCOMP may have been lost diff --git a/examples/demo.rs b/examples/demo.rs index 5673a97..2f6b09e 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -18,7 +18,7 @@ use rust_mqtt::{ }, }, config::{KeepAlive, SessionExpiryInterval}, - types::{MqttBinary, MqttString, TopicName, VarByteInt}, + types::{MqttBinary, MqttString, TopicFilter, TopicName, VarByteInt}, }; use tokio::{net::TcpStream, select, time::sleep}; @@ -90,9 +90,14 @@ async fn main() { sub_options.subscription_identifier = Some(VarByteInt::from(42u16)); } - let topic = TopicName::new(MqttString::from_str("rust-mqtt/is/great").unwrap()).unwrap(); + let topic_string = MqttString::from_str("rust-mqtt/is/great").unwrap(); + let topic_filter = TopicFilter::new(topic_string.as_borrowed()).unwrap(); + let topic_name = TopicName::new(topic_string.as_borrowed()).unwrap(); - match client.subscribe(topic.clone().into(), &sub_options).await { + match client + .subscribe(topic_filter.as_borrowed(), &sub_options) + .await + { Ok(_) => info!("Sent Subscribe"), Err(e) => { error!("Failed to subscribe: {e:?}"); @@ -119,8 +124,11 @@ async fn main() { } } - let pub_options = - PublicationOptions::new(TopicReference::Mapping(topic.clone(), 1)).exactly_once(); + let pub_options = PublicationOptions::new(TopicReference::Mapping( + topic_name.as_borrowed(), + NonZero::new(1).unwrap(), + )) + .exactly_once(); match client .publish(&pub_options, Bytes::from("anything".as_bytes())) @@ -195,7 +203,7 @@ async fn main() { } match client - .unsubscribe(topic.clone().into(), &UnsubscriptionOptions::new()) + .unsubscribe(topic_filter.as_borrowed(), &UnsubscriptionOptions::new()) .await { Ok(_) => info!("Sent Unsubscribe"), @@ -225,7 +233,8 @@ async fn main() { } // Start a Quality of Service 2 publish flow - let pub_options = PublicationOptions::new(TopicReference::Alias(1)).exactly_once(); + let pub_options = + PublicationOptions::new(TopicReference::Alias(NonZero::new(1).unwrap())).exactly_once(); let incomplete_publish_packet_identifier = match client .publish(&pub_options, Bytes::from("something".as_bytes())) @@ -289,7 +298,7 @@ async fn main() { } } - let pub_options = PublicationOptions::new(TopicReference::Name(topic.clone())).exactly_once(); + let pub_options = PublicationOptions::new(TopicReference::Name(topic_name)).exactly_once(); match client .republish( diff --git a/src/bytes.rs b/src/bytes.rs index 389b70f..015b894 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -3,7 +3,7 @@ use alloc::boxed::Box; use core::{borrow::Borrow, ops::Deref}; /// Contiguous bytes in memory. Is either a [`u8`] slice or (with crate feature "alloc") an owned -/// [`alloc::boxed::Box`]<[u8]>. +/// [`Box`]<[u8]>. /// /// It is recommended to almost always use owned [`Bytes`] instead of a reference to [`Bytes`], /// as it makes this type compatible for code designed for both owned and borrowed variants. @@ -11,7 +11,9 @@ use core::{borrow::Borrow, ops::Deref}; /// Important: Cloning this will clone the underlying Box if it is an owned variant. /// You can however borrow another owned [`Bytes`] by calling [`Bytes::as_borrowed`]. /// The [`Bytes::as_borrowed`] method is passed on through wrapper types, for example -/// [`crate::types::MqttString`]. +/// [`MqttString`]. +/// +/// [`MqttString`]: crate::types::MqttString pub enum Bytes<'a> { /// Owned variant, only available with the `alloc` feature enabled. #[cfg(feature = "alloc")] diff --git a/src/client/err.rs b/src/client/err.rs index b3937eb..4d636d9 100644 --- a/src/client/err.rs +++ b/src/client/err.rs @@ -7,46 +7,61 @@ use crate::{ types::{MqttString, MqttStringPair, ReasonCode, TooLargeToEncode}, }; -/// The main error returned by [`crate::client::Client`]. +/// The main error returned by [`Client`]. /// /// Distincts between unrecoverable and recoverable errors. /// Recoverability in this context refers to whether the current network connection can /// be used for further communication after the error has occured. /// /// # Recovery -/// - For unrecoverable errors, [`crate::client::Client::abort`] can be called to send an optional DISCONNECT +/// - For unrecoverable errors, [`Client::abort`] can be called to send an optional DISCONNECT /// packet if allowed by specification. You can then try to recover the session by calling -/// [`crate::client::Client::connect`] again without clean start. +/// [`Client::connect`] again without clean start. /// - For recoverable errors, follow the error-specific behaviour. +/// +/// [`Client`]: crate::client::Client +/// [`Client::abort`]: crate::client::Client::abort +/// [`Client::connect`]: crate::client::Client::connect #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum Error<'e, const MAX_USER_PROPERTIES: usize> { /// An underlying Read/Write method returned an error. /// - /// Unrecoverable error. [`crate::client::Client::abort`] should be called. + /// Unrecoverable error. [`Client::abort`] should be called. + /// + /// [`Client::abort`]: crate::client::Client::abort Network(ErrorKind), /// The remote server did something the client does not understand / does not match the specification. /// - /// Unrecoverable error. [`crate::client::Client::abort`] should be called. + /// Unrecoverable error. [`Client::abort`] should be called. + /// + /// [`Client::abort`]: crate::client::Client::abort Server, - /// A buffer provision by the [`crate::buffer::BufferProvider`] failed. Therefore a packet - /// could not be received correctly. + /// A buffer provision by the [`BufferProvider`] failed. Therefore a packet could not be received + /// correctly. /// - /// Unrecoverable error. [`crate::client::Client::abort`] should be called. + /// Unrecoverable error. [`Client::abort`] should be called. + /// + /// [`BufferProvider`]: crate::buffer::BufferProvider + /// [`Client::abort`]: crate::client::Client::abort Alloc, /// An AUTH packet header has been received by the client. AUTH packets are not supported by the client. /// The client has scheduled a DISCONNECT packet with [`ReasonCode::ImplementationSpecificError`]. /// The packet body has not been decoded. /// - /// Unrecoverable error. [`crate::client::Client::abort`] should be called. + /// Unrecoverable error. [`Client::abort`] should be called. + /// + /// [`Client::abort`]: crate::client::Client::abort AuthPacketReceived, /// The client could not connect to the broker or the broker has sent a DISCONNECT packet. /// - /// Unrecoverable error. [`crate::client::Client::abort`] should be called. + /// Unrecoverable error. [`Client::abort`] should be called. + /// + /// [`Client::abort`]: crate::client::Client::abort Disconnect { /// The [`ReasonCode`] of the causing CONNACK or DISCONNECT packet. If the disconnection is caused /// by a CONNACK packet, the reason code ss always erroneous. @@ -68,7 +83,9 @@ pub enum Error<'e, const MAX_USER_PROPERTIES: usize> { /// Another unrecoverable error has been returned earlier. The underlying connection is in a state, /// in which it refuses/is not able to perform regular communication. /// - /// Unrecoverable error. [`crate::client::Client::abort`] should be called. + /// Unrecoverable error. [`Client::abort`] should be called. + /// + /// [`Client::abort`]: crate::client::Client::abort RecoveryRequired, /// A republish of a packet without an in flight entry was attempted. @@ -90,10 +107,12 @@ pub enum Error<'e, const MAX_USER_PROPERTIES: usize> { /// A packet was too long to encode its length with the variable byte integer. /// - /// This can currently only be returned from [`crate::client::Client::publish`] or - /// [`crate::client::Client::republish`]. + /// This can currently only be returned from [`Client::publish`] or [`Client::republish`]. /// /// Recoverable error. No action has been taken by the client. + /// + /// [`Client::publish`]: crate::client::Client::publish + /// [`Client::republish`]: crate::client::Client::republish PacketMaximumLengthExceeded, /// A packet is too long and would exceed the servers maximum packet size. @@ -101,40 +120,60 @@ pub enum Error<'e, const MAX_USER_PROPERTIES: usize> { /// Recoverable error. No action has been taken by the client. ServerMaximumPacketSizeExceeded, - /// The value of a topic alias in an outgoing PUBLISH packet was 0 or greater than the server's maximum allowed - /// value. Sending this PUBLISH packet would raise a protocol error. - /// - /// Recoverable error. No action has been taken by the client. - InvalidTopicAlias, - /// An action was rejected because an internal buffer used for tracking session state is full. /// - /// Recoverable error. Try again after a [`crate::client::event::Event`] has been emitted that indicates - /// that buffer might be free again. + /// Recoverable error. Try again after a [`Event`] has been emitted that indicates that buffer + /// might be free again. /// /// Example: - /// [`crate::client::Client::subscribe`] returns this error. Wait until a - /// [`crate::client::event::Event::Suback`] is received. + /// [`Client::subscribe`] returns this error. Wait until a [`Event::Suback`] is received. /// This clears a spot in the subscribe packet identifiers. + /// + /// [`Event`]: crate::client::event::Event + /// [`Client::subscribe`]: crate::client::Client::subscribe + /// [`Event::Suback`]: crate::client::event::Event::Suback SessionBuffer, /// A publish now would exceed the server's receive maximum and ultimately cause a protocol error. /// - /// Recoverable error. Try again after either [`crate::client::event::Event::PublishAcknowledged`] or - /// [`crate::client::event::Event::PublishComplete`] has been emitted that indicates that buffer might be - /// free again. - /// been emitted that indicates that buffer might be free again. + /// Recoverable error. Try again after either [`Event::PublishAcknowledged`] or + /// [`Event::PublishComplete`] has been emitted that indicates that buffer might be free again. + /// + /// [`Event::PublishAcknowledged`]: crate::client::event::Event::PublishAcknowledged + /// [`Event::PublishComplete`]: crate::client::event::Event::PublishComplete SendQuotaExceeded, + /// An operation was attempted which the server stated it does not support. If the requested operation + /// were executed as is, a protocol error would be caused. + /// + /// This could be: + /// - a shared subscription (topic filter starts with "$share") being attempted despite shared + /// subscriptions not being available on the server + /// - a subscription identifier being specified despite subscription identifiers not being available on + /// the server + /// - a wildcard occuring in a topic filter despite wildcard subscriptions not being available on the + /// server + /// - a publication with a quality of service level greater than the server's maximum quality of service + /// being attempted + /// - a publication with retain set to true being attempted despite retain not being available on the + /// server + /// - a topic alias in an outgoing publication being greater than the server's maximum topic alias value + /// + /// Recoverable error. No action has been taken by the client. + UnsupportedByServer, + /// A disconnect now with the given session expiry interval would cause a protocol error. /// /// A disconnection was attempted with a session expiry interval change where the session expiry interval in the - /// CONNECT packet was zero ([`crate::config::SessionExpiryInterval::EndOnDisconnect`]) and was - /// greater than zero ([`crate::config::SessionExpiryInterval::NeverEnd`] or - /// [`crate::config::SessionExpiryInterval::Seconds`]) in the DISCONNECT packet. + /// CONNECT packet was zero ([`SessionExpiryInterval::EndOnDisconnect`]) and was greater than zero + /// ([`SessionExpiryInterval::NeverEnd`] or [`SessionExpiryInterval::Seconds`]) in the DISCONNECT packet. /// /// Recoverable error. Try disconnecting again without an session expiry interval or with a - /// session expiry interval of zero ([`crate::config::SessionExpiryInterval::EndOnDisconnect`]). + /// session expiry interval of zero ([`SessionExpiryInterval::EndOnDisconnect`]). + /// + /// [`SessionExpiryInterval::EndOnDisconnect`]: crate::config::SessionExpiryInterval::EndOnDisconnect + /// [`SessionExpiryInterval::NeverEnd`]: crate::config::SessionExpiryInterval::NeverEnd + /// [`SessionExpiryInterval::Seconds`]: crate::config::SessionExpiryInterval::Seconds IllegalDisconnectSessionExpiryInterval, } @@ -149,9 +188,9 @@ impl Error<'_, MAX_USER_PROPERTIES> { | Self::PacketIdentifierAwaitingPubcomp | Self::PacketMaximumLengthExceeded | Self::ServerMaximumPacketSizeExceeded - | Self::InvalidTopicAlias | Self::SessionBuffer | Self::SendQuotaExceeded + | Self::UnsupportedByServer | Self::IllegalDisconnectSessionExpiryInterval ) } @@ -186,9 +225,9 @@ impl<'e> Error<'e, 0> { Self::PacketIdentifierAwaitingPubcomp => Error::PacketIdentifierAwaitingPubcomp, Self::PacketMaximumLengthExceeded => Error::PacketMaximumLengthExceeded, Self::ServerMaximumPacketSizeExceeded => Error::ServerMaximumPacketSizeExceeded, - Self::InvalidTopicAlias => Error::InvalidTopicAlias, Self::SessionBuffer => Error::SessionBuffer, Self::SendQuotaExceeded => Error::SendQuotaExceeded, + Self::UnsupportedByServer => Error::UnsupportedByServer, Self::IllegalDisconnectSessionExpiryInterval => { Error::IllegalDisconnectSessionExpiryInterval } diff --git a/src/client/event.rs b/src/client/event.rs index 94d67a5..2baaa86 100644 --- a/src/client/event.rs +++ b/src/client/event.rs @@ -2,8 +2,6 @@ use heapless::Vec; -#[allow(unused_imports)] -use crate::types::QoS; use crate::{ bytes::Bytes, types::{ @@ -13,6 +11,33 @@ use crate::{ v5::{packet::GenericPubackPacket, property::Property}, }; +/// Contains information taken from a connection handshake which the client does not have to +/// store for correct operational behaviour. +/// +/// Does not include the [`ReasonCode`] as it is always [`ReasonCode::Success`] +/// (0x00) if this event is returned. +#[derive(Debug, Clone)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct Connected<'i, const MAX_USER_PROPERTIES: usize> { + /// If set to true, a previous session has been continued by the server for this connection. + pub session_present: bool, + + /// The server can assign a different client identifier than the one in the CONNECT packet + /// or must assign a client identifier if none was included in the CONNECT packet. This is + /// the final client identifier value used for this session and connection. + pub client_identifier: MqttString<'i>, + + /// The user property entries in the CONNACK packet. If the vector is full, this list might + /// not be exhaustive. + pub user_properties: Vec, MAX_USER_PROPERTIES>, + + /// Response information used to create response topics. + pub response_information: Option>, + + /// Another server which can be used. + pub server_reference: Option>, +} + /// Events emitted by the client when receiving an MQTT packet. #[derive(Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] @@ -23,81 +48,87 @@ pub enum Event<'e, const MAX_SUBSCRIPTION_IDENTIFIERS: usize, const MAX_USER_PRO /// The server sent a PUBLISH packet. /// /// The client has acted as follows: - /// - [`QoS`] 0: No action - /// - [`QoS`] 1: A PUBACK packet has been sent to the server. - /// - [`QoS`] 2: A PUBREC packet has been sent to the server and the packet identifier is tracked as in flight + /// - [`QoS::AtMostOnce`]: No action + /// - [`QoS::AtLeastOnce`]: A PUBACK packet has been sent to the server. + /// - [`QoS::ExactlyOnce`]: A PUBREC packet has been sent to the server and the packet + /// identifier is tracked as in flight + /// + /// [`QoS::AtMostOnce`]: crate::types::QoS::AtMostOnce + /// [`QoS::AtLeastOnce`]: crate::types::QoS::AtLeastOnce + /// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce Publish(Publish<'e, MAX_SUBSCRIPTION_IDENTIFIERS, MAX_USER_PROPERTIES>), /// The server sent a SUBACK packet matching a SUBSCRIBE packet. /// - /// The subscription process is complete and was successful if the reason code indicates success. - /// The SUBSCRIBE packet won't have to be resent. + /// The subscription process is complete and was successful if the [`ReasonCode`] indicates + /// success. The SUBSCRIBE packet won't have to be resent. Suback(Suback<'e, MAX_USER_PROPERTIES>), /// The server sent an UNSUBACK packet matching an UNSUBSCRIBE packet. /// - /// The unsubscription process is complete and was successful if the reason code indicates success. - /// The UNSUBSCRIBE packet won't have to be resent. + /// The unsubscription process is complete and was successful if the [`ReasonCode`] + /// indicates success. The UNSUBSCRIBE packet won't have to be resent. Unsuback(Suback<'e, MAX_USER_PROPERTIES>), - /// The server sent a PUBACK or PUBREC with an erroneous reason code, - /// therefore rejecting the publication. + /// The server sent a PUBACK or PUBREC with an erroneous [`ReasonCode`], therefore + /// rejecting the publication. The publication process is aborted, the client has + /// removed this publication's flight state from its session and has not responded + /// with another packet. The publication can be retried with [`Client::publish`]. /// - /// The included reason code is always erroneous. + /// The included [`ReasonCode`] is always erroneous. /// - /// The publication process is aborted. + /// [`Client::publish`]: crate::client::Client::publish PublishRejected(Pubrej<'e, MAX_USER_PROPERTIES>), - /// The server sent a PUBACK packet matching a [`QoS`] 1 PUBLISH packet - /// confirming that the PUBLISH has been received. + /// The server sent a PUBACK packet matching a [`QoS::AtLeastOnce`] PUBLISH packet + /// confirming that the PUBLISH has been received. The [`QoS::AtLeastOnce`] + /// publication process is complete, the PUBLISH packet won't have to be resent. /// - /// The included reason code is always successful. + /// The included [`ReasonCode`] is always successful. /// - /// The [`QoS`] 1 publication process is complete, - /// the PUBLISH packet won't have to be resent. + /// [`QoS::AtLeastOnce`]: crate::types::QoS::AtLeastOnce PublishAcknowledged(Puback<'e, MAX_USER_PROPERTIES>), - /// The server sent a PUBREC packet matching a [`QoS`] 2 PUBLISH packet - /// confirming that the PUBLISH has been received. - /// - /// The included reason code is always successful. + /// The server sent a PUBREC packet matching a [`QoS::ExactlyOnce`] PUBLISH packet + /// confirming that the PUBLISH has been received. The first handshake of the + /// [`QoS::ExactlyOnce`] publication process is complete, the PUBLISH packet won't + /// have to be resent. The client has responded with a PUBREL packet. /// - /// The client has responded with a PUBREL packet. + /// The included [`ReasonCode`] is always successful. /// - /// The first handshake of the [`QoS`] 2 publication process is complete, - /// the PUBLISH packet won't have to be resent. + /// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce PublishReceived(Puback<'e, MAX_USER_PROPERTIES>), - /// The server sent a PUBREL packet matching a [`QoS`] 2 PUBREC packet - /// confirming that the PUBREC has been received. - /// - /// The included reason code is always successful. - /// + /// The server sent a PUBREL packet matching a [`QoS::ExactlyOnce`] PUBREC packet + /// confirming that the PUBREC has been received. The [`QoS::ExactlyOnce`] + /// publication process is complete, the PUBREC packet won't have to be resent. /// The client has responded with a PUBCOMP packet. /// - /// The [`QoS`] 2 publication process is complete, - /// the PUBREC packet won't have to be resent. + /// The included [`ReasonCode`] is always successful. + /// + /// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce PublishReleased(Puback<'e, MAX_USER_PROPERTIES>), - /// The server sent a PUBCOMP packet matching a [`QoS`] 2 PUBREL packet - /// confirming that the PUBREL has been received. + /// The server sent a PUBCOMP packet matching a [`QoS::ExactlyOnce`] PUBREL packet + /// confirming that the PUBREL has been received. The [`QoS::ExactlyOnce`] + /// publication process is complete, the PUBREL packet won't have to be resent. /// - /// The included reason code is always successful. + /// The included [`ReasonCode`] is always successful. /// - /// The [`QoS`] 2 publication process is complete, - /// the PUBREL packet won't have to be resent. + /// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce PublishComplete(Puback<'e, MAX_USER_PROPERTIES>), - /// The server sent a SUBACK, PUBACK, PUBREC, PUBREL or PUBCOMP + /// The server sent a SUBACK, UNSUBACK, PUBACK, PUBREC, PUBREL or PUBCOMP /// packet with a packet identifier that is not in flight (anymore). /// /// The client has not responded to the server or has responded appropriately /// to prevent a potential protocol deadlock. Ignored, - /// The server sent a [`QoS`] 2 PUBLISH packet which would cause a duplicate. - /// + /// The server sent a [`QoS::ExactlyOnce`] PUBLISH packet which would cause a duplicate. /// The client has responded with a PUBREC packet. + /// + /// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce Duplicate, } diff --git a/src/client/info/connect.rs b/src/client/info/connect.rs deleted file mode 100644 index 5c40bcd..0000000 --- a/src/client/info/connect.rs +++ /dev/null @@ -1,30 +0,0 @@ -use heapless::Vec; - -use crate::types::{MqttString, MqttStringPair}; - -/// Information taken from a connection handshake the client does not have to store -/// for correct operational behaviour and does not store for optimization purposes. -/// -/// Does not include the reason code as it is always [`crate::types::ReasonCode::Success`] (0x00) -/// if this is returned. -#[derive(Debug, Clone)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub struct Info<'i, const MAX_USER_PROPERTIES: usize> { - /// If set to true, a previous session is continued by the server for this connection. - pub session_present: bool, - - /// The server can assign a different client identifier than the one in the CONNECT packet - /// or must assign a client identifier if none was included in the CONNECT packet. - /// This is the final client identifier value used for this session. - pub client_identifier: MqttString<'i>, - - /// The user property entries in the CONNACK packet. If the vector is full, this list might - /// not be exhaustive. - pub user_properties: Vec, MAX_USER_PROPERTIES>, - - /// Response information used to create response topics. - pub response_information: Option>, - - /// Another server which can be used. - pub server_reference: Option>, -} diff --git a/src/client/info/mod.rs b/src/client/info/mod.rs deleted file mode 100644 index c78aea3..0000000 --- a/src/client/info/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Contains information types returned after certain client actions. - -mod connect; - -pub use connect::Info as ConnectInfo; diff --git a/src/client/mod.rs b/src/client/mod.rs index 321975d..95e51dc 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -8,8 +8,7 @@ use crate::{ buffer::BufferProvider, bytes::Bytes, client::{ - event::{Event, Puback, Publish, Pubrej, Suback}, - info::ConnectInfo, + event::{Connected, Event, Puback, Publish, Pubrej, Suback}, options::{ ConnectOptions, DisconnectOptions, PublicationOptions, SubscriptionOptions, TopicReference, UnsubscriptionOptions, @@ -39,7 +38,6 @@ use crate::{ mod err; pub mod event; -pub mod info; pub mod options; pub mod raw; @@ -157,7 +155,7 @@ impl< /// Returns the amount of publications the client is allowed to make according to the server's /// receive maximum. Does not account local space for storing publication state. fn remaining_send_quota(&self) -> u16 { - self.server_config.receive_maximum.into_inner().get() - self.session.in_flight_cpublishes() + self.server_config.receive_maximum.get() - self.session.in_flight_cpublishes() } fn is_packet_identifier_used(&self, packet_identifier: PacketIdentifier) -> bool { @@ -277,7 +275,7 @@ impl< net: N, options: &ConnectOptions<'_>, client_identifier: Option>, - ) -> Result, MqttError<'c, MAX_USER_PROPERTIES>> + ) -> Result, MqttError<'c, MAX_USER_PROPERTIES>> where 'c: 'd, { @@ -457,7 +455,7 @@ impl< server_keep_alive.map_or(options.keep_alive, Property::into_inner); if let Some(r) = receive_maximum { - self.server_config.receive_maximum = r; + self.server_config.receive_maximum = r.into_inner(); } if let Some(m) = maximum_qos { self.server_config.maximum_qos = m.into_inner(); @@ -483,7 +481,7 @@ impl< info!("connected to server (session present: {})", session_present); - Ok(ConnectInfo { + Ok(Connected { session_present, client_identifier, user_properties: user_properties @@ -536,10 +534,17 @@ impl< /// If no [`Event::Suback`] is received within a custom time, /// this method can be used to send the SUBSCRIBE packet again. /// - /// A subscription identifier should only be set if the server supports - /// subscription identifiers (Can be checked with [`Self::server_config`]). - /// The client does not double-check whether this feature is supported and will - /// always include the subscription identifier argument if present. + /// Note: + /// * A topic filter with one or more wildcards should only be used if the server + /// supports wildcard subscriptions. + /// * A subscription identifier should only be set if the server supports + /// subscription identifiers. + /// * A topic filter of a shared subscriptions should only be used if the server + /// supports shared subscriptions. + /// + /// The server support of these requirements can be checked via [`Client::server_config`]. + /// If a violation occurs, the client will not subscribe but prevent the protocol error + /// and return an error. /// /// # Returns: /// The packet identifier of the sent SUBSCRIBE packet. @@ -551,6 +556,13 @@ impl< /// * [`MqttError::SessionBuffer`] if the buffer for outgoing SUBSCRIBE packet identifiers is full /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be /// exceeded by sending this SUBSCRIBE packet + /// * [`MqttError::UnsupportedByServer`] + /// * if the server specified in its CONNACK that wildcard subscriptions are not available and + /// the topic filter is the topic filter of a shared subscription + /// * if the server specified in its CONNACK that subscription identifiers are not available and + /// the [`SubscriptionOptions`] include a subscription identifier + /// * if the server specified in its CONNACK that shared subscriptions are not available and the + /// topic filter is the topic filter of a shared subscription /// /// # Panics /// @@ -568,6 +580,20 @@ impl< MAX_USER_PROPERTIES ); + if !self.server_config.wildcard_subscription_supported && topic_filter.has_wildcard() { + return Err(MqttError::UnsupportedByServer); + } + + if !self.server_config.subscription_identifiers_supported + && options.subscription_identifier.is_some() + { + return Err(MqttError::UnsupportedByServer); + } + + if !self.server_config.shared_subscription_supported && topic_filter.is_shared() { + return Err(MqttError::UnsupportedByServer); + } + if self.pending_suback.is_full() { info!("maximum concurrent subscriptions reached"); return Err(MqttError::SessionBuffer); @@ -672,11 +698,22 @@ impl< Ok(pid) } - /// Publish a message. If [`QoS`] is greater than 0, the packet identifier is also kept track of by the client + /// Publish a message. If [`QoS`] is greater than [`QoS::AtMostOnce`], the packet identifier is + /// also kept track of by the client. + /// + /// Note: + /// * The [`QoS`] should be less than or equal to the server's maximum [`QoS`]. + /// * The retain flag should only be set if the server supports retain. + /// * A topic alias must be less than or equal to the server's maximum topic alias. + /// + /// The server support of these requirements can be checked via [`Client::server_config`]. + /// If a violation occurs, the client will not publish but prevent the protocol error + /// and return an error. /// /// # Returns: - /// - In case of [`QoS`] 0: [`None`] - /// - In case of [`QoS`] 1 or 2: [`Some`] with the packet identifier of the published packet + /// - In case of [`QoS::AtMostOnce`]: [`None`] + /// - In case of [`QoS::AtLeastOnce`] or [`QoS::ExactlyOnce`]: [`Some`] with the packet identifier + /// of the published packet. This value is required in case of a republication attempt. /// /// # Errors /// @@ -685,13 +722,17 @@ impl< /// * [`MqttError::SendQuotaExceeded`] if the server's control flow limit is reached and sending /// the PUBLISH would exceed the limit causing a protocol error /// * [`MqttError::SessionBuffer`] if the buffer for outgoing PUBLISH packet identifiers is full - /// * [`MqttError::InvalidTopicAlias`] if a topic alias is used and - /// * its value is 0 - /// * its value is greater than the server's maximum topic alias /// * [`MqttError::PacketMaximumLengthExceeded`] if the PUBLISH packet is too long to be encoded /// with MQTT's [`VarByteInt`] /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be /// exceeded by sending this PUBLISH packet + /// * [`MqttError::UnsupportedByServer`] + /// * if the quality of service level in the [`PublicationOptions`] is greater than the maximum + /// value specified in the server's CONNACK packet + /// * if the server specified in its CONNACK that retain is not available and a publication with + /// the retain flag set to true is attempted + /// * if a topic alias is used and its value is greater than the maximum value specified in the + /// server's CONNACK packet /// /// # Panics /// @@ -709,6 +750,23 @@ impl< MAX_USER_PROPERTIES ); + if options.qos > self.server_config.maximum_qos { + return Err(MqttError::UnsupportedByServer); + } + + if !self.server_config.retain_supported && options.retain { + return Err(MqttError::UnsupportedByServer); + } + + if options + .topic + .alias() + .map(NonZero::get) + .is_some_and(|a| a > self.server_config.topic_alias_maximum) + { + return Err(MqttError::UnsupportedByServer); + } + if options.qos > QoS::AtMostOnce { if self.remaining_send_quota() == 0 { info!("server receive maximum reached"); @@ -726,14 +784,6 @@ impl< QoS::ExactlyOnce => IdentifiedQoS::ExactlyOnce(self.packet_identifier()), }; - if options - .topic - .alias() - .is_some_and(|a| !(1..=self.server_config.topic_alias_maximum).contains(&a)) - { - return Err(MqttError::InvalidTopicAlias); - } - let packet = PublishPacket::<0, MAX_USER_PROPERTIES>::new( false, identified_qos, @@ -797,11 +847,22 @@ impl< /// as resending packets at any other time is a protocol error. /// (Compare [Message delivery retry](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901238), \[MQTT-4.4.0-1\]). /// - /// For a packet to be resent: - /// - it must have a quality of service > 0 - /// - its packet identifier must have an in flight entry with a quality of service matching the - /// quality of service in the options parameter - /// - in case of quality of service 2, it must not already be awaiting a PUBCOMP packet + /// Note: + /// * Server-side constraints: + /// * The [`QoS`] should be less than or equal to the server's maximum [`QoS`]. + /// * The retain flag should only be set if the server supports retain. + /// * A topic alias must be less than or equal to the server's maximum topic alias. + /// * Client-side preconditions: + /// * The [`QoS`] must be [`QoS::AtLeastOnce`] or [`QoS::ExactlyOnce`] and must be the same as + /// that of the original publication. + /// * The packet identifier must have an in flight entry with the same [`QoS`] as the value + /// in the options parameter. + /// * If [`QoS`] is [`QoS::ExactlyOnce`], the in flight entry it must not already be awaiting + /// the PUBCOMP packet. + /// + /// If a violation occurs, the client will not publish but prevent the protocol error + /// and return an error. The server support of these requirements can be checked via + /// [`Client::server_config`]. /// /// # Errors /// @@ -813,13 +874,17 @@ impl< /// has already been received and the server has therefore already received the PUBLISH /// * [`MqttError::PacketIdentifierNotInFlight`] if this packet identifier is not tracked in the /// client's session - /// * [`MqttError::InvalidTopicAlias`] if a topic alias is used and - /// * its value is 0 - /// * its value is greater than the server's maximum topic alias /// * [`MqttError::PacketMaximumLengthExceeded`] if the PUBLISH packet is too long to be encoded /// with MQTT's [`VarByteInt`] /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be /// exceeded by sending this PUBLISH packet + /// * [`MqttError::UnsupportedByServer`] + /// * if the quality of service level in the [`PublicationOptions`] is greater than the maximum + /// value specified in the server's CONNACK packet + /// * if the server specified in its CONNACK that retain is not available and a publication with + /// the retain flag set to true is attempted + /// * if a topic alias is used and its value is greater than the maximum value specified in the + /// server's CONNACK packet /// /// # Panics /// @@ -839,8 +904,27 @@ impl< MAX_USER_PROPERTIES ); - if options.qos == QoS::AtMostOnce { - panic!("QoS 0 packets cannot be republished"); + assert_ne!( + options.qos, + QoS::AtMostOnce, + "QoS 0 packets cannot be republished" + ); + + if options.qos > self.server_config.maximum_qos { + return Err(MqttError::UnsupportedByServer); + } + + if !self.server_config.retain_supported && options.retain { + return Err(MqttError::UnsupportedByServer); + } + + if options + .topic + .alias() + .map(NonZero::get) + .is_some_and(|a| a > self.server_config.topic_alias_maximum) + { + return Err(MqttError::UnsupportedByServer); } let identified_qos = match self.session.cpublish_flight_state(packet_identifier) { @@ -878,14 +962,6 @@ impl< } }; - if options - .topic - .alias() - .is_some_and(|a| !(1..=self.server_config.topic_alias_maximum).contains(&a)) - { - return Err(MqttError::InvalidTopicAlias); - } - let packet = PublishPacket::<0, MAX_USER_PROPERTIES>::new( true, identified_qos, @@ -933,9 +1009,12 @@ impl< /// Resends all pending PUBREL packets. /// - /// This method must be called and must only be called after a reconnection - /// with clean start set to 0, as resending packets at any other time is a protocol error. - /// (Compare [Message delivery retry](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901238), \[MQTT-4.4.0-1\]). + /// This method must only be called immediately upon reconnection before any call to [`Client::publish`] + /// or [`Client::republish`] (with [`QoS::ExactlyOnce`]) followed by a call to [`Client::poll`] or + /// [`Client::poll_body`] returning [`Event::PublishReceived`], as this combination of events results in + /// a new session entry that would be rereleased in this method, causing a protocol error (Compare + /// [Message delivery retry](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901238), + /// \[MQTT-4.4.0-1\]). /// /// This method assumes that the server's receive maximum after the reconnection is great enough /// to handle as many publication flows as dragged between the two connections. @@ -972,6 +1051,10 @@ impl< /// After an MQTT communication fails, usually either the client or the server closes the connection. /// /// This is not cancel-safe but you can set a timeout if reconnecting later anyway or you don't reuse the client. + /// + /// # Panics + /// + /// This function may panic if the client has not returned an unrecoverable error before. #[inline] pub async fn abort(&mut self) { match self.raw.abort().await { @@ -1144,7 +1227,8 @@ impl< /// * the server causes a protocol error /// * the packet following this header exceeds the client's maximum packet size /// * the server sends a PUBLISH packet with an invalid topic alias - /// * the server exceeded the client's receive maximum with a new [`QoS`] 2 PUBLISH + /// * the server exceeded the client's receive maximum with a new [`QoS::ExactlyOnce`] + /// PUBLISH /// * the server sends a PUBACK/PUBREC/PUBREL/PUBCOMP packet which mismatches what /// the client expects for this packet identifier from its session state /// * the fixed header has the packet type CONNECT/SUBSCRIBE/UNSUBSCRIBE/PINGREQ diff --git a/src/client/options/connect.rs b/src/client/options/connect.rs index d0248cc..866f5f3 100644 --- a/src/client/options/connect.rs +++ b/src/client/options/connect.rs @@ -41,8 +41,10 @@ pub struct Options<'c> { pub request_problem_information: bool, /// Arbitrary key-value pairs of strings sent as the user property entries of the CONNECT packet. - /// Note that this slice's length must be less than [`crate::client::Client`]'s const generic - /// parameter `MAX_USER_PROPERTIES`. + /// Note that this slice's length must be less than [`Client`]'s const generic parameter + /// `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client pub user_properties: &'c [MqttStringPair<'c>], /// The user name the client wishes to authenticate with. @@ -120,8 +122,10 @@ impl<'c> Options<'c> { self.request_problem_information = true; self } - /// Sets the user properties. Note that this slice's length must be less than - /// [`crate::client::Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// Sets the user properties. Note that this slice's length must be less than [`Client`]'s + /// const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client #[must_use] pub const fn user_properties(mut self, user_properties: &'c [MqttStringPair<'c>]) -> Self { self.user_properties = user_properties; diff --git a/src/client/options/disconnect.rs b/src/client/options/disconnect.rs index 3956c5c..c330d0d 100644 --- a/src/client/options/disconnect.rs +++ b/src/client/options/disconnect.rs @@ -1,24 +1,31 @@ use crate::{config::SessionExpiryInterval, types::MqttStringPair}; -#[allow(unused_imports)] -use crate::types::ReasonCode; - /// Options for a disconnection to the server with a DISCONNECT packet. #[derive(Debug, Clone, Copy)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct Options<'d> { /// If set to true, the client uses [`ReasonCode::DisconnectWithWillMessage`] in the /// DISCONNECT packet and the server publishes the will message. + /// + /// [`ReasonCode::DisconnectWithWillMessage`]: crate::types::ReasonCode::DisconnectWithWillMessage pub publish_will: bool, - /// The session expiry interval property. Not allowed to be set to a non-zero value - /// if the session expiry interval property in the CONNECT packet has been 0. - /// This value overrides the session expiry interval negotiated in the handshake. + /// The session expiry interval property. This value overrides the session expiry interval + /// negotiated in the handshake. + /// + /// Must not to a non-zero value (`Some(`[`SessionExpiryInterval::EndOnDisconnect`]`)`) if the + /// session expiry interval property in the CONNECT packet has been zero (can be checked via + /// [`Client::client_config`]). The client will not disconnect if a violation occurs but prevent + /// the protocol error and return an error. + /// + /// [`Client::client_config`]: crate::client::Client::client_config pub session_expiry_interval: Option, /// Arbitrary key-value pairs of strings sent as the user property entries of the DISCONNECT - /// packet. Note that this slice's length must be less than [`crate::client::Client`]'s const - /// generic parameter `MAX_USER_PROPERTIES`. + /// packet. Note that this slice's length must be less than [`Client`]'s const generic parameter + /// `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client pub user_properties: &'d [MqttStringPair<'d>], } @@ -51,8 +58,10 @@ impl<'d> Options<'d> { self.session_expiry_interval = Some(interval); self } - /// Sets the user properties. Note that this slice's length must be less than - /// [`crate::client::Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// Sets the user properties. Note that this slice's length must be less than [`Client`]'s + /// const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client #[must_use] pub const fn user_properties(mut self, user_properties: &'d [MqttStringPair<'d>]) -> Self { self.user_properties = user_properties; diff --git a/src/client/options/publish.rs b/src/client/options/publish.rs index 629aecd..0056138 100644 --- a/src/client/options/publish.rs +++ b/src/client/options/publish.rs @@ -1,3 +1,5 @@ +use core::num::NonZero; + use const_fn::const_fn; use crate::types::{MqttBinary, MqttString, MqttStringPair, QoS, TopicName}; @@ -10,17 +12,35 @@ pub struct Options<'p> { /// The quality of service level used by the server to send this publication /// to subscribed clients is the minimum of this value and the quality of service /// value of the receiving client's subscription. + /// + /// Must be less than or equal to the server's maximum quality of service level + /// (can be checked via [`Client::server_config`]). The client will not publish + /// if a violation occurs but prevent the protocol error and return an error. + /// + /// [`Client::server_config`]: crate::client::Client::server_config pub qos: QoS, /// Depicts the value of the retain flag in the PUBLISH packet. /// If set to 1, the server should retain the message on this topic. /// Retained messages with quality of service 0 can be discarded /// at any time by the server. + /// + /// Must be false when the server does not support retain (can be checked via + /// [`Client::server_config`]). The client will not publish if a violation occurs + /// but prevent the protocol error and return an error. + /// + /// [`Client::server_config`]: crate::client::Client::server_config pub retain: bool, /// The topic that the message is published on. The topic can be referenced over /// an existing topic alias mapping or by specifying the topic name and optionally /// mapping a topic alias to it. + /// + /// If an alias is used, it must be less than or equal to the server's maximum topic + /// alias (can be checked via [`Client::server_config`]). The client will not publish + /// if a violation occurs but prevent the protocol error and return an error. + /// + /// [`Client::server_config`]: crate::client::Client::server_config pub topic: TopicReference<'p>, /// Indicates whether the message is valid UTF-8. If [`None`], there is no statement @@ -41,7 +61,9 @@ pub struct Options<'p> { pub correlation_data: Option>, /// Arbitrary key-value pairs of strings. Note that this slice's length must be less than - /// [`crate::client::Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// [`Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client pub user_properties: &'p [MqttStringPair<'p>], /// The custom content type of the message. @@ -67,22 +89,33 @@ impl<'p> Options<'p> { } /// Sets the Quality of Service level. + /// + /// Note that this level must be less than or equal to the server's + /// maximum quality of service level. #[must_use] pub const fn qos(mut self, qos: QoS) -> Self { self.qos = qos; self } - /// Sets the Quality of Service level to 1 (At Least Once). + /// Sets the Quality of Service level to 1 ([`QoS::AtLeastOnce`]). + /// + /// Note that this is only allowed if the server's maximum quality + /// of service is 1 or 2. #[must_use] pub const fn at_least_once(self) -> Self { self.qos(QoS::AtLeastOnce) } - /// Sets the Quality of Service level to 2 (Exactly Once). + /// Sets the Quality of Service level to 2 ([`QoS::ExactlyOnce`]). + /// + /// Note that this is only allowed if the server's maximum quality + /// of service is 2. #[must_use] pub const fn exactly_once(self) -> Self { self.qos(QoS::ExactlyOnce) } /// Sets the retain flag to true. + /// + /// Note that this is only allowed if the server supports retain. #[must_use] pub const fn retain(mut self) -> Self { self.retain = true; @@ -114,8 +147,10 @@ impl<'p> Options<'p> { self.correlation_data = Some(data); self } - /// Sets the user properties. Note that this slice's length must be less than - /// [`crate::client::Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// Sets the user properties. Note that this slice's length must be less than [`Client`]'s + /// const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client #[must_use] pub const fn user_properties(mut self, user_properties: &'p [MqttStringPair<'p>]) -> Self { self.user_properties = user_properties; @@ -142,15 +177,15 @@ pub enum TopicReference<'t> { /// Publish to an already mapped topic alias. The alias must have been defined earlier /// in the network connection. - Alias(u16), + Alias(NonZero), /// Create a new topic alias or replace an existing topic alias. /// The alias lasts until the end of the network connection. - Mapping(TopicName<'t>, u16), + Mapping(TopicName<'t>, NonZero), } impl<'t> TopicReference<'t> { - pub(crate) fn alias(&self) -> Option { + pub(crate) fn alias(&self) -> Option> { match self { Self::Name(_) => None, Self::Alias(alias) => Some(*alias), @@ -165,7 +200,9 @@ impl<'t> TopicReference<'t> { } } - /// Delegates to [`crate::Bytes::as_borrowed`]. + /// Delegates to [`Bytes::as_borrowed`]. + /// + /// [`Bytes::as_borrowed`]: crate::Bytes::as_borrowed #[must_use] pub fn as_borrowed(&'t self) -> Self { match self { diff --git a/src/client/options/subscribe.rs b/src/client/options/subscribe.rs index 86a298b..f3a4f1c 100644 --- a/src/client/options/subscribe.rs +++ b/src/client/options/subscribe.rs @@ -4,7 +4,7 @@ use crate::types::{MqttStringPair, QoS, VarByteInt}; #[derive(Debug, Clone, Copy)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct Options<'s> { - /// Serverside retain handling configuration for this subscription. + /// Server-side retain handling configuration for this subscription. pub retain_handling: RetainHandling, /// If set to true, the server sets the retain flag of a PUBLISH packet matching @@ -20,19 +20,27 @@ pub struct Options<'s> { /// If set to true on a shared subscription, a protocol error is triggered. pub no_local: bool, - /// The maximum quality of service that the server can forward publications - /// matching this subscription with to the client. A quality of service level + /// The maximum quality of service that the server is allowed to forward publications + /// at matching this subscription with to the client. A quality of service level /// lower than this can be granted by the server. pub qos: QoS, /// The subscription identifier of the subscription. The server will set /// subscription identifier properties in its PUBLISH packets to the values of /// all matching subscriptions with a subscription identifier. + /// + /// Must be [`None`] when the server does not support retain (can be checked via + /// [`Client::server_config`]). The client will not subscribe if a violation occurs + /// but prevent the protocol error and return an error. + /// + /// [`Client::server_config`]: crate::client::Client::server_config pub subscription_identifier: Option, - /// Arbitrary key-value pairs of strings sent as the user property entries of the SUBSCRIBE packet. - /// Note that this slice's length must be less than [`crate::client::Client`]'s const generic - /// parameter `MAX_USER_PROPERTIES`. + /// Arbitrary key-value pairs of strings sent as the user property entries of the + /// SUBSCRIBE packet. Note that this slice's length must be less than [`Client`]'s + /// const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client pub user_properties: &'s [MqttStringPair<'s>], } @@ -63,17 +71,17 @@ impl<'s> Options<'s> { self.qos = qos; self } - /// Sets the Quality of Service level to 1 (At Least Once). + /// Sets the Quality of Service level to 1 ([`QoS::AtLeastOnce`]). #[must_use] pub const fn at_least_once(self) -> Self { self.qos(QoS::AtLeastOnce) } - /// Sets the Quality of Service level to 2 (Exactly Once). + /// Sets the Quality of Service level to 2 ([`QoS::ExactlyOnce`]). #[must_use] pub const fn exactly_once(self) -> Self { self.qos(QoS::ExactlyOnce) } - /// Sets the serverside retain handling configuration for this subscription. + /// Sets the server-side retain handling configuration for this subscription. #[must_use] pub const fn retain_handling(mut self, retain_handling: RetainHandling) -> Self { self.retain_handling = retain_handling; @@ -92,13 +100,17 @@ impl<'s> Options<'s> { self } /// Sets the subscription identifier property. + /// + /// Note that this is only allowed if the server supports subscription identifiers. #[must_use] pub const fn subscription_identifier(mut self, subscription_identifier: VarByteInt) -> Self { self.subscription_identifier = Some(subscription_identifier); self } - /// Sets the user properties. Note that this slice's length must be less than - /// [`crate::client::Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// Sets the user properties. Note that this slice's length must be less than [`Client`]'s + /// const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client #[must_use] pub const fn user_properties(mut self, user_properties: &'s [MqttStringPair<'s>]) -> Self { self.user_properties = user_properties; @@ -106,7 +118,7 @@ impl<'s> Options<'s> { } } -/// Serverside retain handling configuration for a subscription. +/// Server-side retain handling configuration for a subscription. #[derive(Debug, Clone, Copy, Default)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum RetainHandling { diff --git a/src/client/options/unsubscribe.rs b/src/client/options/unsubscribe.rs index 101c16f..d1df545 100644 --- a/src/client/options/unsubscribe.rs +++ b/src/client/options/unsubscribe.rs @@ -4,9 +4,11 @@ use crate::types::MqttStringPair; #[derive(Debug, Clone, Copy)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct Options<'s> { - /// Arbitrary key-value pairs of strings sent as the user property entries of the UNSUBSCRIBE packet. - /// Note that this slice's length must be less than [`crate::client::Client`]'s const generic - /// parameter `MAX_USER_PROPERTIES`. + /// Arbitrary key-value pairs of strings sent as the user property entries of the + /// UNSUBSCRIBE packet. Note that this slice's length must be less than [`Client`]'s + /// const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client pub user_properties: &'s [MqttStringPair<'s>], } @@ -25,8 +27,10 @@ impl<'s> Options<'s> { } } - /// Sets the user properties. Note that this slice's length must be less than - /// [`crate::client::Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// Sets the user properties. Note that this slice's length must be less than [`Client`]'s + /// const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client #[must_use] pub const fn user_properties(mut self, user_properties: &'s [MqttStringPair<'s>]) -> Self { self.user_properties = user_properties; diff --git a/src/client/options/will.rs b/src/client/options/will.rs index 47b2881..f79f1ac 100644 --- a/src/client/options/will.rs +++ b/src/client/options/will.rs @@ -52,7 +52,9 @@ pub struct Options<'c> { pub correlation_data: Option>, /// The user properties in the will publication. Note that this slice's length must be less than - /// [`crate::client::Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// [`Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client pub user_properties: &'c [MqttStringPair<'c>], /// The payload of the will publication. @@ -85,12 +87,12 @@ impl<'c> Options<'c> { self.will_qos = qos; self } - /// Sets the Quality of Service level to 1 (At Least Once). + /// Sets the Quality of Service level to 1 ([`QoS::AtLeastOnce`]). #[must_use] pub const fn at_least_once(self) -> Self { self.qos(QoS::AtLeastOnce) } - /// Sets the Quality of Service level to 2 (Exactly Once). + /// Sets the Quality of Service level to 2 ([`QoS::ExactlyOnce`]). #[must_use] pub const fn exactly_once(self) -> Self { self.qos(QoS::ExactlyOnce) @@ -141,8 +143,10 @@ impl<'c> Options<'c> { self.correlation_data = Some(correlation_data); self } - /// Sets the user properties. Note that this slice's length must be less than - /// [`crate::client::Client`]'s const generic parameter `MAX_USER_PROPERTIES`. + /// Sets the user properties. Note that this slice's length must be less than [`Client`]'s + /// const generic parameter `MAX_USER_PROPERTIES`. + /// + /// [`Client`]: crate::client::Client #[must_use] pub const fn user_properties(mut self, user_properties: &'c [MqttStringPair<'c>]) -> Self { self.user_properties = user_properties; diff --git a/src/client/raw/err.rs b/src/client/raw/err.rs index ee7cb04..f31fc65 100644 --- a/src/client/raw/err.rs +++ b/src/client/raw/err.rs @@ -14,7 +14,9 @@ pub(crate) enum Error { /// The network is in a faulty state. Disconnected, - /// A buffer provision by the [`crate::buffer::BufferProvider`] failed. + /// A buffer provision by the [`BufferProvider`] failed. + /// + /// [`BufferProvider`]: crate::buffer::BufferProvider Alloc(B), /// Malformed packet or Protocol Error. diff --git a/src/config/client.rs b/src/config/client.rs index 4c5176e..2169b10 100644 --- a/src/config/client.rs +++ b/src/config/client.rs @@ -1,23 +1,43 @@ use crate::{config::SessionExpiryInterval, types::VarByteInt}; /// Configuration of the client which must be upheld by the server. +/// These values are used by the client to enforce protocol correctness. +/// +/// Other values which might be added in the future here are: +/// * Client's receive maximum: this is currently a const generic of the client type +/// and is therefore not required as an in-memory value. +/// * Client's topic alias maximum: incoming topic aliases are currently not supported, +/// this is configured into the protocol via a topic alias maximum value of 0. #[derive(Debug, Clone, Copy)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct Config { - /// The session expiry interval requested by the client. - /// This field is used to determine whether a non-zero session expiry interval - /// can be used when disconnecting. - /// This value is configured when calling [`crate::client::Client::connect`] - /// from the value in [`crate::client::options::ConnectOptions`] + /// The session expiry interval requested by the client. Note that this is NOT + /// necessarily the actual session expiry interval value in force which is used by + /// the server, as the server can overwrite this value in its CONNACK packet. The + /// negotiated and applicable value is stored in [`SharedConfig`]. + /// + /// This field is still required however to determine whether a non-zero session + /// expiry interval can be used when disconnecting. + /// + /// This value is configured when calling [`Client::connect`] from the value in + /// [`ConnectOptions`]. + /// + /// [`SharedConfig`]: crate::config::SharedConfig + /// [`Client::connect`]: crate::client::Client::connect + /// [`ConnectOptions`]: crate::client::options::ConnectOptions pub session_expiry_interval: SessionExpiryInterval, /// The maximum packet size the client is willing to accept transformed into the /// maximum value of the remaining length that is therefore accepted. - /// This value is configured when calling [`crate::client::Client::connect`] - /// from the value in [`crate::client::options::ConnectOptions`] + /// + /// This value is configured when calling [`Client::connect`] from the value in + /// [`ConnectOptions`]. + /// + /// [`Client::connect`]: crate::client::Client::connect + /// [`ConnectOptions`]: crate::client::options::ConnectOptions pub maximum_accepted_remaining_length: u32, - /// Whether reason string and user properties are sent (by the server) in case + /// Whether reason strings and user properties are sent (by the server) in case /// of failures. In reality, this depends a lot on the specific server implementation /// and only follows this rule: /// @@ -27,12 +47,12 @@ pub struct Config { /// /// [MQTTv5.0 3.1.2.11.7](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901053) /// - /// This value is configured when calling [`crate::client::Client::connect`] - /// from the value in [`crate::client::options::ConnectOptions`] + /// This value is configured when calling [`Client::connect`] from the value in + /// [`ConnectOptions`]. + /// + /// [`Client::connect`]: crate::client::Client::connect + /// [`ConnectOptions`]: crate::client::options::ConnectOptions pub request_problem_information: bool, - // receive_maximum - // topic_alias_maximum - // request_response_information this requests a response_information property in the CONNACK } impl Default for Config { diff --git a/src/config/mod.rs b/src/config/mod.rs index 5e5bb7d..f681617 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -50,8 +50,10 @@ pub enum SessionExpiryInterval { #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum MaximumPacketSize { - /// There is no imposed limit on how large packets can be. - /// The technical limit is [`crate::types::VarByteInt::MAX_ENCODABLE`] + 5 (size of fixed header). + /// There is no imposed limit on how large packets can be. The technical limit is + /// [`VarByteInt::MAX_ENCODABLE`] + 5 (size of fixed header). + /// + /// [`VarByteInt::MAX_ENCODABLE`]: crate::types::VarByteInt::MAX_ENCODABLE #[default] Unlimited, @@ -69,10 +71,3 @@ impl MaximumPacketSize { } } } - -/// Maximum concurrent publications with a Quality of Service > 0. -/// -/// Default is 65536 / [`u16::MAX`] and is used when no receive maximum is present. Can't be zero. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub struct ReceiveMaximum(pub NonZero); diff --git a/src/config/server.rs b/src/config/server.rs index d10517d..0ecce91 100644 --- a/src/config/server.rs +++ b/src/config/server.rs @@ -1,16 +1,15 @@ use core::num::NonZero; -use crate::{ - config::{MaximumPacketSize, ReceiveMaximum}, - types::QoS, -}; +use crate::{config::MaximumPacketSize, types::QoS}; /// Configuration of the server which must be upheld by the client. +/// These values are used by the client to enforce protocol correctness. #[derive(Debug, Clone, Copy)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct Config { - /// Maximum concurrent [`QoS`] 1 & 2 publications that the server is willing to accept. - pub receive_maximum: ReceiveMaximum, + /// Maximum concurrent [`QoS::AtLeastOnce`] & [`QoS::ExactlyOnce`] publications that the + /// server is willing to accept. + pub receive_maximum: NonZero, /// Maximum supported [`QoS`] by the server. pub maximum_qos: QoS, @@ -25,18 +24,18 @@ pub struct Config { /// Equal to the number of distinct topic aliases the server supports. pub topic_alias_maximum: u16, - /// Serverside support for wildcard subscriptions. + /// Server-side support for wildcard subscriptions. pub wildcard_subscription_supported: bool, - /// Serverside support for subscription identifiers. + /// Server-side support for subscription identifiers. pub subscription_identifiers_supported: bool, - /// Serverside support for shared subscriptions. + /// Server-side support for shared subscriptions. pub shared_subscription_supported: bool, } impl Default for Config { fn default() -> Self { Self { - receive_maximum: ReceiveMaximum(NonZero::new(u16::MAX).unwrap()), + receive_maximum: NonZero::new(u16::MAX).unwrap(), maximum_qos: QoS::ExactlyOnce, retain_supported: true, maximum_packet_size: MaximumPacketSize::default(), diff --git a/src/config/shared.rs b/src/config/shared.rs index c98383c..c2b45a7 100644 --- a/src/config/shared.rs +++ b/src/config/shared.rs @@ -4,9 +4,16 @@ use crate::config::{KeepAlive, SessionExpiryInterval}; #[derive(Debug, Clone, Copy, Default)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct Config { - /// The negotiated [`KeepAlive`] interval in seconds allowed to expire between sending two packets without the connection being closed. + /// The negotiated [`KeepAlive`] interval in seconds allowed to expire between + /// sending two packets without the connection being closed. pub keep_alive: KeepAlive, /// The negotiated [`SessionExpiryInterval`] after the connection has been closed. + /// This value may be altered by the client with a DISCONNECT packet, but if the + /// session expiry interval requested with the CONNECT packet (in [`ClientConfig`]) + /// is [`SessionExpiryInterval::EndOnDisconnect`] (a value of zero), a non-zero + /// session expiry interval value in the DISCONNECT packet is not allowed. + /// + /// [`ClientConfig`]: crate::config::ClientConfig pub session_expiry_interval: SessionExpiryInterval, } diff --git a/src/io/read.rs b/src/io/read.rs index 26d59b7..b37472d 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,4 +1,4 @@ -use core::{cmp::min, marker::PhantomData}; +use core::{cmp::min, marker::PhantomData, num::NonZero}; use crate::{ buffer::BufferProvider, @@ -42,6 +42,11 @@ macro_rules! int_read_impl { .map(Self::from_be_bytes) } } + impl Readable for NonZero<$int> { + async fn read(read: &mut R) -> Result> { + NonZero::new(<$int>::read(read).await?).ok_or(ReadError::ProtocolError) + } + } }; } diff --git a/src/io/write.rs b/src/io/write.rs index f929d4a..0863fae 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,3 +1,5 @@ +use core::num::NonZero; + use crate::{ eio::Write, fmt::unreachable, @@ -56,6 +58,14 @@ macro_rules! int_write_impl { self.to_be_bytes().write(write).await } } + impl Writable for NonZero<$int> { + fn written_len(&self) -> usize { + self.get().written_len() + } + async fn write(&self, write: &mut W) -> Result<(), WriteError> { + self.get().write(write).await + } + } }; } diff --git a/src/session/flight.rs b/src/session/flight.rs index 5ee5e9d..ff99634 100644 --- a/src/session/flight.rs +++ b/src/session/flight.rs @@ -1,9 +1,9 @@ use crate::types::PacketIdentifier; -#[allow(unused_imports)] -use crate::types::QoS; - -/// An incomplete [`QoS`] 1 or 2 publication. +/// An incomplete [`QoS::AtLeastOnce`] or [`QoS::ExactlyOnce`] publication. +/// +/// [`QoS::AtLeastOnce`]: crate::types::QoS::AtLeastOnce +/// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct InFlightPublish { @@ -13,26 +13,37 @@ pub struct InFlightPublish { pub state: S, } -/// The state of an incomplete [`QoS`] 1 or 2 publication by the client. +/// The state of an incomplete [`QoS::AtLeastOnce`] or [`QoS::ExactlyOnce`] publication by the client. +/// +/// [`QoS::AtLeastOnce`]: crate::types::QoS::AtLeastOnce +/// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum CPublishFlightState { - /// A [`QoS`] 1 PUBLISH packet has been sent. + /// A [`QoS::AtLeastOnce`] PUBLISH packet has been sent. /// The next step in the handshake is the server sending a PUBACK packet. + /// + /// [`QoS::AtLeastOnce`]: crate::types::QoS::AtLeastOnce AwaitingPuback, - /// A [`QoS`] 2 PUBLISH packet has been sent. + /// A [`QoS::ExactlyOnce`] PUBLISH packet has been sent. /// The next step in the handshake is the server sending a PUBREC packet. + /// + /// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce AwaitingPubrec, /// A PUBREC packet has been received and responded to with a PUBREL packet. /// The last step in the handshake is the server sending a PUBCOMP packet. AwaitingPubcomp, } -/// The state of an incomplete [`QoS`] 2 publication by the server. +/// The state of an incomplete [`QoS::ExactlyOnce`] publication by the server. +/// +/// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum SPublishFlightState { - /// A [`QoS`] 2 packet has been received and responded to with a PUBREC packet. + /// A [`QoS::ExactlyOnce`] packet has been received and responded to with a PUBREC packet. /// The next step in the handshake is the server sending a PUBREL packet. + /// + /// [`QoS::ExactlyOnce`]: crate::types::QoS::ExactlyOnce AwaitingPubrel, } diff --git a/src/types/qos.rs b/src/types/qos.rs index 15a6cfe..0d5b5d1 100644 --- a/src/types/qos.rs +++ b/src/types/qos.rs @@ -1,14 +1,29 @@ use crate::types::PacketIdentifier; -/// MQTT's Quality of Service +/// MQTT's Quality of Service levels #[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum QoS { - /// Quality of Service Level 0. Publications with this level are only sent once. + /// Quality of Service Level 0. Reliable delivery of publications at this level is guaranteed solely + /// by the underlying transport mechanism within a continuous network connection. Publications can + /// be lost if a disconnection on protocol or transport level occurs around the transmission time. + /// A single message is never delivered more than once. Retransmissions cannot be marked as such on + /// protocol level. AtMostOnce = 0, - /// Quality of Service Level 1. Publications with this level are sent until a PUBACK indicates that it was received. + + /// Quality of Service Level 1. Reliable delivery of publications is guaranteed at the cost of a + /// two-way handshake on protocol level, even with disconnects around (re-)transmission time, + /// given a reconnection to the same, non-expired session is successful and a retransmission is + /// subsequently attempted. A single message can be delivered more than once, but actual duplicates + /// cannot reliably be distinguished from needed retransmissions. AtLeastOnce = 1, - /// Quality of Service Level 2. Publications with this level are followed by a handshake assuring it is received once. + + /// Quality of Service Level 2. Reliable, exactly-once delivery of publications is guaranteed at + /// the cost of a four-way handshake at protocol level, even with disconnects around + /// (re-)transmission time, given a reconnection to the same, non-expired session is successful and + /// retransmission of the PUBLISH or PUBREL packets is subsequently attempted. Delivery of + /// publications may be further delayed than at other quality of service levels due to the required + /// release of the message after the three-way handshake within the four-way handshake. ExactlyOnce = 2, } impl From for QoS { diff --git a/src/types/string.rs b/src/types/string.rs index 63fce21..947bf47 100644 --- a/src/types/string.rs +++ b/src/types/string.rs @@ -247,7 +247,9 @@ impl<'s> MqttString<'s> { unsafe { from_utf8_unchecked(self.0.as_bytes()) } } - /// Delegates to [`crate::Bytes::as_borrowed`]. + /// Delegates to [`Bytes::as_borrowed`]. + /// + /// [`Bytes::as_borrowed`]: crate::Bytes::as_borrowed #[inline] #[must_use] pub const fn as_borrowed(&'s self) -> Self { @@ -293,7 +295,9 @@ impl<'s> MqttStringPair<'s> { Self { name, value } } - /// Delegates to [`crate::Bytes::as_borrowed`]. + /// Delegates to [`Bytes::as_borrowed`]. + /// + /// [`Bytes::as_borrowed`]: crate::Bytes::as_borrowed #[inline] #[must_use] pub const fn as_borrowed(&'s self) -> Self { diff --git a/src/types/topic.rs b/src/types/topic.rs index f302472..eb90052 100644 --- a/src/types/topic.rs +++ b/src/types/topic.rs @@ -82,7 +82,9 @@ impl<'t> TopicName<'t> { Self(string) } - /// Delegates to [`crate::Bytes::as_borrowed`]. + /// Delegates to [`Bytes::as_borrowed`]. + /// + /// [`Bytes::as_borrowed`]: crate::Bytes::as_borrowed #[inline] #[must_use] pub const fn as_borrowed(&'t self) -> Self { @@ -121,10 +123,34 @@ impl<'t> TopicFilter<'t> { return false; } - let mut i = 0; + let is_shared = s.len() >= 6 + && s[0] == b'$' + && s[1] == b's' + && s[2] == b'h' + && s[3] == b'a' + && s[4] == b'r' + && s[5] == b'e'; + + if is_shared { + // Minimal shared filter "$share/a/a" has length 10 + if s.len() < 10 { + return false; + } + // Check first '/' so that checking the sharename is simpler in the loop + if s[6] != b'/' { + return false; + } + } + + let mut i = match is_shared { + true => 7, + false => 0, + }; let mut level_len = 0; let mut level_contains_wildcard = false; + let mut checking_share_name = is_shared; + while i < s.len() { let b = s[i]; @@ -148,6 +174,28 @@ impl<'t> TopicFilter<'t> { } if b == b'/' { + if checking_share_name { + // ... a ShareName that is at least one character long [MQTT-4.8.2-1]. + if level_len == 0 { + return false; + } + + // The ShareName MUST NOT contain the characters "/", "+" or "#", but MUST be + // followed by a "/" character. ... + // Topic Filter [MQTT-4.8.2-2] + if level_contains_wildcard { + return false; + } + + // ... but MUST be followed by a "/" character. This "/" character MUST be + // followed by a Topic Filter [MQTT-4.8.2-2] + if i + 1 == s.len() { + return false; + } + + checking_share_name = false; + } + level_len = 0; level_contains_wildcard = false; } else { @@ -168,7 +216,40 @@ impl<'t> TopicFilter<'t> { true } - /// Creates a new topic filter while checking for correct syntax of the topic filter string + /// Returns whether the topic filter is the topic filter of a shared subscription. + pub const fn is_shared(&self) -> bool { + let s = self.0.as_str().as_bytes(); + + s.len() >= 10 + && s[0] == b'$' + && s[1] == b's' + && s[2] == b'h' + && s[3] == b'a' + && s[4] == b'r' + && s[5] == b'e' + } + + /// Returns whether the topic filter contains one or more of the wildcard characters `#` or `+`. + pub const fn has_wildcard(&self) -> bool { + let s = self.0.as_str().as_bytes(); + + let mut i = 0; + while i < s.len() { + let b = s[i]; + + if b == b'#' || b == b'+' { + return true; + } + + i += 1; + } + + false + } + + /// Creates a new topic filter while checking for correct syntax of the topic filter string. + /// If the filter starts with "$share", the constraints for a shared subscription's topic + /// filter are also enforced. #[const_fn(cfg(not(feature = "alloc")))] #[must_use] pub fn new(string: MqttString<'t>) -> Option { @@ -196,7 +277,9 @@ impl<'t> TopicFilter<'t> { Self(string) } - /// Delegates to [`crate::Bytes::as_borrowed`]. + /// Delegates to [`Bytes::as_borrowed`]. + /// + /// [`Bytes::as_borrowed`]: crate::Bytes::as_borrowed #[inline] #[must_use] pub const fn as_borrowed(&'t self) -> Self { @@ -214,11 +297,6 @@ impl<'t> From> for MqttString<'t> { value.0 } } -impl<'t> From> for TopicFilter<'t> { - fn from(value: TopicName<'t>) -> Self { - Self(value.0) - } -} #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] @@ -282,15 +360,41 @@ mod unit { use crate::types::{MqttString, TopicFilter, TopicName}; macro_rules! assert_valid { - ($t:ty, $l:literal) => { - let s = assert_ok!(MqttString::from_str($l)); - assert!(<$t>::new(s).is_some()) - }; + ($t:ty, $l:literal) => {{ + let s = assert_ok!(MqttString::from_str($l), "{}", $l); + let t = <$t>::new(s); + assert!(t.is_some(), "{}", $l); + let t = t.unwrap(); + + t + }}; + } + macro_rules! assert_valid_shared { + ($t:ty, $l:literal) => {{ + let s = assert_ok!(MqttString::from_str($l), "{}", $l); + let t = <$t>::new(s); + assert!(t.is_some(), "{}", $l); + let t = t.unwrap(); + assert!(t.is_shared(), "{}", $l); + + t + }}; + } + macro_rules! assert_valid_non_shared { + ($t:ty, $l:literal) => {{ + let s = assert_ok!(MqttString::from_str($l), "{}", $l); + let t = <$t>::new(s); + assert!(t.is_some(), "{}", $l); + let t = t.unwrap(); + assert!(!t.is_shared(), "{}", $l); + + t + }}; } macro_rules! assert_invalid { ($t:ty, $l:literal) => { match MqttString::from_str($l) { - Ok(s) => assert!(<$t>::new(s).is_none()), + Ok(s) => assert!(<$t>::new(s).is_none(), "{}", $l), Err(_) => {} } }; @@ -369,26 +473,96 @@ mod unit { #[test] fn topic_filter_valid() { - assert_valid!(TopicFilter, "#"); - assert_valid!(TopicFilter, "/#"); - assert_valid!(TopicFilter, "a/#"); - - assert_valid!(TopicFilter, "+"); - assert_valid!(TopicFilter, "/+"); - assert_valid!(TopicFilter, "+/"); - assert_valid!(TopicFilter, "a/+"); - assert_valid!(TopicFilter, "+/a"); - - assert_valid!(TopicFilter, "/"); - assert_valid!(TopicFilter, "//"); - assert_valid!(TopicFilter, "r"); - - assert_valid!(TopicFilter, "r/i/g/+/t"); - assert_valid!(TopicFilter, "correct/+/path"); - assert_valid!(TopicFilter, "right/path/#"); - assert_valid!(TopicFilter, "right"); - assert_valid!(TopicFilter, "sport/tennis/player1"); - assert_valid!(TopicFilter, "sport/tennis/player1/ranking"); - assert_valid!(TopicFilter, "sport/tennis/player1/score/wimbledon"); + assert_valid_non_shared!(TopicFilter, "#"); + assert_valid_non_shared!(TopicFilter, "/#"); + assert_valid_non_shared!(TopicFilter, "a/#"); + + assert_valid_non_shared!(TopicFilter, "+"); + assert_valid_non_shared!(TopicFilter, "/+"); + assert_valid_non_shared!(TopicFilter, "+/"); + assert_valid_non_shared!(TopicFilter, "a/+"); + assert_valid_non_shared!(TopicFilter, "+/a"); + + assert_valid_non_shared!(TopicFilter, "/"); + assert_valid_non_shared!(TopicFilter, "//"); + assert_valid_non_shared!(TopicFilter, "r"); + + assert_valid_non_shared!(TopicFilter, "fshare/"); + assert_valid_non_shared!(TopicFilter, "$fhare/"); + assert_valid_non_shared!(TopicFilter, "$sfare/"); + assert_valid_non_shared!(TopicFilter, "$shfre/"); + assert_valid_non_shared!(TopicFilter, "$shafe/"); + assert_valid_non_shared!(TopicFilter, "$sharf/"); + assert_valid_non_shared!(TopicFilter, "share/"); + assert_valid_non_shared!(TopicFilter, "$hare/"); + assert_valid_non_shared!(TopicFilter, "$sare/"); + assert_valid_non_shared!(TopicFilter, "$shre/"); + assert_valid_non_shared!(TopicFilter, "$shae/"); + + assert_valid_non_shared!(TopicFilter, "r/i/g/+/t"); + assert_valid_non_shared!(TopicFilter, "correct/+/path"); + assert_valid_non_shared!(TopicFilter, "right/path/#"); + assert_valid_non_shared!(TopicFilter, "right"); + assert_valid_non_shared!(TopicFilter, "sport/tennis/player1"); + assert_valid_non_shared!(TopicFilter, "sport/tennis/player1/ranking"); + assert_valid_non_shared!(TopicFilter, "sport/tennis/player1/score/wimbledon"); + } + + #[test] + fn topic_filter_shared_no_share_name() { + assert_invalid!(TopicFilter, "$share"); + assert_invalid!(TopicFilter, "$share/"); + assert_invalid!(TopicFilter, "$share//"); + } + + #[test] + fn topic_filter_shared_invalid_share_name() { + assert_invalid!(TopicFilter, "$share/+/"); + assert_invalid!(TopicFilter, "$share/#/a"); + assert_invalid!(TopicFilter, "$share/a+a/a"); + assert_invalid!(TopicFilter, "$share/a+/a"); + assert_invalid!(TopicFilter, "$share/+a/a"); + assert_invalid!(TopicFilter, "$share/a#a/a"); + assert_invalid!(TopicFilter, "$share/a#/a"); + assert_invalid!(TopicFilter, "$share/#a/a"); + assert_invalid!(TopicFilter, "$share///a"); + } + + #[test] + fn topic_filter_shared_no_topic_filter() { + assert_invalid!(TopicFilter, "$share/a/"); + } + + #[test] + fn topic_filter_shared_valid() { + assert_valid_shared!(TopicFilter, "$share/a/a"); + assert_valid_shared!(TopicFilter, "$share/a/+"); + assert_valid_shared!(TopicFilter, "$share/a/#"); + assert_valid_shared!(TopicFilter, "$share/a//"); + assert_valid_shared!(TopicFilter, "$share/consumer1/sport/tennis/+"); + assert_valid_shared!(TopicFilter, "$share/consumer2/sport/tennis/+"); + assert_valid_shared!(TopicFilter, "$share/consumer1//finance"); + assert_valid_shared!(TopicFilter, "$share/consumer1/+/finance"); + } + + #[test] + fn topic_filter_has_wildcard() { + assert!(!assert_valid!(TopicFilter, "a").has_wildcard()); + assert!(!assert_valid!(TopicFilter, "/").has_wildcard()); + assert!(!assert_valid!(TopicFilter, "//").has_wildcard()); + assert!(!assert_valid!(TopicFilter, "/a/").has_wildcard()); + assert!(!assert_valid!(TopicFilter, "$").has_wildcard()); + assert!(!assert_valid!(TopicFilter, "$a/$/").has_wildcard()); + assert!(!assert_valid!(TopicFilter, "$share/consumer1//finance").has_wildcard()); + + assert!(assert_valid!(TopicFilter, "#").has_wildcard()); + assert!(assert_valid!(TopicFilter, "+").has_wildcard()); + assert!(assert_valid!(TopicFilter, "/+/").has_wildcard()); + assert!(assert_valid!(TopicFilter, "+/").has_wildcard()); + assert!(assert_valid!(TopicFilter, "/#").has_wildcard()); + assert!(assert_valid!(TopicFilter, "/+").has_wildcard()); + assert!(assert_valid!(TopicFilter, "/+").has_wildcard()); + assert!(assert_valid!(TopicFilter, "$share/consumer1/+/finance").has_wildcard()); + assert!(assert_valid!(TopicFilter, "$share/consumer1/#").has_wildcard()); } } diff --git a/src/v5/packet/connack.rs b/src/v5/packet/connack.rs index a0cd905..8e4b046 100644 --- a/src/v5/packet/connack.rs +++ b/src/v5/packet/connack.rs @@ -2,7 +2,7 @@ use heapless::Vec; use crate::{ buffer::BufferProvider, - config::{MaximumPacketSize, ReceiveMaximum, SessionExpiryInterval}, + config::{MaximumPacketSize, SessionExpiryInterval}, eio::Read, fmt::{trace, verbose}, header::{FixedHeader, PacketType}, @@ -11,7 +11,7 @@ use crate::{ types::{ReasonCode, VarByteInt}, v5::property::{ AssignedClientIdentifier, AtMostOnceProperty, MaximumQoS, PropertyType, ReasonString, - ResponseInformation, RetainAvailable, ServerKeepAlive, ServerReference, + ReceiveMaximum, ResponseInformation, RetainAvailable, ServerKeepAlive, ServerReference, SharedSubscriptionAvailable, SubscriptionIdentifierAvailable, TopicAliasMaximum, UserProperty, WildcardSubscriptionAvailable, }, @@ -216,16 +216,16 @@ mod unit { use core::num::NonZero; use crate::{ - config::{KeepAlive, MaximumPacketSize, ReceiveMaximum, SessionExpiryInterval}, + config::{KeepAlive, MaximumPacketSize, SessionExpiryInterval}, test::rx::decode, types::{MqttString, MqttStringPair, QoS, ReasonCode}, v5::{ packet::ConnackPacket, property::{ - AssignedClientIdentifier, MaximumQoS, ReasonString, ResponseInformation, - RetainAvailable, ServerKeepAlive, ServerReference, SharedSubscriptionAvailable, - SubscriptionIdentifierAvailable, TopicAliasMaximum, UserProperty, - WildcardSubscriptionAvailable, + AssignedClientIdentifier, MaximumQoS, ReasonString, ReceiveMaximum, + ResponseInformation, RetainAvailable, ServerKeepAlive, ServerReference, + SharedSubscriptionAvailable, SubscriptionIdentifierAvailable, TopicAliasMaximum, + UserProperty, WildcardSubscriptionAvailable, }, }, }; diff --git a/src/v5/packet/connect.rs b/src/v5/packet/connect.rs index c127a82..566b22d 100644 --- a/src/v5/packet/connect.rs +++ b/src/v5/packet/connect.rs @@ -3,14 +3,14 @@ use core::{num::NonZero, ops::Not}; use heapless::Vec; use crate::{ - config::{KeepAlive, MaximumPacketSize, ReceiveMaximum, SessionExpiryInterval}, + config::{KeepAlive, MaximumPacketSize, SessionExpiryInterval}, eio::Write, header::{FixedHeader, PacketType}, io::write::{Writable, wlen}, packet::{Packet, TxError, TxPacket}, types::{MqttBinary, MqttString, QoS, VarByteInt, Will}, v5::property::{ - AuthenticationData, AuthenticationMethod, RequestProblemInformation, + AuthenticationData, AuthenticationMethod, ReceiveMaximum, RequestProblemInformation, RequestResponseInformation, TopicAliasMaximum, UserProperty, }, }; diff --git a/src/v5/packet/publish.rs b/src/v5/packet/publish.rs index 8443c76..3b55d42 100644 --- a/src/v5/packet/publish.rs +++ b/src/v5/packet/publish.rs @@ -445,7 +445,7 @@ mod unit { true, IdentifiedQoS::ExactlyOnce(PacketIdentifier::new(NonZero::new(9624).unwrap())), true, - TopicReference::Alias(23408), + TopicReference::Alias(NonZero::new(23408).unwrap()), Some(false.into()), Some(481123u32.into()), Some(TopicName::new(MqttString::from_str("uno, dos, tres, catorce").unwrap()).unwrap()), @@ -634,7 +634,7 @@ mod unit { packet.topic, TopicReference::Mapping( TopicName::new(MqttString::try_from("test").unwrap()).unwrap(), - 10, + NonZero::new(10).unwrap(), ) ); assert_eq!(packet.message, Bytes::from("hello".as_bytes())); diff --git a/src/v5/property/values.rs b/src/v5/property/values.rs index 3e909e4..63b9c2c 100644 --- a/src/v5/property/values.rs +++ b/src/v5/property/values.rs @@ -2,7 +2,7 @@ use core::num::NonZero; use crate::{ buffer::BufferProvider, - config::{KeepAlive, MaximumPacketSize, ReceiveMaximum, SessionExpiryInterval}, + config::{KeepAlive, MaximumPacketSize, SessionExpiryInterval}, eio::{Read, Write}, fmt::verbose, io::{ @@ -119,8 +119,11 @@ property!(RequestResponseInformation, bool); property!(ResponseInformation<'c>, MqttString<'c>); property!(ServerReference<'c>, MqttString<'c>); property!(ReasonString<'c>, MqttString<'c>); +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct ReceiveMaximum(pub(crate) NonZero); property!(TopicAliasMaximum, u16); -property!(TopicAlias, u16); +property!(TopicAlias, NonZero); #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct MaximumQoS(pub(crate) QoS); diff --git a/tests/common/utils.rs b/tests/common/utils.rs index d2e02bf..926708a 100644 --- a/tests/common/utils.rs +++ b/tests/common/utils.rs @@ -39,13 +39,10 @@ fn unique_number() -> u64 { pub fn unique_topic() -> (TopicName<'static>, TopicFilter<'static>) { let s = format!("rust/mqtt/is-amazing/{}", unique_number()); let b = s.into_bytes().into_boxed_slice(); - let s = MqttBinary::from_bytes(Bytes::Owned(b)) - .unwrap() - .try_into() - .unwrap(); + let s = MqttString::from_utf8_binary(MqttBinary::from_bytes(Bytes::Owned(b)).unwrap()).unwrap(); - let n = TopicName::new(s).unwrap(); - let f = n.clone().into(); + let n = TopicName::new(s.clone()).unwrap(); + let f = TopicFilter::new(s).unwrap(); (n, f) } diff --git a/tests/integration/connect_options.rs b/tests/integration/connect_options.rs index 42b18de..6efa9ab 100644 --- a/tests/integration/connect_options.rs +++ b/tests/integration/connect_options.rs @@ -37,8 +37,9 @@ async fn maximum_packet_size_not_exceeded() { .clone() .maximum_packet_size(NonZero::new(MAX_PACKET_SIZE).unwrap()); - let topic_name = TopicName::new(MqttString::from_str("a").unwrap()).unwrap(); - let topic_filter = TopicFilter::from(topic_name.clone()); + let s = MqttString::from_str("a").unwrap(); + let topic_name = TopicName::new(s.clone()).unwrap(); + let topic_filter = TopicFilter::new(s).unwrap(); let msg = [0u8; PAYLOAD_SIZE].as_slice(); @@ -85,8 +86,9 @@ async fn maximum_packet_size_barely_exceeded() { .clone() .maximum_packet_size(NonZero::new(MAX_PACKET_SIZE).unwrap()); - let topic_name = TopicName::new(MqttString::from_str("b").unwrap()).unwrap(); - let topic_filter = TopicFilter::from(topic_name.clone()); + let s = MqttString::from_str("b").unwrap(); + let topic_name = TopicName::new(s.clone()).unwrap(); + let topic_filter = TopicFilter::new(s).unwrap(); let msg = [0u8; PAYLOAD_SIZE].as_slice(); @@ -138,8 +140,9 @@ async fn maximum_packet_size_decently_exceeded() { .clone() .maximum_packet_size(NonZero::new(MAX_PACKET_SIZE).unwrap()); - let topic_name = TopicName::new(MqttString::from_str("c").unwrap()).unwrap(); - let topic_filter = TopicFilter::from(topic_name.clone()); + let s = MqttString::from_str("c").unwrap(); + let topic_name = TopicName::new(s.clone()).unwrap(); + let topic_filter = TopicFilter::new(s).unwrap(); let msg = [0u8; PAYLOAD_SIZE].as_slice(); @@ -193,8 +196,9 @@ async fn maximum_packet_size_at_varbyteint_boundary_not_exceeded() { .clone() .maximum_packet_size(NonZero::new(MAX_PACKET_SIZE).unwrap()); - let topic_name = TopicName::new(MqttString::from_str("d").unwrap()).unwrap(); - let topic_filter = TopicFilter::from(topic_name.clone()); + let s = MqttString::from_str("d").unwrap(); + let topic_name = TopicName::new(s.clone()).unwrap(); + let topic_filter = TopicFilter::new(s).unwrap(); let msg = [0u8; PAYLOAD_SIZE].as_slice(); @@ -240,8 +244,9 @@ async fn maximum_packet_size_at_varbyteint_boundary_exceeded() { .clone() .maximum_packet_size(NonZero::new(MAX_PACKET_SIZE).unwrap()); - let topic_name = TopicName::new(MqttString::from_str("e").unwrap()).unwrap(); - let topic_filter = TopicFilter::from(topic_name.clone()); + let s = MqttString::from_str("e").unwrap(); + let topic_name = TopicName::new(s.clone()).unwrap(); + let topic_filter = TopicFilter::new(s).unwrap(); let msg = [0u8; PAYLOAD_SIZE].as_slice(); @@ -786,7 +791,7 @@ async fn send_maximum_buffer_exceeded() { assert_ok!(c.connect(tcp, NO_SESSION_CONNECT_OPTIONS, None).await); - let server_receive_maximum = c.server_config().receive_maximum.0.get(); + let server_receive_maximum = c.server_config().receive_maximum.get(); assert!( server_receive_maximum as usize > SEND_MAXIMUM_BUFFER_SIZE, "server receive maximum must be greater than {SEND_MAXIMUM_BUFFER_SIZE} for this test" @@ -823,7 +828,7 @@ async fn server_receive_maximum_exceeded() { assert_ok!(c.connect(tcp, NO_SESSION_CONNECT_OPTIONS, None).await); - let server_receive_maximum = c.server_config().receive_maximum.0.get(); + let server_receive_maximum = c.server_config().receive_maximum.get(); assert!( server_receive_maximum < 256, "server receive maximum must be less than 256 for this test" diff --git a/tests/integration/pub_properties.rs b/tests/integration/pub_properties.rs index f555342..e34f4e8 100644 --- a/tests/integration/pub_properties.rs +++ b/tests/integration/pub_properties.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{num::NonZero, time::Duration}; use rust_mqtt::{ client::{ @@ -156,13 +156,16 @@ async fn topic_alias_basic() { let publisher = async { sleep(Duration::from_secs(1)).await; - let pub_options = PublicationOptions::new(TopicReference::Mapping(topic_name.clone(), 1)) - .retain() - .at_least_once(); + let pub_options = PublicationOptions::new(TopicReference::Mapping( + topic_name.clone(), + NonZero::new(1).unwrap(), + )) + .retain() + .at_least_once(); assert_published!(tx, pub_options.clone(), msg.into()); - let pub_options = PublicationOptions::new(TopicReference::Alias(1)) + let pub_options = PublicationOptions::new(TopicReference::Alias(NonZero::new(1).unwrap())) .retain() .at_least_once(); @@ -201,19 +204,25 @@ async fn topic_alias_remap() { let publisher = async { sleep(Duration::from_secs(1)).await; - let pub_options = PublicationOptions::new(TopicReference::Mapping(topic_name1.clone(), 1)) - .retain() - .at_least_once(); + let pub_options = PublicationOptions::new(TopicReference::Mapping( + topic_name1.clone(), + NonZero::new(1).unwrap(), + )) + .retain() + .at_least_once(); assert_published!(tx, pub_options.clone(), msg.into()); - let pub_options = PublicationOptions::new(TopicReference::Mapping(topic_name2.clone(), 1)) - .retain() - .at_least_once(); + let pub_options = PublicationOptions::new(TopicReference::Mapping( + topic_name2.clone(), + NonZero::new(1).unwrap(), + )) + .retain() + .at_least_once(); assert_published!(tx, pub_options.clone(), msg.into()); - let pub_options = PublicationOptions::new(TopicReference::Alias(1)) + let pub_options = PublicationOptions::new(TopicReference::Alias(NonZero::new(1).unwrap())) .retain() .at_least_once(); diff --git a/tests/load/mod.rs b/tests/load/mod.rs index 771b5b1..709787a 100644 --- a/tests/load/mod.rs +++ b/tests/load/mod.rs @@ -1,7 +1,7 @@ use log::info; use rust_mqtt::{ client::options::{PublicationOptions, TopicReference}, - types::{IdentifiedQoS, QoS, TopicName}, + types::{IdentifiedQoS, QoS, TopicFilter, TopicName}, }; use tokio::{ join, @@ -60,7 +60,11 @@ async fn receive_multiple( let options = DEFAULT_QOS0_SUB_OPTIONS.qos(qos); info!("[Receiver] Subscribing to topic {:?}", topic_name.as_ref()); - assert_subscribe!(client, &options, topic_name.into()); + assert_subscribe!( + client, + &options, + TopicFilter::new(topic_name.as_ref().as_borrowed()).unwrap() + ); info!("[Receiver] Subscription confirmed, signaling ready"); assert_ok!(ready_tx.send(()));