From beaed24ef8cb9afaec23268232ed08399c41e03a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Wed, 11 Feb 2026 15:12:48 +0000 Subject: [PATCH] apple.rs: return Err when channel is dropped --- Cargo.toml | 22 +++++++++++----------- src/apple.rs | 23 +++++++++++++++++------ src/linux.rs | 43 ++++++++++++++----------------------------- src/win.rs | 7 ++++--- 4 files changed, 46 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7f7d41e..de14e52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,26 +22,26 @@ ipnet = "2.3.1" log = "0.4.14" [target.'cfg(target_os = "linux")'.dependencies] -netlink-sys = { version = "0.8.0", default-features = false } -netlink-packet-core = { version = "0.7.0", default-features = false } -netlink-packet-route = { version = "0.17.0", default-features = false } -netlink-proto = { version = "0.11.0", default-features = false } -rtnetlink = { version = "0.13.0", default-features = false } +netlink-sys = { version = "0.8.8", default-features = false } +netlink-packet-core = { version = "0.8.1", default-features = false } +netlink-packet-route = { version = "0.28.0", default-features = false } +netlink-proto = { version = "0.12.0", default-features = false } +rtnetlink = { version = "0.20.0", default-features = false } [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] -core-foundation = "0.9.2" -if-addrs = "0.10.0" -system-configuration = "0.6.0" +core-foundation = "0.9.4" +if-addrs = "0.15.0" +system-configuration = "0.7.0" tokio = { version = "1.21.2", features = ["rt"], optional = true } smol = { version = "2", optional = true } [target.'cfg(target_os = "windows")'.dependencies] -if-addrs = "0.10.0" -windows = { version = ">=0.51.0,<=0.53", features = ["Win32_NetworkManagement_IpHelper", "Win32_Foundation", "Win32_NetworkManagement_Ndis", "Win32_Networking_WinSock"] } +if-addrs = "0.15.0" +windows = { version = "0.62.0", features = ["Win32_NetworkManagement_IpHelper", "Win32_Foundation", "Win32_NetworkManagement_Ndis", "Win32_Networking_WinSock"] } [target.'cfg(not(any(target_os = "ios", target_os = "linux", target_os = "macos", target_os = "windows")))'.dependencies] async-io = "2.0.0" -if-addrs = "0.10.0" +if-addrs = "0.15.0" [dev-dependencies] env_logger = "0.10.0" diff --git a/src/apple.rs b/src/apple.rs index c3099a4..3ea46ea 100644 --- a/src/apple.rs +++ b/src/apple.rs @@ -7,7 +7,7 @@ use futures::channel::mpsc; use futures::stream::{FusedStream, Stream}; use if_addrs::IfAddr; use std::collections::VecDeque; -use std::io::Result; +use std::io::{Error, Result}; use std::pin::Pin; use std::task::{Context, Poll}; use system_configuration::dynamic_store::{ @@ -86,8 +86,12 @@ impl IfWatcher { if let Some(event) = self.queue.pop_front() { return Poll::Ready(Ok(event)); } - if Pin::new(&mut self.rx).poll_next(cx).is_pending() { - return Poll::Pending; + match Pin::new(&mut self.rx).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + return Poll::Ready(Err(Error::other("Background task stopped"))) + } + Poll::Ready(_) => {} } if let Err(error) = self.resync() { return Poll::Ready(Err(error)); @@ -134,17 +138,24 @@ fn callback(_store: SCDynamicStore, _changed_keys: CFArray, info: &mut } fn background_task(tx: mpsc::Sender<()>) { - let store = SCDynamicStoreBuilder::new("global-network-watcher") + let Some(store) = SCDynamicStoreBuilder::new("global-network-watcher") .callback_context(SCDynamicStoreCallBackContext { callout: callback, info: tx, }) - .build(); + .build() + else { + log::error!("Failed to create SCDynamicStore"); + return; + }; store.set_notification_keys( &CFArray::::from_CFTypes(&[]), &CFArray::from_CFTypes(&[CFString::new("State:/Network/Interface/.*/IPv.")]), ); - let source = store.create_run_loop_source(); + let Some(source) = store.create_run_loop_source() else { + log::error!("Failed to create run loop source"); + return; + }; let run_loop = CFRunLoop::get_current(); run_loop.add_source(&source, unsafe { kCFRunLoopCommonModes }); CFRunLoop::run_current(); diff --git a/src/linux.rs b/src/linux.rs index 933165f..1338dcf 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -4,15 +4,14 @@ use futures::ready; use futures::stream::{FusedStream, Stream, TryStreamExt}; use futures::StreamExt; use netlink_packet_core::NetlinkPayload; -use netlink_packet_route::rtnl::address::nlas::Nla; -use netlink_packet_route::rtnl::{AddressMessage, RtnlMessage}; +use netlink_packet_route::address::{AddressAttribute, AddressMessage}; +use netlink_packet_route::RouteNetlinkMessage; use netlink_proto::Connection; use netlink_sys::{AsyncSocket, SocketAddr}; use rtnetlink::constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR}; use std::collections::VecDeque; use std::future::Future; -use std::io::{Error, ErrorKind, Result}; -use std::net::{Ipv4Addr, Ipv6Addr}; +use std::io::{ErrorKind, Result}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -35,8 +34,8 @@ pub mod smol { } pub struct IfWatcher { - conn: Connection, - messages: Pin> + Send>>, + conn: Connection, + messages: Pin> + Send>>, addrs: FnvHashSet, queue: VecDeque, } @@ -63,8 +62,8 @@ where .address() .get() .execute() - .map_ok(RtnlMessage::NewAddress) - .map_err(|err| Error::new(ErrorKind::Other, err)); + .map_ok(RouteNetlinkMessage::NewAddress) + .map_err(std::io::Error::other); let msg_stream = messages.filter_map(|(msg, _)| async { match msg.payload { NetlinkPayload::Error(err) => Some(Err(err.to_io())), @@ -129,8 +128,8 @@ where } }; match message { - RtnlMessage::NewAddress(msg) => self.add_address(msg), - RtnlMessage::DelAddress(msg) => self.rem_address(msg), + RouteNetlinkMessage::NewAddress(msg) => self.add_address(msg), + RouteNetlinkMessage::DelAddress(msg) => self.rem_address(msg), _ => {} } } @@ -143,25 +142,11 @@ fn socket_err(error: &str) -> std::io::Error { fn iter_nets(msg: AddressMessage) -> impl Iterator { let prefix = msg.header.prefix_len; - let family = msg.header.family; - msg.nlas.into_iter().filter_map(move |nla| { - if let Nla::Address(octets) = nla { - match family { - 2 => { - let mut addr = [0; 4]; - addr.copy_from_slice(&octets); - Some(IpNet::V4( - Ipv4Net::new(Ipv4Addr::from(addr), prefix).unwrap(), - )) - } - 10 => { - let mut addr = [0; 16]; - addr.copy_from_slice(&octets); - Some(IpNet::V6( - Ipv6Net::new(Ipv6Addr::from(addr), prefix).unwrap(), - )) - } - _ => None, + msg.attributes.into_iter().filter_map(move |attr| { + if let AddressAttribute::Address(addr) = attr { + match addr { + std::net::IpAddr::V4(ipv4) => Some(IpNet::V4(Ipv4Net::new(ipv4, prefix).unwrap())), + std::net::IpAddr::V6(ipv6) => Some(IpNet::V6(Ipv6Net::new(ipv6, prefix).unwrap())), } } else { None diff --git a/src/win.rs b/src/win.rs index b0a2bce..857e585 100644 --- a/src/win.rs +++ b/src/win.rs @@ -10,7 +10,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; -use windows::Win32::Foundation::{BOOLEAN, HANDLE}; +use windows::Win32::Foundation::HANDLE; use windows::Win32::NetworkManagement::IpHelper::{ CancelMibChangeNotify2, NotifyIpInterfaceChange, MIB_IPINTERFACE_ROW, MIB_NOTIFICATION_TYPE, }; @@ -168,9 +168,10 @@ impl IpChangeNotification { AF_UNSPEC, Some(global_callback), Some(callback as _), - BOOLEAN(0), + false, &mut handle as _, ) + .ok() .map_err(|err| Error::new(ErrorKind::Other, err.to_string()))?; } Ok(Self { callback, handle }) @@ -180,7 +181,7 @@ impl IpChangeNotification { impl Drop for IpChangeNotification { fn drop(&mut self) { unsafe { - if let Err(err) = CancelMibChangeNotify2(self.handle) { + if let Err(err) = CancelMibChangeNotify2(self.handle).ok() { log::error!("error deregistering notification: {}", err); } drop(Box::from_raw(self.callback));