Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: use concrete errors #13

Merged
merged 7 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
452 changes: 286 additions & 166 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions netwatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ rust-version = "1.81"
workspace = true

[dependencies]
anyhow = { version = "1" }
atomic-waker = "1.1.2"
bytes = "1.7"
futures-lite = "2.5"
Expand Down Expand Up @@ -58,11 +57,13 @@ rtnetlink = "=0.13.1" # pinned because of https://github.com/rust-netlink/rtnetl

[target.'cfg(target_os = "windows")'.dependencies]
wmi = "0.14"
windows = { version = "0.58", features = ["Win32_NetworkManagement_IpHelper", "Win32_Foundation", "Win32_NetworkManagement_Ndis", "Win32_Networking_WinSock"] }
windows = { version = "0.59", features = ["Win32_NetworkManagement_IpHelper", "Win32_Foundation", "Win32_NetworkManagement_Ndis", "Win32_Networking_WinSock"] }
windows-result = "0.3"
serde = { version = "1", features = ["derive"] }
derive_more = { version = "1.0.0", features = ["debug"] }

[dev-dependencies]
testresult = "0.4.1"
tokio = { version = "1", features = [
"io-util",
"sync",
Expand Down
46 changes: 31 additions & 15 deletions netwatch/src/interfaces/linux.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Linux-specific network interfaces implementations.

use anyhow::{anyhow, Result};
#[cfg(not(target_os = "android"))]
use futures_util::TryStreamExt;
use tokio::{
Expand All @@ -10,6 +9,26 @@ use tokio::{

use super::DefaultRouteDetails;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("IO {0}")]
Io(#[from] std::io::Error),
#[cfg(not(target_os = "android"))]
#[error("no netlink response")]
NoResponse,
#[cfg(not(target_os = "android"))]
#[error("interface not found")]
InterfaceNotFound,
#[error("iface field is missing")]
MissingIfaceField,
#[error("destination field is missing")]
MissingDestinationField,
#[error("mask field is missing")]
MissingMaskField,
#[error("netlink")]
Netlink(#[from] rtnetlink::Error),
}

pub async fn default_route() -> Option<DefaultRouteDetails> {
let route = default_route_proc().await;
if let Ok(route) = route {
Expand All @@ -27,7 +46,7 @@ pub async fn default_route() -> Option<DefaultRouteDetails> {

const PROC_NET_ROUTE_PATH: &str = "/proc/net/route";

async fn default_route_proc() -> Result<Option<DefaultRouteDetails>> {
async fn default_route_proc() -> Result<Option<DefaultRouteDetails>, Error> {
const ZERO_ADDR: &str = "00000000";
let file = File::open(PROC_NET_ROUTE_PATH).await?;

Expand All @@ -50,9 +69,9 @@ async fn default_route_proc() -> Result<Option<DefaultRouteDetails>> {
continue;
}
let mut fields = line.split_ascii_whitespace();
let iface = fields.next().ok_or(anyhow!("iface field missing"))?;
let destination = fields.next().ok_or(anyhow!("destination field missing"))?;
let mask = fields.nth(5).ok_or(anyhow!("mask field missing"))?;
let iface = fields.next().ok_or(Error::MissingIfaceField)?;
let destination = fields.next().ok_or(Error::MissingDestinationField)?;
let mask = fields.nth(5).ok_or(Error::MissingMaskField)?;
// if iface.starts_with("tailscale") || iface.starts_with("wg") {
// continue;
// }
Expand All @@ -70,15 +89,15 @@ async fn default_route_proc() -> Result<Option<DefaultRouteDetails>> {
/// We use this on Android where /proc/net/route can be missing entries or have locked-down
/// permissions. See also comments in <https://github.com/tailscale/tailscale/pull/666>.
#[cfg(target_os = "android")]
pub async fn default_route_android_ip_route() -> Result<Option<DefaultRouteDetails>> {
pub async fn default_route_android_ip_route() -> Result<Option<DefaultRouteDetails>, Error> {
use tokio::process::Command;

let output = Command::new("/system/bin/ip")
.args(["route", "show", "table", "0"])
.kill_on_drop(true)
.output()
.await?;
let stdout = std::str::from_utf8(&output.stdout)?;
let stdout = std::string::String::from_utf8_lossy(&output.stdout);
let details = parse_android_ip_route(&stdout).map(|iface| DefaultRouteDetails {
interface_name: iface.to_string(),
});
Expand All @@ -104,7 +123,7 @@ fn parse_android_ip_route(stdout: &str) -> Option<&str> {
}

#[cfg(not(target_os = "android"))]
async fn default_route_netlink() -> Result<Option<DefaultRouteDetails>> {
async fn default_route_netlink() -> Result<Option<DefaultRouteDetails>, Error> {
use tracing::{info_span, Instrument};

let (connection, handle, _receiver) = rtnetlink::new_connection()?;
Expand All @@ -127,7 +146,7 @@ async fn default_route_netlink() -> Result<Option<DefaultRouteDetails>> {
async fn default_route_netlink_family(
handle: &rtnetlink::Handle,
family: rtnetlink::IpVersion,
) -> Result<Option<(String, u32)>> {
) -> Result<Option<(String, u32)>, Error> {
use netlink_packet_route::route::RouteAttribute;

let mut routes = handle.route().get(family).execute();
Expand Down Expand Up @@ -165,21 +184,18 @@ async fn default_route_netlink_family(
}

#[cfg(not(target_os = "android"))]
async fn iface_by_index(handle: &rtnetlink::Handle, index: u32) -> Result<String> {
async fn iface_by_index(handle: &rtnetlink::Handle, index: u32) -> Result<String, Error> {
use netlink_packet_route::link::LinkAttribute;

let mut links = handle.link().get().match_index(index).execute();
let msg = links
.try_next()
.await?
.ok_or_else(|| anyhow!("No netlink response"))?;
let msg = links.try_next().await?.ok_or(Error::NoResponse)?;

for nla in msg.attributes {
if let LinkAttribute::IfName(name) = nla {
return Ok(name);
}
}
Err(anyhow!("Interface name not found"))
Err(Error::InterfaceNotFound)
}

#[cfg(test)]
Expand Down
14 changes: 12 additions & 2 deletions netwatch/src/interfaces/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@ struct Win32_IP4RouteTable {
Name: String,
}

fn get_default_route() -> anyhow::Result<DefaultRouteDetails> {
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("IO {0}")]
Io(#[from] std::io::Error),
#[error("not route found")]
NoRoute,
#[error("WMI {0}")]
Wmi(#[from] wmi::WMIError),
}

fn get_default_route() -> Result<DefaultRouteDetails, Error> {
let com_con = COMLibrary::new()?;
let wmi_con = WMIConnection::new(com_con)?;

Expand All @@ -22,7 +32,7 @@ fn get_default_route() -> anyhow::Result<DefaultRouteDetails> {
.filtered_query(&query)?
.drain(..)
.next()
.ok_or_else(|| anyhow::anyhow!("no route found"))?;
.ok_or(Error::NoRoute)?;

Ok(DefaultRouteDetails {
interface_name: route.Name,
Expand Down
29 changes: 24 additions & 5 deletions netwatch/src/netmon.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Monitoring of networking interfaces and route changes.

use anyhow::Result;
use futures_lite::future::Boxed as BoxFuture;
use tokio::sync::{mpsc, oneshot};
use tokio_util::task::AbortOnDropHandle;
Expand Down Expand Up @@ -32,9 +31,29 @@ pub struct Monitor {
actor_tx: mpsc::Sender<ActorMessage>,
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("channel closed")]
ChannelClosed,
#[error("actor {0}")]
Actor(#[from] actor::Error),
}

impl<T> From<mpsc::error::SendError<T>> for Error {
fn from(_value: mpsc::error::SendError<T>) -> Self {
Self::ChannelClosed
}
}

impl From<oneshot::error::RecvError> for Error {
fn from(_value: oneshot::error::RecvError) -> Self {
Self::ChannelClosed
}
}

impl Monitor {
/// Create a new monitor.
pub async fn new() -> Result<Self> {
pub async fn new() -> Result<Self, Error> {
let actor = Actor::new().await?;
let actor_tx = actor.subscribe();

Expand All @@ -49,7 +68,7 @@ impl Monitor {
}

/// Subscribe to network changes.
pub async fn subscribe<F>(&self, callback: F) -> Result<CallbackToken>
pub async fn subscribe<F>(&self, callback: F) -> Result<CallbackToken, Error>
where
F: Fn(bool) -> BoxFuture<()> + 'static + Sync + Send,
{
Expand All @@ -62,7 +81,7 @@ impl Monitor {
}

/// Unsubscribe a callback from network changes, using the provided token.
pub async fn unsubscribe(&self, token: CallbackToken) -> Result<()> {
pub async fn unsubscribe(&self, token: CallbackToken) -> Result<(), Error> {
let (s, r) = oneshot::channel();
self.actor_tx
.send(ActorMessage::Unsubscribe(token, s))
Expand All @@ -72,7 +91,7 @@ impl Monitor {
}

/// Potential change detected outside
pub async fn network_change(&self) -> Result<()> {
pub async fn network_change(&self) -> Result<(), Error> {
self.actor_tx.send(ActorMessage::NetworkChange).await?;
Ok(())
}
Expand Down
16 changes: 6 additions & 10 deletions netwatch/src/netmon/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use std::{
time::{Duration, Instant},
};

use anyhow::Result;
use futures_lite::future::Boxed as BoxFuture;
pub(super) use os::Error;
use os::{is_interesting_interface, RouteMonitor};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, trace, warn};
use tracing::{debug, trace};

#[cfg(target_os = "android")]
use super::android as os;
Expand Down Expand Up @@ -77,7 +77,7 @@ pub(super) enum ActorMessage {
}

impl Actor {
pub(super) async fn new() -> Result<Self> {
pub(super) async fn new() -> Result<Self, os::Error> {
let interface_state = State::new().await;
let wall_time = Instant::now();

Expand Down Expand Up @@ -114,9 +114,7 @@ impl Actor {

_ = debounce_interval.tick() => {
if let Some(time_jumped) = last_event.take() {
if let Err(err) = self.handle_potential_change(time_jumped).await {
warn!("failed to handle network changes: {:?}", err);
};
self.handle_potential_change(time_jumped).await;
}
}
_ = wall_time_interval.tick() => {
Expand Down Expand Up @@ -172,7 +170,7 @@ impl Actor {
token
}

async fn handle_potential_change(&mut self, time_jumped: bool) -> Result<()> {
async fn handle_potential_change(&mut self, time_jumped: bool) {
trace!("potential change");

let new_state = State::new().await;
Expand All @@ -181,7 +179,7 @@ impl Actor {
// No major changes, continue on
if !time_jumped && old_state == &new_state {
debug!("no changes detected");
return Ok(());
return;
}

let is_major = is_major_change(old_state, &new_state) || time_jumped;
Expand All @@ -197,8 +195,6 @@ impl Actor {
cb(is_major).await;
});
}

Ok(())
}

/// Reports whether wall time jumped more than 150%
Expand Down
7 changes: 5 additions & 2 deletions netwatch/src/netmon/android.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use anyhow::Result;
use tokio::sync::mpsc;

use super::actor::NetworkMessage;

#[derive(Debug, thiserror::Error)]
#[error("error")]
pub struct Error;

#[derive(Debug)]
pub(super) struct RouteMonitor {
_sender: mpsc::Sender<NetworkMessage>,
}

impl RouteMonitor {
pub(super) fn new(_sender: mpsc::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(_sender: mpsc::Sender<NetworkMessage>) -> Result<Self, Error> {
// Very sad monitor. Android doesn't allow us to do this

Ok(RouteMonitor { _sender })
Expand Down
11 changes: 8 additions & 3 deletions netwatch/src/netmon/bsd.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::Result;
#[cfg(any(target_os = "macos", target_os = "ios"))]
use libc::{RTAX_DST, RTAX_IFP};
use tokio::{io::AsyncReadExt, sync::mpsc};
Expand All @@ -15,7 +14,13 @@ pub(super) struct RouteMonitor {
_handle: AbortOnDropHandle<()>,
}

fn create_socket() -> Result<tokio::net::UnixStream> {
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("IO {0}")]
Io(#[from] std::io::Error),
}

fn create_socket() -> std::io::Result<tokio::net::UnixStream> {
let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?;
socket.set_nonblocking(true)?;
let socket_std: std::os::unix::net::UnixStream = socket.into();
Expand All @@ -27,7 +32,7 @@ fn create_socket() -> Result<tokio::net::UnixStream> {
}

impl RouteMonitor {
pub(super) fn new(sender: mpsc::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(sender: mpsc::Sender<NetworkMessage>) -> Result<Self, Error> {
let mut socket = create_socket()?;
let handle = tokio::task::spawn(async move {
trace!("AF_ROUTE monitor started");
Expand Down
9 changes: 7 additions & 2 deletions netwatch/src/netmon/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
net::IpAddr,
};

use anyhow::Result;
use futures_lite::StreamExt;
use libc::{
RTNLGRP_IPV4_IFADDR, RTNLGRP_IPV4_ROUTE, RTNLGRP_IPV4_RULE, RTNLGRP_IPV6_IFADDR,
Expand Down Expand Up @@ -32,6 +31,12 @@ impl Drop for RouteMonitor {
}
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("IO {0}")]
Io(#[from] std::io::Error),
}

const fn nl_mgrp(group: u32) -> u32 {
if group > 31 {
panic!("use netlink_sys::Socket::add_membership() for this group");
Expand All @@ -52,7 +57,7 @@ macro_rules! get_nla {
}

impl RouteMonitor {
pub(super) fn new(sender: mpsc::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(sender: mpsc::Sender<NetworkMessage>) -> Result<Self, Error> {
let (mut conn, mut _handle, mut messages) = new_connection()?;

// Specify flags to listen on.
Expand Down
Loading
Loading