Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@ ipnet = "2.3.1"
log = "0.4.14"

[target.'cfg(target_os = "linux")'.dependencies]
netlink-sys = { version = "0.8.0", default-features = false }
netlink-packet-core = { version = "0.7.0", default-features = false }
netlink-packet-route = { version = "0.17.0", default-features = false }
netlink-proto = { version = "0.11.0", default-features = false }
rtnetlink = { version = "0.13.0", default-features = false }
netlink-sys = { version = "0.8.8", default-features = false }
netlink-packet-core = { version = "0.8.1", default-features = false }
netlink-packet-route = { version = "0.28.0", default-features = false }
netlink-proto = { version = "0.12.0", default-features = false }
rtnetlink = { version = "0.20.0", default-features = false }

[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
core-foundation = "0.9.2"
if-addrs = "0.10.0"
system-configuration = "0.6.0"
core-foundation = "0.9.4"
if-addrs = "0.15.0"
system-configuration = "0.7.0"
tokio = { version = "1.21.2", features = ["rt"], optional = true }
smol = { version = "2", optional = true }

[target.'cfg(target_os = "windows")'.dependencies]
if-addrs = "0.10.0"
windows = { version = ">=0.51.0,<=0.53", features = ["Win32_NetworkManagement_IpHelper", "Win32_Foundation", "Win32_NetworkManagement_Ndis", "Win32_Networking_WinSock"] }
if-addrs = "0.15.0"
windows = { version = "0.62.0", features = ["Win32_NetworkManagement_IpHelper", "Win32_Foundation", "Win32_NetworkManagement_Ndis", "Win32_Networking_WinSock"] }

[target.'cfg(not(any(target_os = "ios", target_os = "linux", target_os = "macos", target_os = "windows")))'.dependencies]
async-io = "2.0.0"
if-addrs = "0.10.0"
if-addrs = "0.15.0"

[dev-dependencies]
env_logger = "0.10.0"
Expand Down
23 changes: 17 additions & 6 deletions src/apple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::channel::mpsc;
use futures::stream::{FusedStream, Stream};
use if_addrs::IfAddr;
use std::collections::VecDeque;
use std::io::Result;
use std::io::{Error, Result};
use std::pin::Pin;
use std::task::{Context, Poll};
use system_configuration::dynamic_store::{
Expand Down Expand Up @@ -86,8 +86,12 @@ impl IfWatcher {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
if Pin::new(&mut self.rx).poll_next(cx).is_pending() {
return Poll::Pending;
match Pin::new(&mut self.rx).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
return Poll::Ready(Err(Error::other("Background task stopped")))
}
Poll::Ready(_) => {}
}
if let Err(error) = self.resync() {
return Poll::Ready(Err(error));
Expand Down Expand Up @@ -134,17 +138,24 @@ fn callback(_store: SCDynamicStore, _changed_keys: CFArray<CFString>, info: &mut
}

fn background_task(tx: mpsc::Sender<()>) {
let store = SCDynamicStoreBuilder::new("global-network-watcher")
let Some(store) = SCDynamicStoreBuilder::new("global-network-watcher")
.callback_context(SCDynamicStoreCallBackContext {
callout: callback,
info: tx,
})
.build();
.build()
else {
log::error!("Failed to create SCDynamicStore");
return;
Comment on lines +148 to +149
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we return the error here? IfWatcher::new (which calls background_task) already returns a Result, so we could propagate it to the user.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah but it calls background_task inside a spawned thread not in the context of the new function itself

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. But we could split it into, e.g., create_task and run_task or sth like that. I would prefer if we don't just silently fail (apart from the log msg).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah right, yeah that would make sense, I tried it but CFRunLoopSource is !Send

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed out of band. New solution of handling the a failing background_task by polling rx in poll_if_event lgtm!

};
store.set_notification_keys(
&CFArray::<CFString>::from_CFTypes(&[]),
&CFArray::from_CFTypes(&[CFString::new("State:/Network/Interface/.*/IPv.")]),
);
let source = store.create_run_loop_source();
let Some(source) = store.create_run_loop_source() else {
log::error!("Failed to create run loop source");
return;
Comment thread
elenaf9 marked this conversation as resolved.
};
let run_loop = CFRunLoop::get_current();
run_loop.add_source(&source, unsafe { kCFRunLoopCommonModes });
CFRunLoop::run_current();
Expand Down
43 changes: 14 additions & 29 deletions src/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use futures::ready;
use futures::stream::{FusedStream, Stream, TryStreamExt};
use futures::StreamExt;
use netlink_packet_core::NetlinkPayload;
use netlink_packet_route::rtnl::address::nlas::Nla;
use netlink_packet_route::rtnl::{AddressMessage, RtnlMessage};
use netlink_packet_route::address::{AddressAttribute, AddressMessage};
use netlink_packet_route::RouteNetlinkMessage;
use netlink_proto::Connection;
use netlink_sys::{AsyncSocket, SocketAddr};
use rtnetlink::constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR};
use std::collections::VecDeque;
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::io::{ErrorKind, Result};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand All @@ -35,8 +34,8 @@ pub mod smol {
}

pub struct IfWatcher<T> {
conn: Connection<RtnlMessage, T>,
messages: Pin<Box<dyn Stream<Item = Result<RtnlMessage>> + Send>>,
conn: Connection<RouteNetlinkMessage, T>,
messages: Pin<Box<dyn Stream<Item = Result<RouteNetlinkMessage>> + Send>>,
addrs: FnvHashSet<IpNet>,
queue: VecDeque<IfEvent>,
}
Expand All @@ -63,8 +62,8 @@ where
.address()
.get()
.execute()
.map_ok(RtnlMessage::NewAddress)
.map_err(|err| Error::new(ErrorKind::Other, err));
.map_ok(RouteNetlinkMessage::NewAddress)
.map_err(std::io::Error::other);
let msg_stream = messages.filter_map(|(msg, _)| async {
match msg.payload {
NetlinkPayload::Error(err) => Some(Err(err.to_io())),
Expand Down Expand Up @@ -129,8 +128,8 @@ where
}
};
match message {
RtnlMessage::NewAddress(msg) => self.add_address(msg),
RtnlMessage::DelAddress(msg) => self.rem_address(msg),
RouteNetlinkMessage::NewAddress(msg) => self.add_address(msg),
RouteNetlinkMessage::DelAddress(msg) => self.rem_address(msg),
_ => {}
}
}
Expand All @@ -143,25 +142,11 @@ fn socket_err(error: &str) -> std::io::Error {

fn iter_nets(msg: AddressMessage) -> impl Iterator<Item = IpNet> {
let prefix = msg.header.prefix_len;
let family = msg.header.family;
msg.nlas.into_iter().filter_map(move |nla| {
if let Nla::Address(octets) = nla {
match family {
2 => {
let mut addr = [0; 4];
addr.copy_from_slice(&octets);
Some(IpNet::V4(
Ipv4Net::new(Ipv4Addr::from(addr), prefix).unwrap(),
))
}
10 => {
let mut addr = [0; 16];
addr.copy_from_slice(&octets);
Some(IpNet::V6(
Ipv6Net::new(Ipv6Addr::from(addr), prefix).unwrap(),
))
}
_ => None,
msg.attributes.into_iter().filter_map(move |attr| {
if let AddressAttribute::Address(addr) = attr {
match addr {
std::net::IpAddr::V4(ipv4) => Some(IpNet::V4(Ipv4Net::new(ipv4, prefix).unwrap())),
std::net::IpAddr::V6(ipv6) => Some(IpNet::V6(Ipv6Net::new(ipv6, prefix).unwrap())),
}
} else {
None
Expand Down
7 changes: 4 additions & 3 deletions src/win.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use windows::Win32::Foundation::{BOOLEAN, HANDLE};
use windows::Win32::Foundation::HANDLE;
use windows::Win32::NetworkManagement::IpHelper::{
CancelMibChangeNotify2, NotifyIpInterfaceChange, MIB_IPINTERFACE_ROW, MIB_NOTIFICATION_TYPE,
};
Expand Down Expand Up @@ -168,9 +168,10 @@ impl IpChangeNotification {
AF_UNSPEC,
Some(global_callback),
Some(callback as _),
BOOLEAN(0),
false,
&mut handle as _,
)
.ok()
.map_err(|err| Error::new(ErrorKind::Other, err.to_string()))?;
}
Ok(Self { callback, handle })
Expand All @@ -180,7 +181,7 @@ impl IpChangeNotification {
impl Drop for IpChangeNotification {
fn drop(&mut self) {
unsafe {
if let Err(err) = CancelMibChangeNotify2(self.handle) {
if let Err(err) = CancelMibChangeNotify2(self.handle).ok() {
log::error!("error deregistering notification: {}", err);
}
drop(Box::from_raw(self.callback));
Expand Down