diff --git a/Cargo.lock b/Cargo.lock index c6cbb40..77fb34c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -275,7 +275,7 @@ dependencies = [ [[package]] name = "systemd-udp-proxy" -version = "0.1.3" +version = "0.1.4" dependencies = [ "clap", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index c3d9fbe..05cd6b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "systemd-udp-proxy" -version = "0.1.3" +version = "0.1.4" edition = "2024" [dependencies] diff --git a/src/main.rs b/src/main.rs index 6f325df..2846c17 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,10 +34,11 @@ struct ProxyConfig { /// How many seconds sessions should be cached before expiring #[arg(short = 't', long, default_value_t = 60)] session_timeout: u64, + /// Maximum UDP packet size to receive in bytes (packets larger will be truncated) + #[arg(short = 'm', long, default_value_t = 1500)] + max_packet_size: usize, } -const MAX_UDP_PACKET_SIZE: u16 = u16::MAX; - #[tokio::main] async fn main() -> io::Result<()> { log_config::init_systemd(); diff --git a/src/primary_tasks/rx_task.rs b/src/primary_tasks/rx_task.rs index 2371485..a7712cc 100644 --- a/src/primary_tasks/rx_task.rs +++ b/src/primary_tasks/rx_task.rs @@ -14,7 +14,7 @@ use tokio::{ }; use crate::{ - MAX_UDP_PACKET_SIZE, ProxyConfig, + ProxyConfig, error_util::{ErrorAction, handle_io_error}, session::{Session, SessionReply, SessionSource}, }; @@ -28,6 +28,10 @@ type SessionCache = HashMap)>; /// tx/rx loop tasks are spawned to proxy traffic for that session to and from the destination. If a [`Session`] /// does not recieve traffic for [`ProxyConfig::session_timeout`] seconds, it will close its tasks and a new one will /// be created if any traffic resumes from it. +/// +/// If a packet arrives after a Session's channel has closed but before the session is removed +/// from the cache, that packet will be dropped and the session will be cleaned up. Subsequent +/// packets from the same source will trigger creation of a new session. pub async fn rx_task( config: ProxyConfig, reply_channel_tx: UnboundedSender, @@ -37,7 +41,7 @@ pub async fn rx_task( let sessions = Arc::new(RwLock::new(SessionCache::new())); loop { - let mut buf = Vec::with_capacity(MAX_UDP_PACKET_SIZE.into()); + let mut buf = Vec::with_capacity(config.max_packet_size); match rx_socket.recv_buf_from(&mut buf).await { Err(err) => match handle_io_error(err) { ErrorAction::Terminate(err) => return Err(err), @@ -72,7 +76,11 @@ pub async fn rx_task( let rx_reply_channel = shared_reply_channel.clone(); tokio::spawn(async move { if let Err(err) = rx_session - .rx_loop(rx_reply_channel, config.session_timeout) + .rx_loop( + rx_reply_channel, + config.session_timeout, + config.max_packet_size, + ) .await { error!("RX error for {}: {:?}", source, err); diff --git a/src/session.rs b/src/session.rs index 0e4105d..3606616 100644 --- a/src/session.rs +++ b/src/session.rs @@ -6,7 +6,7 @@ use std::{ time::Duration, }; -use log::info; +use log::{info, warn}; use tokio::{ net::UdpSocket, sync::mpsc::{UnboundedReceiver, UnboundedSender}, @@ -14,7 +14,7 @@ use tokio::{ }; use crate::{ - MAX_UDP_PACKET_SIZE, ProxyConfig, + ProxyConfig, error_util::{ErrorAction, handle_io_error}, }; @@ -79,7 +79,9 @@ impl Session { Ok(_) => {} Err(err) => match err.kind() { // Destination service hasn't started yet - ErrorKind::ConnectionRefused => {} + ErrorKind::ConnectionRefused => { + warn!("Destination service refused connection"); + } _ => match handle_io_error(err) { ErrorAction::Terminate(cause) => return Err(cause), ErrorAction::Continue => {} @@ -97,10 +99,11 @@ impl Session { &self, reply_channel: Arc>, session_timeout: u64, + max_packet_size: usize, ) -> io::Result<()> { let duration = Duration::from_secs(session_timeout); loop { - let mut buf = Vec::with_capacity(MAX_UDP_PACKET_SIZE.into()); + let mut buf = Vec::with_capacity(max_packet_size); match timeout(duration, self.destination_socket.recv_buf(&mut buf)).await { Ok(result) => { if let Err(err) = result {