From aab6bcb6b7fc674f9743878b141bd85a49751603 Mon Sep 17 00:00:00 2001 From: Kavan Price Date: Sun, 15 Mar 2026 18:21:56 +0000 Subject: [PATCH 1/8] Add status and error types --- lib/rs-edge-driver/.gitignore | 1 + lib/rs-edge-driver/Cargo.lock | 353 +++++++++++++++++++++++++++++++ lib/rs-edge-driver/Cargo.toml | 13 ++ lib/rs-edge-driver/src/error.rs | 43 ++++ lib/rs-edge-driver/src/status.rs | 36 ++++ 5 files changed, 446 insertions(+) create mode 100644 lib/rs-edge-driver/.gitignore create mode 100644 lib/rs-edge-driver/Cargo.lock create mode 100644 lib/rs-edge-driver/Cargo.toml create mode 100644 lib/rs-edge-driver/src/error.rs create mode 100644 lib/rs-edge-driver/src/status.rs diff --git a/lib/rs-edge-driver/.gitignore b/lib/rs-edge-driver/.gitignore new file mode 100644 index 00000000..2f7896d1 --- /dev/null +++ b/lib/rs-edge-driver/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/lib/rs-edge-driver/Cargo.lock b/lib/rs-edge-driver/Cargo.lock new file mode 100644 index 00000000..71367e15 --- /dev/null +++ b/lib/rs-edge-driver/Cargo.lock @@ -0,0 +1,353 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "bitflags" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" + +[[package]] +name = "bytes" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "itoa" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" + +[[package]] +name = "libc" +version = "0.2.183" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" + +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "mio" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rs-edge-driver" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +dependencies = [ + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/lib/rs-edge-driver/Cargo.toml b/lib/rs-edge-driver/Cargo.toml new file mode 100644 index 00000000..4f2106b8 --- /dev/null +++ b/lib/rs-edge-driver/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "rs-edge-driver" +version = "0.1.0" +edition = "2024" + +[dependencies] +async-trait = "0.1" +bytes = "1" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = ["full"] } +tracing = "0.1" +thiserror = "2" diff --git a/lib/rs-edge-driver/src/error.rs b/lib/rs-edge-driver/src/error.rs new file mode 100644 index 00000000..d33c5ac6 --- /dev/null +++ b/lib/rs-edge-driver/src/error.rs @@ -0,0 +1,43 @@ +use thiserror::Error; + +/// Error returned by [`Handler::connect`] to indicate why +/// the connection to the southbound device failed. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectError { + /// Could not reach or communicate with the device. + ConnectionFailed, + /// Connected but authentication/authorisation was rejected. + AuthFailed, +} + +/// Error returned by [`Handler::create`] when the configuration +/// supplied by the edge agent is invalid or incomplete. +#[derive(Debug, Error)] +#[error("invalid handler configuration: {reason}")] +pub struct HandlerError { + pub reason: String, +} + +impl HandlerError { + pub fn new(reason: impl Into) -> Self { + Self { + reason: reason.into(), + } + } +} + +/// Top-level errors produced by the driver runtime. +#[derive(Debug, Error)] +pub enum Error { + #[error("MQTT error: {0}")] + Mqtt(String), + + #[error("configuration error: {0}")] + Config(#[from] HandlerError), + + #[error("environment variable {var} not set")] + MissingEnv { var: &'static str }, + + #[error("{0}")] + Other(String), +} diff --git a/lib/rs-edge-driver/src/status.rs b/lib/rs-edge-driver/src/status.rs new file mode 100644 index 00000000..53f2a52c --- /dev/null +++ b/lib/rs-edge-driver/src/status.rs @@ -0,0 +1,36 @@ +use std::fmt; + +/// The operational status of the driver, published to +/// `fpEdge1/{id}/status` so the edge agent knows the driver's state. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Status { + /// Driver is not connected to MQTT. + Down, + /// Connected to MQTT, awaiting configuration from the edge agent. + Ready, + /// Received an invalid or unusable device configuration. + Conf, + /// Received invalid address mappings. + Addr, + /// Handler failed to connect to the southbound device. + Conn, + /// Handler connected but authentication was rejected. + Auth, + /// Fully operational — connected to the device and ready for data. + Up, +} + +impl fmt::Display for Status { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + Status::Down => "DOWN", + Status::Ready => "READY", + Status::Conf => "CONF", + Status::Addr => "ADDR", + Status::Conn => "CONN", + Status::Auth => "AUTH", + Status::Up => "UP", + }; + f.write_str(s) + } +} From 198e4635483d188866821a2f7e85c1cb826618c0 Mon Sep 17 00:00:00 2001 From: Kavan Price Date: Sun, 15 Mar 2026 18:33:02 +0000 Subject: [PATCH 2/8] Add driver config --- lib/rs-edge-driver/src/config.rs | 38 ++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 lib/rs-edge-driver/src/config.rs diff --git a/lib/rs-edge-driver/src/config.rs b/lib/rs-edge-driver/src/config.rs new file mode 100644 index 00000000..bc2986f9 --- /dev/null +++ b/lib/rs-edge-driver/src/config.rs @@ -0,0 +1,38 @@ +use std::env; +use std::time::Duration; + +use crate::error::Error; + +/// Configuration for the driver runtime, specifying how to connect +/// to the edge agent's MQTT broker. +#[derive(Debug, Clone)] +pub struct DriverConfig { + /// MQTT broker URL (e.g. `mqtt://localhost:1883`). + pub mqtt_url: String, + /// Username for MQTT authentication — also used as the driver + /// identity in `fpEdge1/{id}/...` topics. + pub username: String, + /// Password for MQTT authentication. + pub password: String, + /// How long to wait before retrying a failed device connection. + pub reconnect_delay: Duration, +} + +impl DriverConfig { + /// Build configuration from environment variables: + /// - `EDGE_MQTT` — broker URL + /// - `EDGE_USERNAME` — MQTT username / driver identity + /// - `EDGE_PASSWORD` — MQTT password + pub fn from_env() -> Result { + Ok(Self { + mqtt_url: required_env("EDGE_MQTT")?, + username: required_env("EDGE_USERNAME")?, + password: required_env("EDGE_PASSWORD")?, + reconnect_delay: Duration::from_secs(5), + }) + } +} + +fn required_env(var: &'static str) -> Result { + env::var(var).map_err(|_| Error::MissingEnv { var }) +} From 0752770e3de89ce0bca0d1b05e39dc030cf06411 Mon Sep 17 00:00:00 2001 From: Kavan Price Date: Sun, 15 Mar 2026 18:37:09 +0000 Subject: [PATCH 3/8] Implement handler trait --- lib/rs-edge-driver/src/driver.rs | 131 +++++++++++++++++++++++++++ lib/rs-edge-driver/src/handler.rs | 145 ++++++++++++++++++++++++++++++ lib/rs-edge-driver/src/lib.rs | 64 +++++++++++++ 3 files changed, 340 insertions(+) create mode 100644 lib/rs-edge-driver/src/driver.rs create mode 100644 lib/rs-edge-driver/src/handler.rs create mode 100644 lib/rs-edge-driver/src/lib.rs diff --git a/lib/rs-edge-driver/src/driver.rs b/lib/rs-edge-driver/src/driver.rs new file mode 100644 index 00000000..cfa7ce03 --- /dev/null +++ b/lib/rs-edge-driver/src/driver.rs @@ -0,0 +1,131 @@ +use std::collections::HashMap; +use std::marker::PhantomData; + +use bytes::Bytes; +use tokio::sync::mpsc; + +use crate::config::DriverConfig; +use crate::error::Error; +use crate::handler::Handler; +use crate::status::Status; + +/// A handle given to [`Handler`] implementations so they can push +/// data back to the driver asynchronously. +/// +/// This is the primary mechanism for async/event-driven handlers: +/// store the handle in your handler struct, then call +/// [`publish`](DriverHandle::publish) whenever new data arrives +/// from the device. +#[derive(Clone)] +pub struct DriverHandle { + tx: mpsc::UnboundedSender, +} + +/// Internal events sent from the handler back to the driver loop. +pub(crate) enum DriverEvent { + /// Handler is publishing data for an address. + Data { topic: String, payload: Bytes }, +} + +impl DriverHandle { + pub(crate) fn new(tx: mpsc::UnboundedSender) -> Self { + Self { tx } + } + + /// Publish data for a topic back to the edge agent. + /// + /// Used by async handlers from within their [`subscribe`](crate::Handler::subscribe) + /// callbacks. The topic should match one of the address topics + /// configured by the edge agent. + pub fn publish(&self, topic: impl Into, data: Bytes) { + let _ = self.tx.send(DriverEvent::Data { + topic: topic.into(), + payload: data, + }); + } +} + +/// The driver runtime. Manages the MQTT connection to the edge agent, +/// handler lifecycle, and data flow. +/// +/// Generic over the [`Handler`] implementation, which defines the +/// device-specific protocol logic. +/// +/// # Usage +/// +/// ```rust,ignore +/// use rs_edge_driver::*; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Error> { +/// let config = DriverConfig::from_env()?; +/// let mut driver = Driver::::new(config); +/// driver.run().await +/// } +/// ``` +pub struct Driver { + config: DriverConfig, + status: Status, + handler: Option, + /// topic → parsed address + addrs: HashMap, + /// parsed address → topic (reverse lookup for async publishing) + topics: HashMap, + _marker: PhantomData, +} + +impl Driver { + /// Create a new driver with the given configuration. + pub fn new(config: DriverConfig) -> Self { + Self { + config, + status: Status::Down, + handler: None, + addrs: HashMap::new(), + topics: HashMap::new(), + _marker: PhantomData, + } + } + + /// Run the driver's main event loop. + /// + /// This connects to the edge agent's MQTT broker, listens for + /// configuration and poll requests, and manages the handler + /// lifecycle. It runs indefinitely until an unrecoverable error + /// occurs. + /// + /// # Lifecycle + /// + /// 1. Connect to MQTT broker, publish status `READY` + /// 2. Receive device config → call `Handler::create` + /// 3. Call `Handler::connect` → status `UP` on success + /// 4. Receive address mappings → call `Handler::parse_addr` + /// 5. **Polled**: respond to poll requests via `Handler::poll` + /// **Async**: call `Handler::subscribe`, receive data via `DriverHandle::publish` + /// 6. On reconfiguration: call `Handler::close`, restart from step 2 + pub async fn run(&mut self) -> Result<(), Error> { + // TODO: implement MQTT connection and event loop + // + // This will involve: + // - Connecting to the MQTT broker at self.config.mqtt_url + // - Setting last-will to fpEdge1/{id}/status → "DOWN" + // - Subscribing to fpEdge1/{id}/active, conf, addr, poll, cmd/# + // - Publishing status updates + // - Dispatching incoming messages to handler methods + // - Managing reconnection on device connection failures + + tracing::info!( + mqtt = %self.config.mqtt_url, + id = %self.config.username, + "driver starting" + ); + + todo!("MQTT event loop not yet implemented") + } + + fn set_status(&mut self, status: Status) { + self.status = status; + tracing::info!(%status, "driver status changed"); + // TODO: publish to fpEdge1/{id}/status + } +} diff --git a/lib/rs-edge-driver/src/handler.rs b/lib/rs-edge-driver/src/handler.rs new file mode 100644 index 00000000..a59e5305 --- /dev/null +++ b/lib/rs-edge-driver/src/handler.rs @@ -0,0 +1,145 @@ +use std::hash::Hash; + +use async_trait::async_trait; +use bytes::Bytes; + +use crate::driver::DriverHandle; +use crate::error::{ConnectError, HandlerError}; + +/// The trait that driver authors implement to define how to communicate +/// with a specific southbound device. +/// +/// The library handles all MQTT communication with the edge agent; +/// implementors only need to deal with the device protocol itself. +/// +/// # Driver modes +/// +/// There are two modes of operation — choose one by overriding the +/// appropriate default method: +/// +/// - **Polled**: override [`poll`](Handler::poll) to read data from +/// the device on demand when the edge agent requests it. +/// - **Async**: override [`subscribe`](Handler::subscribe) to set up +/// subscriptions, then push data back via +/// [`DriverHandle::publish`] when values arrive. +/// +/// # Example +/// +/// ```rust,ignore +/// use async_trait::async_trait; +/// use bytes::Bytes; +/// use rs_edge_driver::*; +/// +/// struct ModbusHandler { +/// handle: DriverHandle, +/// host: String, +/// port: u16, +/// // ... client state +/// } +/// +/// #[async_trait] +/// impl Handler for ModbusHandler { +/// type Addr = ModbusAddr; +/// +/// fn create( +/// handle: DriverHandle, +/// config: serde_json::Value, +/// ) -> Result { +/// let host = config["host"].as_str() +/// .ok_or_else(|| HandlerError::new("missing 'host'"))? +/// .to_string(); +/// let port = config["port"].as_u64() +/// .ok_or_else(|| HandlerError::new("missing 'port'"))? as u16; +/// Ok(Self { handle, host, port }) +/// } +/// +/// fn parse_addr(&self, raw: &str) -> Option { +/// // parse "1,holding,100,10" into a typed address +/// todo!() +/// } +/// +/// async fn connect(&mut self) -> Result<(), ConnectError> { +/// // establish TCP connection to the Modbus device +/// todo!() +/// } +/// +/// async fn poll(&mut self, addr: &ModbusAddr) -> Option { +/// // read registers and return the raw bytes +/// todo!() +/// } +/// +/// async fn close(&mut self) { +/// // disconnect from the device +/// } +/// } +/// ``` +#[async_trait] +pub trait Handler: Send + Sized { + /// A parsed, validated representation of a device address. + /// + /// The edge agent sends addresses as strings; [`parse_addr`](Handler::parse_addr) + /// converts them into this type. Using an associated type gives + /// compile-time safety — no downcasting needed. + type Addr: Send + Sync + Hash + Eq + Clone; + + /// Construct a new handler from the configuration object sent + /// by the edge agent. + /// + /// Return `Err` if the configuration is invalid — the driver will + /// report status `CONF` to the edge agent. + /// + /// The [`DriverHandle`] can be stored and used later to push + /// async data back to the driver via [`DriverHandle::publish`]. + fn create(handle: DriverHandle, config: serde_json::Value) -> Result; + + /// Parse and validate a raw address string from the edge agent. + /// + /// Return `None` if the address is not valid for this handler. + /// Invalid addresses cause the driver to report status `ADDR`. + fn parse_addr(&self, addr: &str) -> Option; + + /// Connect (or reconnect) to the southbound device. + /// + /// Return `Ok(())` on success. The driver will set status `UP`. + /// Return `Err(ConnectError::ConnectionFailed)` or + /// `Err(ConnectError::AuthFailed)` on failure — the driver will + /// retry after the configured reconnect delay. + async fn connect(&mut self) -> Result<(), ConnectError>; + + /// Clean up resources and disconnect from the device. + /// + /// Called before the handler is dropped — either due to + /// reconfiguration or shutdown. + async fn close(&mut self); + + /// **Polled mode**: read data for a single address from the device. + /// + /// Called when the edge agent sends a poll request. Return + /// `Some(bytes)` with the raw data, or `None` if the read failed. + /// + /// The default implementation returns `None`. Override this for + /// polled drivers. + async fn poll(&mut self, _addr: &Self::Addr) -> Option { + None + } + + /// **Async mode**: set up subscriptions for the given addresses. + /// + /// Called after [`connect`](Handler::connect) succeeds and + /// addresses have been configured. When data arrives, push it + /// back to the driver via [`DriverHandle::publish`]. + /// + /// Return `true` if all subscriptions were set up successfully. + /// + /// The default implementation returns `true` (no-op). Override + /// this for async/event-driven drivers. + async fn subscribe(&mut self, _addrs: &[Self::Addr]) -> bool { + true + } + + /// Handle a command sent from the edge agent. + /// + /// Commands arrive on `fpEdge1/{id}/cmd/{name}` and are + /// forwarded here. Override this if the device supports commands. + async fn cmd(&mut self, _command: &str, _payload: Bytes) {} +} diff --git a/lib/rs-edge-driver/src/lib.rs b/lib/rs-edge-driver/src/lib.rs new file mode 100644 index 00000000..b83a19bd --- /dev/null +++ b/lib/rs-edge-driver/src/lib.rs @@ -0,0 +1,64 @@ +//! # rs-edge-driver +//! +//! A framework for building Factory+ edge device drivers in Rust. +//! +//! This library handles all communication with the edge agent (MQTT, +//! Sparkplug lifecycle, configuration management). Driver authors only +//! need to implement the [`Handler`] trait to define how to talk to +//! their specific device protocol. +//! +//! # Quick start +//! +//! 1. Implement [`Handler`] for your device type +//! 2. Create a [`DriverConfig`] (typically via [`DriverConfig::from_env`]) +//! 3. Construct a [`Driver`] and call [`Driver::run`] +//! +//! ```rust,ignore +//! use async_trait::async_trait; +//! use bytes::Bytes; +//! use rs_edge_driver::*; +//! +//! struct MyHandler { handle: DriverHandle } +//! +//! #[async_trait] +//! impl Handler for MyHandler { +//! type Addr = String; +//! +//! fn create(handle: DriverHandle, config: serde_json::Value) -> Result { +//! Ok(Self { handle }) +//! } +//! +//! fn parse_addr(&self, addr: &str) -> Option { +//! Some(addr.to_string()) +//! } +//! +//! async fn connect(&mut self) -> Result<(), ConnectError> { +//! Ok(()) +//! } +//! +//! async fn poll(&mut self, addr: &String) -> Option { +//! Some(Bytes::from_static(b"hello")) +//! } +//! +//! async fn close(&mut self) {} +//! } +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Error> { +//! let config = DriverConfig::from_env()?; +//! let mut driver = Driver::::new(config); +//! driver.run().await +//! } +//! ``` + +mod config; +mod driver; +mod error; +mod handler; +mod status; + +pub use config::DriverConfig; +pub use driver::{Driver, DriverHandle}; +pub use error::{ConnectError, Error, HandlerError}; +pub use handler::Handler; +pub use status::Status; From 897b31c610a2ce1da6fdb0f049cba5b8a528d602 Mon Sep 17 00:00:00 2001 From: Kavan Price Date: Sun, 15 Mar 2026 18:50:26 +0000 Subject: [PATCH 4/8] Fully implement driver --- lib/rs-edge-driver/Cargo.lock | 644 ++++++++++++++++++++++++++++++- lib/rs-edge-driver/Cargo.toml | 2 + lib/rs-edge-driver/src/driver.rs | 413 +++++++++++++++++++- 3 files changed, 1040 insertions(+), 19 deletions(-) diff --git a/lib/rs-edge-driver/Cargo.lock b/lib/rs-edge-driver/Cargo.lock index 71367e15..81dd20f4 100644 --- a/lib/rs-edge-driver/Cargo.lock +++ b/lib/rs-edge-driver/Cargo.lock @@ -25,12 +25,49 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cc" +version = "1.2.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" +dependencies = [ + "find-msvc-tools", + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "errno" version = "0.3.14" @@ -38,7 +75,176 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", +] + +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "icu_collections" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" + +[[package]] +name = "icu_properties" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" + +[[package]] +name = "icu_provider" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", ] [[package]] @@ -53,6 +259,12 @@ version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +[[package]] +name = "litemap" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" + [[package]] name = "lock_api" version = "0.4.14" @@ -62,6 +274,12 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + [[package]] name = "memchr" version = "2.8.0" @@ -76,7 +294,7 @@ checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -85,6 +303,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + [[package]] name = "parking_lot" version = "0.12.5" @@ -108,12 +332,27 @@ dependencies = [ "windows-link", ] +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + [[package]] name = "pin-project-lite" version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "potential_utf" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +dependencies = [ + "zerovec", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -141,17 +380,116 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rs-edge-driver" version = "0.1.0" dependencies = [ "async-trait", "bytes", + "rumqttc", "serde", "serde_json", - "thiserror", + "thiserror 2.0.18", "tokio", "tracing", + "url", +] + +[[package]] +name = "rumqttc" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1568e15fab2d546f940ed3a21f48bbbd1c494c90c99c4481339364a497f94a9" +dependencies = [ + "bytes", + "flume", + "futures-util", + "log", + "rustls-native-certs", + "rustls-pemfile", + "rustls-webpki", + "thiserror 1.0.69", + "tokio", + "tokio-rustls", +] + +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" +dependencies = [ + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + +[[package]] +name = "schannel" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" +dependencies = [ + "windows-sys 0.61.2", ] [[package]] @@ -160,6 +498,29 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.228" @@ -203,6 +564,12 @@ dependencies = [ "zmij", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.8" @@ -213,6 +580,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.1" @@ -226,9 +599,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "2.0.117" @@ -240,13 +634,44 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -260,6 +685,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tinystr" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.50.0" @@ -274,7 +709,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -288,6 +723,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tracing" version = "0.1.44" @@ -325,6 +771,30 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -337,6 +807,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -346,6 +825,159 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "writeable" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" + +[[package]] +name = "yoke" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zeroize" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" + +[[package]] +name = "zerotrie" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/lib/rs-edge-driver/Cargo.toml b/lib/rs-edge-driver/Cargo.toml index 4f2106b8..58cf631d 100644 --- a/lib/rs-edge-driver/Cargo.toml +++ b/lib/rs-edge-driver/Cargo.toml @@ -8,6 +8,8 @@ async-trait = "0.1" bytes = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" +rumqttc = "0.24" tokio = { version = "1", features = ["full"] } tracing = "0.1" thiserror = "2" +url = "2" diff --git a/lib/rs-edge-driver/src/driver.rs b/lib/rs-edge-driver/src/driver.rs index cfa7ce03..e81357c6 100644 --- a/lib/rs-edge-driver/src/driver.rs +++ b/lib/rs-edge-driver/src/driver.rs @@ -2,10 +2,12 @@ use std::collections::HashMap; use std::marker::PhantomData; use bytes::Bytes; +use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; +use serde::Deserialize; use tokio::sync::mpsc; use crate::config::DriverConfig; -use crate::error::Error; +use crate::error::{ConnectError, Error}; use crate::handler::Handler; use crate::status::Status; @@ -45,6 +47,13 @@ impl DriverHandle { } } +/// The address configuration packet sent by the edge agent. +#[derive(Deserialize)] +struct AddrConfig { + version: u32, + addrs: HashMap, +} + /// The driver runtime. Manages the MQTT connection to the edge agent, /// handler lifecycle, and data flow. /// @@ -87,6 +96,16 @@ impl Driver { } } + /// Build the `fpEdge1/{id}/{msg}` topic string. + fn topic(&self, msg: &str) -> String { + format!("fpEdge1/{}/{}", self.config.username, msg) + } + + /// Build the `fpEdge1/{id}/{msg}/{data}` topic string. + fn topic_with(&self, msg: &str, data: &str) -> String { + format!("fpEdge1/{}/{}/{}", self.config.username, msg, data) + } + /// Run the driver's main event loop. /// /// This connects to the edge agent's MQTT broker, listens for @@ -104,28 +123,396 @@ impl Driver { /// **Async**: call `Handler::subscribe`, receive data via `DriverHandle::publish` /// 6. On reconfiguration: call `Handler::close`, restart from step 2 pub async fn run(&mut self) -> Result<(), Error> { - // TODO: implement MQTT connection and event loop - // - // This will involve: - // - Connecting to the MQTT broker at self.config.mqtt_url - // - Setting last-will to fpEdge1/{id}/status → "DOWN" - // - Subscribing to fpEdge1/{id}/active, conf, addr, poll, cmd/# - // - Publishing status updates - // - Dispatching incoming messages to handler methods - // - Managing reconnection on device connection failures - tracing::info!( mqtt = %self.config.mqtt_url, id = %self.config.username, "driver starting" ); - todo!("MQTT event loop not yet implemented") + let (tx, mut rx) = mpsc::unbounded_channel::(); + self.run_loop(&tx, &mut rx).await + } + + async fn run_loop( + &mut self, + tx: &mpsc::UnboundedSender, + rx: &mut mpsc::UnboundedReceiver, + ) -> Result<(), Error> { + loop { + tracing::info!("connecting to MQTT broker"); + let result = self.mqtt_session(tx, rx).await; + match result { + Ok(()) => { + tracing::info!("MQTT session ended cleanly"); + } + Err(e) => { + tracing::error!(%e, "MQTT session error"); + } + } + + // Clean up handler on disconnect + if let Some(handler) = self.handler.as_mut() { + handler.close().await; + } + self.handler = None; + self.clear_addrs(); + self.status = Status::Down; + + tracing::info!( + delay = ?self.config.reconnect_delay, + "reconnecting to MQTT broker" + ); + tokio::time::sleep(self.config.reconnect_delay).await; + } + } + + /// Run a single MQTT session. Returns when the connection is lost. + async fn mqtt_session( + &mut self, + tx: &mpsc::UnboundedSender, + rx: &mut mpsc::UnboundedReceiver, + ) -> Result<(), Error> { + let (client, mut eventloop) = self.connect_mqtt()?; + + // Subscribe to our control topics + let subs = ["active", "conf", "addr", "poll", "cmd/#"]; + for sub in &subs { + client + .subscribe(self.topic(sub), QoS::AtMostOnce) + .await + .map_err(|e| Error::Mqtt(e.to_string()))?; + } + + self.publish_status(&client).await; + + // Main event loop: select between MQTT events and handler data + loop { + tokio::select! { + event = eventloop.poll() => { + match event { + Ok(Event::Incoming(incoming)) => { + self.handle_incoming(&client, tx, incoming).await?; + } + Ok(_) => {} + Err(e) => { + return Err(Error::Mqtt(e.to_string())); + } + } + } + Some(event) = rx.recv() => { + self.handle_driver_event(&client, event).await; + } + } + } + } + + /// Create the MQTT client and set up connection options. + fn connect_mqtt(&self) -> Result<(AsyncClient, rumqttc::EventLoop), Error> { + let url = url::Url::parse(&self.config.mqtt_url) + .map_err(|e| Error::Mqtt(format!("invalid MQTT URL: {e}")))?; + + let host = url + .host_str() + .ok_or_else(|| Error::Mqtt("MQTT URL has no host".into()))?; + let port = url.port().unwrap_or(1883); + + let mut opts = MqttOptions::new(&self.config.username, host, port); + opts.set_credentials(&self.config.username, &self.config.password); + opts.set_last_will(rumqttc::LastWill::new( + self.topic("status"), + "DOWN", + QoS::AtLeastOnce, + true, + )); + opts.set_keep_alive(std::time::Duration::from_secs(30)); + + let (client, eventloop) = AsyncClient::new(opts, 64); + Ok((client, eventloop)) + } + + /// Handle an incoming MQTT message. + async fn handle_incoming( + &mut self, + client: &AsyncClient, + tx: &mpsc::UnboundedSender, + incoming: Incoming, + ) -> Result<(), Error> { + let publish = match incoming { + Incoming::Publish(p) => p, + Incoming::ConnAck(_) => { + self.set_status(Status::Ready); + self.publish_status(client).await; + return Ok(()); + } + _ => return Ok(()), + }; + + let prefix = self.topic(""); + let suffix = publish + .topic + .strip_prefix(&prefix) + .unwrap_or(&publish.topic); + + // Split into message type and optional data + let (msg, data) = match suffix.split_once('/') { + Some((m, d)) => (m, Some(d)), + None => (suffix, None), + }; + + match msg { + "active" => self.on_active(&publish.payload), + "conf" => self.on_conf(client, tx, &publish.payload).await, + "addr" => self.on_addr(client, &publish.payload).await, + "poll" => self.on_poll(client, &publish.payload).await, + "cmd" => { + if let Some(name) = data { + self.on_cmd(name, Bytes::from(publish.payload.to_vec())) + .await; + } + } + other => { + tracing::warn!(msg = other, "unhandled message type"); + } + } + + Ok(()) + } + + /// Handle `active` message from the edge agent. + fn on_active(&mut self, payload: &[u8]) { + if payload == b"ONLINE" { + tracing::info!("edge agent online"); + self.set_status(Status::Ready); + } + } + + /// Handle `conf` message — create a new handler. + async fn on_conf( + &mut self, + client: &AsyncClient, + tx: &mpsc::UnboundedSender, + payload: &[u8], + ) { + let conf: serde_json::Value = match serde_json::from_slice(payload) { + Ok(v) => v, + Err(e) => { + tracing::error!(%e, "failed to parse conf JSON"); + self.set_status(Status::Conf); + self.publish_status(client).await; + return; + } + }; + + tracing::debug!(?conf, "received device configuration"); + + // Close old handler + if let Some(handler) = self.handler.as_mut() { + handler.close().await; + } + self.handler = None; + self.clear_addrs(); + + // Create new handler + let handle = DriverHandle::new(tx.clone()); + match H::create(handle, conf) { + Ok(handler) => { + self.handler = Some(handler); + self.connect_handler(client).await; + } + Err(e) => { + tracing::error!(reason = %e, "handler rejected configuration"); + self.set_status(Status::Conf); + self.publish_status(client).await; + } + } + } + + /// Attempt to connect the handler to its southbound device, + /// retrying on failure after the configured delay. + async fn connect_handler(&mut self, client: &AsyncClient) { + loop { + let handler = match self.handler.as_mut() { + Some(h) => h, + None => return, + }; + + match handler.connect().await { + Ok(()) => { + self.set_status(Status::Up); + self.publish_status(client).await; + self.try_subscribe().await; + return; + } + Err(ConnectError::ConnectionFailed) => { + tracing::warn!("handler connection failed, retrying"); + self.set_status(Status::Conn); + self.publish_status(client).await; + } + Err(ConnectError::AuthFailed) => { + tracing::warn!("handler auth failed, retrying"); + self.set_status(Status::Auth); + self.publish_status(client).await; + } + } + + tokio::time::sleep(self.config.reconnect_delay).await; + } + } + + /// Handle `addr` message — parse and store address mappings. + async fn on_addr(&mut self, client: &AsyncClient, payload: &[u8]) { + let pkt: AddrConfig = match serde_json::from_slice(payload) { + Ok(v) => v, + Err(e) => { + tracing::error!(%e, "failed to parse addr JSON"); + self.set_status(Status::Addr); + self.publish_status(client).await; + return; + } + }; + + if pkt.version != 1 { + tracing::error!(version = pkt.version, "unsupported addr config version"); + self.set_status(Status::Addr); + self.publish_status(client).await; + return; + } + + let handler = match self.handler.as_ref() { + Some(h) => h, + None => { + tracing::warn!("received addrs without handler"); + return; + } + }; + + // Parse all addresses into a temporary vec first to avoid + // holding an immutable borrow on self.handler while mutating + // self.addrs/topics. + let mut parsed_addrs = Vec::with_capacity(pkt.addrs.len()); + for (topic, raw_addr) in &pkt.addrs { + match handler.parse_addr(raw_addr) { + Some(parsed) => { + parsed_addrs.push((topic.clone(), parsed)); + } + None => { + tracing::error!(addr = raw_addr, "handler rejected address"); + self.clear_addrs(); + self.set_status(Status::Addr); + self.publish_status(client).await; + return; + } + } + } + + self.clear_addrs(); + for (topic, parsed) in parsed_addrs { + self.topics.insert(parsed.clone(), topic.clone()); + self.addrs.insert(topic, parsed); + } + + tracing::info!(count = self.addrs.len(), "addresses configured"); + self.try_subscribe().await; + } + + /// Handle `poll` message — poll the handler for each requested topic. + async fn on_poll(&mut self, client: &AsyncClient, payload: &[u8]) { + let payload_str = match std::str::from_utf8(payload) { + Ok(s) => s, + Err(_) => return, + }; + + // Collect (topic, addr) pairs first so we don't hold a borrow + // on self.addrs while also borrowing self.handler mutably. + let polls: Vec<(String, H::Addr)> = payload_str + .lines() + .filter_map(|line| { + let topic = line.trim(); + if topic.is_empty() { + return None; + } + match self.addrs.get(topic) { + Some(a) => Some((topic.to_owned(), a.clone())), + None => { + tracing::warn!(topic, "poll for unknown topic"); + None + } + } + }) + .collect(); + + let handler = match self.handler.as_mut() { + Some(h) => h, + None => return, + }; + + let id = &self.config.username; + for (topic, addr) in &polls { + if let Some(data) = handler.poll(addr).await { + let mtopic = format!("fpEdge1/{id}/data/{topic}"); + if let Err(e) = client.publish(&mtopic, QoS::AtMostOnce, false, data).await { + tracing::error!(%e, %topic, "failed to publish poll data"); + } + } + } + } + + /// Handle `cmd` message — forward to handler. + async fn on_cmd(&mut self, command: &str, payload: Bytes) { + if let Some(handler) = self.handler.as_mut() { + handler.cmd(command, payload).await; + } + } + + /// Try to call handler.subscribe() if we're UP and have addresses. + async fn try_subscribe(&mut self) { + if self.status != Status::Up || self.addrs.is_empty() { + return; + } + + let specs: Vec = self.addrs.values().cloned().collect(); + + if let Some(handler) = self.handler.as_mut() { + if !handler.subscribe(&specs).await { + tracing::error!("handler subscription failed"); + } + } + } + + /// Handle a DriverEvent from the handler channel (async data publish). + async fn handle_driver_event(&self, client: &AsyncClient, event: DriverEvent) { + match event { + DriverEvent::Data { topic, payload } => { + // Look up the MQTT data topic from the address topic + let mtopic = self.topic_with("data", &topic); + if let Err(e) = client + .publish(&mtopic, QoS::AtMostOnce, false, payload) + .await + { + tracing::error!(%e, topic, "failed to publish async data"); + } + } + } + } + + fn clear_addrs(&mut self) { + self.addrs.clear(); + self.topics.clear(); } fn set_status(&mut self, status: Status) { self.status = status; tracing::info!(%status, "driver status changed"); - // TODO: publish to fpEdge1/{id}/status + } + + async fn publish_status(&self, client: &AsyncClient) { + let topic = self.topic("status"); + let payload = self.status.to_string(); + if let Err(e) = client + .publish(&topic, QoS::AtLeastOnce, true, payload) + .await + { + tracing::error!(%e, "failed to publish status"); + } } } From 1c947baca18d8ec23fae733d8e75e495ad050c85 Mon Sep 17 00:00:00 2001 From: Kavan Price Date: Sun, 15 Mar 2026 18:52:28 +0000 Subject: [PATCH 5/8] Add example test driver --- lib/rs-edge-driver/Cargo.lock | 66 +++++++++++ lib/rs-edge-driver/Cargo.toml | 7 ++ lib/rs-edge-driver/examples/test_driver.rs | 123 +++++++++++++++++++++ 3 files changed, 196 insertions(+) create mode 100644 lib/rs-edge-driver/examples/test_driver.rs diff --git a/lib/rs-edge-driver/Cargo.lock b/lib/rs-edge-driver/Cargo.lock index 81dd20f4..0edcaf25 100644 --- a/lib/rs-edge-driver/Cargo.lock +++ b/lib/rs-edge-driver/Cargo.lock @@ -253,6 +253,12 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.183" @@ -297,6 +303,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -406,6 +421,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tracing", + "tracing-subscriber", "url", ] @@ -564,6 +580,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -685,6 +710,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -763,6 +797,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -795,6 +855,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" diff --git a/lib/rs-edge-driver/Cargo.toml b/lib/rs-edge-driver/Cargo.toml index 58cf631d..75f3a630 100644 --- a/lib/rs-edge-driver/Cargo.toml +++ b/lib/rs-edge-driver/Cargo.toml @@ -13,3 +13,10 @@ tokio = { version = "1", features = ["full"] } tracing = "0.1" thiserror = "2" url = "2" + +[dev-dependencies] +tracing-subscriber = "0.3" + +[[example]] +name = "test_driver" +path = "examples/test_driver.rs" diff --git a/lib/rs-edge-driver/examples/test_driver.rs b/lib/rs-edge-driver/examples/test_driver.rs new file mode 100644 index 00000000..27868882 --- /dev/null +++ b/lib/rs-edge-driver/examples/test_driver.rs @@ -0,0 +1,123 @@ +use std::f64::consts::TAU; +use std::time::Instant; + +use async_trait::async_trait; +use bytes::{BufMut, Bytes, BytesMut}; +use rs_edge_driver::*; + +/// A parsed address spec describing a waveform to generate. +#[derive(Clone, Hash, PartialEq, Eq)] +struct WaveSpec { + func: WaveFunc, + period_ms: u64, + amplitude_bits: u64, + packing: Packing, +} + +#[derive(Clone, Copy, Hash, PartialEq, Eq)] +enum WaveFunc { + Const, + Sin, + Saw, +} + +#[derive(Clone, Copy, Hash, PartialEq, Eq)] +enum Packing { + DoubleBE, + DoubleLE, + FloatBE, + FloatLE, +} + +impl WaveFunc { + fn eval(&self, period: f64, amplitude: f64, t: f64) -> f64 { + match self { + WaveFunc::Const => amplitude, + WaveFunc::Sin => amplitude * (TAU * (t / period)).sin(), + WaveFunc::Saw => (amplitude / period) * (t % period), + } + } +} + +impl Packing { + fn pack(&self, val: f64) -> Bytes { + let mut buf = BytesMut::with_capacity(8); + match self { + Packing::DoubleBE => buf.put_f64(val), + Packing::DoubleLE => buf.put_f64_le(val), + Packing::FloatBE => buf.put_f32(val as f32), + Packing::FloatLE => buf.put_f32_le(val as f32), + } + buf.freeze() + } +} + +struct TestHandler { + start: Instant, +} + +#[async_trait] +impl Handler for TestHandler { + type Addr = WaveSpec; + + fn create(_handle: DriverHandle, _config: serde_json::Value) -> Result { + Ok(Self { + start: Instant::now(), + }) + } + + fn parse_addr(&self, addr: &str) -> Option { + let parts: Vec<&str> = addr.split(':').collect(); + if parts.len() != 4 { + return None; + } + + let func = match parts[0] { + "const" => WaveFunc::Const, + "sin" => WaveFunc::Sin, + "saw" => WaveFunc::Saw, + _ => return None, + }; + + let period_ms = parts[1].parse().ok()?; + let amplitude_bits = parts[2].parse().ok()?; + + let packing = match parts[3] { + "bd" => Packing::DoubleBE, + "ld" => Packing::DoubleLE, + "bf" => Packing::FloatBE, + "lf" => Packing::FloatLE, + _ => return None, + }; + + Some(WaveSpec { + func, + period_ms, + amplitude_bits, + packing, + }) + } + + async fn connect(&mut self) -> Result<(), ConnectError> { + Ok(()) + } + + async fn poll(&mut self, spec: &WaveSpec) -> Option { + let t = self.start.elapsed().as_secs_f64() * 1000.0; + let val = spec + .func + .eval(spec.period_ms as f64, spec.amplitude_bits as f64, t); + Some(spec.packing.pack(val)) + } + + async fn close(&mut self) {} +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt::init(); + + let config = DriverConfig::from_env()?; + let mut driver = Driver::::new(config); + driver.run().await +} From b4423ff426b7208864602e90187f44af00ffca75 Mon Sep 17 00:00:00 2001 From: Kavan Price Date: Mon, 16 Mar 2026 22:00:41 +0000 Subject: [PATCH 6/8] Abstract out repeated calls --- lib/rs-edge-driver/src/driver.rs | 64 +++++++++++++------------------- 1 file changed, 26 insertions(+), 38 deletions(-) diff --git a/lib/rs-edge-driver/src/driver.rs b/lib/rs-edge-driver/src/driver.rs index e81357c6..05784055 100644 --- a/lib/rs-edge-driver/src/driver.rs +++ b/lib/rs-edge-driver/src/driver.rs @@ -151,11 +151,7 @@ impl Driver { } // Clean up handler on disconnect - if let Some(handler) = self.handler.as_mut() { - handler.close().await; - } - self.handler = None; - self.clear_addrs(); + self.close_handler().await; self.status = Status::Down; tracing::info!( @@ -183,7 +179,7 @@ impl Driver { .map_err(|e| Error::Mqtt(e.to_string()))?; } - self.publish_status(&client).await; + self.set_status(Status::Ready, &client).await; // Main event loop: select between MQTT events and handler data loop { @@ -240,8 +236,7 @@ impl Driver { let publish = match incoming { Incoming::Publish(p) => p, Incoming::ConnAck(_) => { - self.set_status(Status::Ready); - self.publish_status(client).await; + self.set_status(Status::Ready, client).await; return Ok(()); } _ => return Ok(()), @@ -260,7 +255,7 @@ impl Driver { }; match msg { - "active" => self.on_active(&publish.payload), + "active" => self.on_active(client, &publish.payload).await, "conf" => self.on_conf(client, tx, &publish.payload).await, "addr" => self.on_addr(client, &publish.payload).await, "poll" => self.on_poll(client, &publish.payload).await, @@ -279,10 +274,10 @@ impl Driver { } /// Handle `active` message from the edge agent. - fn on_active(&mut self, payload: &[u8]) { + async fn on_active(&mut self, client: &AsyncClient, payload: &[u8]) { if payload == b"ONLINE" { tracing::info!("edge agent online"); - self.set_status(Status::Ready); + self.set_status(Status::Ready, client).await; } } @@ -297,20 +292,14 @@ impl Driver { Ok(v) => v, Err(e) => { tracing::error!(%e, "failed to parse conf JSON"); - self.set_status(Status::Conf); - self.publish_status(client).await; + self.set_status(Status::Conf, client).await; return; } }; tracing::debug!(?conf, "received device configuration"); - // Close old handler - if let Some(handler) = self.handler.as_mut() { - handler.close().await; - } - self.handler = None; - self.clear_addrs(); + self.close_handler().await; // Create new handler let handle = DriverHandle::new(tx.clone()); @@ -321,8 +310,7 @@ impl Driver { } Err(e) => { tracing::error!(reason = %e, "handler rejected configuration"); - self.set_status(Status::Conf); - self.publish_status(client).await; + self.set_status(Status::Conf, client).await; } } } @@ -338,20 +326,17 @@ impl Driver { match handler.connect().await { Ok(()) => { - self.set_status(Status::Up); - self.publish_status(client).await; + self.set_status(Status::Up, client).await; self.try_subscribe().await; return; } Err(ConnectError::ConnectionFailed) => { tracing::warn!("handler connection failed, retrying"); - self.set_status(Status::Conn); - self.publish_status(client).await; + self.set_status(Status::Conn, client).await; } Err(ConnectError::AuthFailed) => { tracing::warn!("handler auth failed, retrying"); - self.set_status(Status::Auth); - self.publish_status(client).await; + self.set_status(Status::Auth, client).await; } } @@ -365,16 +350,14 @@ impl Driver { Ok(v) => v, Err(e) => { tracing::error!(%e, "failed to parse addr JSON"); - self.set_status(Status::Addr); - self.publish_status(client).await; + self.set_status(Status::Addr, client).await; return; } }; if pkt.version != 1 { tracing::error!(version = pkt.version, "unsupported addr config version"); - self.set_status(Status::Addr); - self.publish_status(client).await; + self.set_status(Status::Addr, client).await; return; } @@ -398,8 +381,7 @@ impl Driver { None => { tracing::error!(addr = raw_addr, "handler rejected address"); self.clear_addrs(); - self.set_status(Status::Addr); - self.publish_status(client).await; + self.set_status(Status::Addr, client).await; return; } } @@ -500,14 +482,20 @@ impl Driver { self.topics.clear(); } - fn set_status(&mut self, status: Status) { - self.status = status; - tracing::info!(%status, "driver status changed"); + /// Close the current handler (if any) and reset address state. + async fn close_handler(&mut self) { + if let Some(handler) = self.handler.as_mut() { + handler.close().await; + } + self.handler = None; + self.clear_addrs(); } - async fn publish_status(&self, client: &AsyncClient) { + async fn set_status(&mut self, status: Status, client: &AsyncClient) { + self.status = status; + tracing::info!(%status, "driver status changed"); let topic = self.topic("status"); - let payload = self.status.to_string(); + let payload = status.to_string(); if let Err(e) = client .publish(&topic, QoS::AtLeastOnce, true, payload) .await From d6cca181dbf7e1fcb92127d4f6a1330a98539e20 Mon Sep 17 00:00:00 2001 From: Kavan Price Date: Mon, 16 Mar 2026 23:08:42 +0000 Subject: [PATCH 7/8] Fix blocking polling --- lib/rs-edge-driver/Cargo.lock | 59 +++++++++ lib/rs-edge-driver/Cargo.toml | 1 + lib/rs-edge-driver/src/config.rs | 5 + lib/rs-edge-driver/src/driver.rs | 209 +++++++++++++++++++++++++------ 4 files changed, 235 insertions(+), 39 deletions(-) diff --git a/lib/rs-edge-driver/Cargo.lock b/lib/rs-edge-driver/Cargo.lock index 0edcaf25..c89ff67a 100644 --- a/lib/rs-edge-driver/Cargo.lock +++ b/lib/rs-edge-driver/Cargo.lock @@ -104,12 +104,65 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -128,8 +181,13 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -415,6 +473,7 @@ version = "0.1.0" dependencies = [ "async-trait", "bytes", + "futures", "rumqttc", "serde", "serde_json", diff --git a/lib/rs-edge-driver/Cargo.toml b/lib/rs-edge-driver/Cargo.toml index 75f3a630..a9ad93b4 100644 --- a/lib/rs-edge-driver/Cargo.toml +++ b/lib/rs-edge-driver/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] async-trait = "0.1" bytes = "1" +futures = "0.3" serde = { version = "1", features = ["derive"] } serde_json = "1" rumqttc = "0.24" diff --git a/lib/rs-edge-driver/src/config.rs b/lib/rs-edge-driver/src/config.rs index bc2986f9..f17b7073 100644 --- a/lib/rs-edge-driver/src/config.rs +++ b/lib/rs-edge-driver/src/config.rs @@ -16,6 +16,10 @@ pub struct DriverConfig { pub password: String, /// How long to wait before retrying a failed device connection. pub reconnect_delay: Duration, + /// Whether to poll addresses serially (one at a time) rather than + /// in parallel. Serial mode is needed for protocols like Modbus + /// that cannot handle concurrent requests. + pub serial_poll: bool, } impl DriverConfig { @@ -29,6 +33,7 @@ impl DriverConfig { username: required_env("EDGE_USERNAME")?, password: required_env("EDGE_PASSWORD")?, reconnect_delay: Duration::from_secs(5), + serial_poll: false, }) } } diff --git a/lib/rs-edge-driver/src/driver.rs b/lib/rs-edge-driver/src/driver.rs index 05784055..ec13382f 100644 --- a/lib/rs-edge-driver/src/driver.rs +++ b/lib/rs-edge-driver/src/driver.rs @@ -1,16 +1,25 @@ use std::collections::HashMap; use std::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; use bytes::Bytes; use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; use serde::Deserialize; -use tokio::sync::mpsc; +use tokio::sync::{Mutex, mpsc}; use crate::config::DriverConfig; use crate::error::{ConnectError, Error}; use crate::handler::Handler; use crate::status::Status; +/// Maximum number of poll batches that can queue before we start +/// dropping requests (backpressure). Matches the JS/Python versions. +const POLL_QUEUE_MAX: usize = 20; + +/// Timeout for a single poll operation. +const POLL_TIMEOUT: Duration = Duration::from_secs(30); + /// A handle given to [`Handler`] implementations so they can push /// data back to the driver asynchronously. /// @@ -75,7 +84,7 @@ struct AddrConfig { pub struct Driver { config: DriverConfig, status: Status, - handler: Option, + handler: Option>>, /// topic → parsed address addrs: HashMap, /// parsed address → topic (reverse lookup for async publishing) @@ -83,7 +92,7 @@ pub struct Driver { _marker: PhantomData, } -impl Driver { +impl Driver { /// Create a new driver with the given configuration. pub fn new(config: DriverConfig) -> Self { Self { @@ -181,17 +190,35 @@ impl Driver { self.set_status(Status::Ready, &client).await; + // Create the bounded poll channel for this session + let (poll_tx, poll_rx) = mpsc::channel::>(POLL_QUEUE_MAX); + + // Shared handler reference for the poll worker. Updated by + // on_conf when a new handler is created. + let poll_handler: Arc>>>> = Arc::new(Mutex::new(None)); + + // Spawn the poll worker task + let poll_worker = tokio::spawn(Self::poll_worker( + poll_rx, + poll_handler.clone(), + client.clone(), + self.config.username.clone(), + self.config.serial_poll, + )); + // Main event loop: select between MQTT events and handler data - loop { + let result: Result<(), Error> = loop { tokio::select! { event = eventloop.poll() => { match event { Ok(Event::Incoming(incoming)) => { - self.handle_incoming(&client, tx, incoming).await?; + self.handle_incoming( + &client, tx, &poll_tx, &poll_handler, incoming, + ).await?; } Ok(_) => {} Err(e) => { - return Err(Error::Mqtt(e.to_string())); + break Err(Error::Mqtt(e.to_string())); } } } @@ -199,9 +226,107 @@ impl Driver { self.handle_driver_event(&client, event).await; } } + }; + + // Clean up the poll worker when the session ends + poll_worker.abort(); + result + } + + /// Background task that processes poll requests from the channel. + async fn poll_worker( + mut poll_rx: mpsc::Receiver>, + handler_ref: Arc>>>>, + client: AsyncClient, + id: String, + serial: bool, + ) { + while let Some(polls) = poll_rx.recv().await { + let handler_arc = { + let guard = handler_ref.lock().await; + match guard.clone() { + Some(h) => h, + None => continue, + } + }; + + if serial { + Self::poll_serial(&handler_arc, &client, &id, &polls).await; + } else { + Self::poll_parallel(&handler_arc, &client, &id, polls).await; + } } } + /// Poll addresses one at a time, holding the handler lock for each. + async fn poll_serial( + handler: &Arc>, + client: &AsyncClient, + id: &str, + polls: &[(String, H::Addr)], + ) { + for (topic, addr) in polls { + let result = tokio::time::timeout(POLL_TIMEOUT, async { + let mut h = handler.lock().await; + h.poll(addr).await + }) + .await; + + match result { + Ok(Some(data)) => { + let mtopic = format!("fpEdge1/{id}/data/{topic}"); + if let Err(e) = client.publish(&mtopic, QoS::AtMostOnce, false, data).await { + tracing::error!(%e, %topic, "failed to publish poll data"); + } + } + Ok(None) => {} + Err(_) => { + tracing::warn!(%topic, "poll timed out"); + } + } + } + } + + /// Poll all addresses concurrently, each in its own task. + async fn poll_parallel( + handler: &Arc>, + client: &AsyncClient, + id: &str, + polls: Vec<(String, H::Addr)>, + ) { + let mut tasks = Vec::with_capacity(polls.len()); + + for (topic, addr) in polls { + let handler = handler.clone(); + let client = client.clone(); + let id = id.to_owned(); + + tasks.push(tokio::spawn(async move { + let result = tokio::time::timeout(POLL_TIMEOUT, async { + let mut h = handler.lock().await; + h.poll(&addr).await + }) + .await; + + match result { + Ok(Some(data)) => { + let mtopic = format!("fpEdge1/{id}/data/{topic}"); + if let Err(e) = client.publish(&mtopic, QoS::AtMostOnce, false, data).await + { + tracing::error!(%e, %topic, "failed to publish poll data"); + } + } + Ok(None) => {} + Err(_) => { + tracing::warn!(%topic, "poll timed out"); + } + } + })); + } + + futures::future::join_all(tasks).await; + } + /// Create the MQTT client and set up connection options. fn connect_mqtt(&self) -> Result<(AsyncClient, rumqttc::EventLoop), Error> { let url = url::Url::parse(&self.config.mqtt_url) @@ -231,6 +356,8 @@ impl Driver { &mut self, client: &AsyncClient, tx: &mpsc::UnboundedSender, + poll_tx: &mpsc::Sender>, + poll_handler: &Arc>>>>, incoming: Incoming, ) -> Result<(), Error> { let publish = match incoming { @@ -256,9 +383,12 @@ impl Driver { match msg { "active" => self.on_active(client, &publish.payload).await, - "conf" => self.on_conf(client, tx, &publish.payload).await, + "conf" => { + self.on_conf(client, tx, poll_handler, &publish.payload) + .await; + } "addr" => self.on_addr(client, &publish.payload).await, - "poll" => self.on_poll(client, &publish.payload).await, + "poll" => self.on_poll(poll_tx, &publish.payload), "cmd" => { if let Some(name) = data { self.on_cmd(name, Bytes::from(publish.payload.to_vec())) @@ -286,6 +416,7 @@ impl Driver { &mut self, client: &AsyncClient, tx: &mpsc::UnboundedSender, + poll_handler: &Arc>>>>, payload: &[u8], ) { let conf: serde_json::Value = match serde_json::from_slice(payload) { @@ -300,12 +431,16 @@ impl Driver { tracing::debug!(?conf, "received device configuration"); self.close_handler().await; + // Clear the poll worker's handler reference + *poll_handler.lock().await = None; // Create new handler let handle = DriverHandle::new(tx.clone()); match H::create(handle, conf) { Ok(handler) => { - self.handler = Some(handler); + let handler = Arc::new(Mutex::new(handler)); + self.handler = Some(handler.clone()); + *poll_handler.lock().await = Some(handler); self.connect_handler(client).await; } Err(e) => { @@ -319,12 +454,13 @@ impl Driver { /// retrying on failure after the configured delay. async fn connect_handler(&mut self, client: &AsyncClient) { loop { - let handler = match self.handler.as_mut() { + let handler = match self.handler.as_ref() { Some(h) => h, None => return, }; - match handler.connect().await { + let result = handler.lock().await.connect().await; + match result { Ok(()) => { self.set_status(Status::Up, client).await; self.try_subscribe().await; @@ -369,23 +505,25 @@ impl Driver { } }; - // Parse all addresses into a temporary vec first to avoid - // holding an immutable borrow on self.handler while mutating - // self.addrs/topics. + // Parse all addresses while holding the handler lock, collecting + // into a temporary vec to release the lock before mutating self. + let guard = handler.lock().await; let mut parsed_addrs = Vec::with_capacity(pkt.addrs.len()); for (topic, raw_addr) in &pkt.addrs { - match handler.parse_addr(raw_addr) { + match guard.parse_addr(raw_addr) { Some(parsed) => { parsed_addrs.push((topic.clone(), parsed)); } None => { tracing::error!(addr = raw_addr, "handler rejected address"); + drop(guard); self.clear_addrs(); self.set_status(Status::Addr, client).await; return; } } } + drop(guard); self.clear_addrs(); for (topic, parsed) in parsed_addrs { @@ -397,15 +535,17 @@ impl Driver { self.try_subscribe().await; } - /// Handle `poll` message — poll the handler for each requested topic. - async fn on_poll(&mut self, client: &AsyncClient, payload: &[u8]) { + /// Handle `poll` message — resolve topics and send to the poll worker. + /// + /// This returns immediately without awaiting any handler calls, + /// keeping the MQTT event loop responsive. If the poll queue is + /// full, the request is dropped (backpressure). + fn on_poll(&self, poll_tx: &mpsc::Sender>, payload: &[u8]) { let payload_str = match std::str::from_utf8(payload) { Ok(s) => s, Err(_) => return, }; - // Collect (topic, addr) pairs first so we don't hold a borrow - // on self.addrs while also borrowing self.handler mutably. let polls: Vec<(String, H::Addr)> = payload_str .lines() .filter_map(|line| { @@ -423,26 +563,19 @@ impl Driver { }) .collect(); - let handler = match self.handler.as_mut() { - Some(h) => h, - None => return, - }; + if polls.is_empty() { + return; + } - let id = &self.config.username; - for (topic, addr) in &polls { - if let Some(data) = handler.poll(addr).await { - let mtopic = format!("fpEdge1/{id}/data/{topic}"); - if let Err(e) = client.publish(&mtopic, QoS::AtMostOnce, false, data).await { - tracing::error!(%e, %topic, "failed to publish poll data"); - } - } + if poll_tx.try_send(polls).is_err() { + tracing::warn!("poll queue full, dropping request"); } } /// Handle `cmd` message — forward to handler. async fn on_cmd(&mut self, command: &str, payload: Bytes) { - if let Some(handler) = self.handler.as_mut() { - handler.cmd(command, payload).await; + if let Some(handler) = self.handler.as_ref() { + handler.lock().await.cmd(command, payload).await; } } @@ -454,8 +587,8 @@ impl Driver { let specs: Vec = self.addrs.values().cloned().collect(); - if let Some(handler) = self.handler.as_mut() { - if !handler.subscribe(&specs).await { + if let Some(handler) = self.handler.as_ref() { + if !handler.lock().await.subscribe(&specs).await { tracing::error!("handler subscription failed"); } } @@ -465,7 +598,6 @@ impl Driver { async fn handle_driver_event(&self, client: &AsyncClient, event: DriverEvent) { match event { DriverEvent::Data { topic, payload } => { - // Look up the MQTT data topic from the address topic let mtopic = self.topic_with("data", &topic); if let Err(e) = client .publish(&mtopic, QoS::AtMostOnce, false, payload) @@ -484,10 +616,9 @@ impl Driver { /// Close the current handler (if any) and reset address state. async fn close_handler(&mut self) { - if let Some(handler) = self.handler.as_mut() { - handler.close().await; + if let Some(handler) = self.handler.take() { + handler.lock().await.close().await; } - self.handler = None; self.clear_addrs(); } From b38f3ed47d879d059b32827fb411043867abe430 Mon Sep 17 00:00:00 2001 From: Kavan Price Date: Tue, 17 Mar 2026 00:25:29 +0000 Subject: [PATCH 8/8] Fix instant broker disconnection --- lib/rs-edge-driver/src/driver.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/rs-edge-driver/src/driver.rs b/lib/rs-edge-driver/src/driver.rs index ec13382f..042fb9fd 100644 --- a/lib/rs-edge-driver/src/driver.rs +++ b/lib/rs-edge-driver/src/driver.rs @@ -343,7 +343,7 @@ impl Driver { self.topic("status"), "DOWN", QoS::AtLeastOnce, - true, + false, )); opts.set_keep_alive(std::time::Duration::from_secs(30)); @@ -628,7 +628,7 @@ impl Driver { let topic = self.topic("status"); let payload = status.to_string(); if let Err(e) = client - .publish(&topic, QoS::AtLeastOnce, true, payload) + .publish(&topic, QoS::AtLeastOnce, false, payload) .await { tracing::error!(%e, "failed to publish status");