Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "systemd-udp-proxy"
version = "0.1.3"
version = "0.1.4"
edition = "2024"

[dependencies]
Expand Down
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ struct ProxyConfig {
/// How many seconds sessions should be cached before expiring
#[arg(short = 't', long, default_value_t = 60)]
session_timeout: u64,
/// Maximum UDP packet size to receive in bytes (packets larger will be truncated)
#[arg(short = 'm', long, default_value_t = 1500)]
max_packet_size: usize,
}

const MAX_UDP_PACKET_SIZE: u16 = u16::MAX;

#[tokio::main]
async fn main() -> io::Result<()> {
log_config::init_systemd();
Expand Down
14 changes: 11 additions & 3 deletions src/primary_tasks/rx_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::{
};

use crate::{
MAX_UDP_PACKET_SIZE, ProxyConfig,
ProxyConfig,
error_util::{ErrorAction, handle_io_error},
session::{Session, SessionReply, SessionSource},
};
Expand All @@ -28,6 +28,10 @@ type SessionCache = HashMap<SessionSource, (SessionChannel, Arc<Session>)>;
/// tx/rx loop tasks are spawned to proxy traffic for that session to and from the destination. If a [`Session`]
/// does not recieve traffic for [`ProxyConfig::session_timeout`] seconds, it will close its tasks and a new one will
/// be created if any traffic resumes from it.
///
/// If a packet arrives after a Session's channel has closed but before the session is removed
/// from the cache, that packet will be dropped and the session will be cleaned up. Subsequent
/// packets from the same source will trigger creation of a new session.
pub async fn rx_task(
config: ProxyConfig,
reply_channel_tx: UnboundedSender<SessionReply>,
Expand All @@ -37,7 +41,7 @@ pub async fn rx_task(
let sessions = Arc::new(RwLock::new(SessionCache::new()));

loop {
let mut buf = Vec::with_capacity(MAX_UDP_PACKET_SIZE.into());
let mut buf = Vec::with_capacity(config.max_packet_size);
match rx_socket.recv_buf_from(&mut buf).await {
Err(err) => match handle_io_error(err) {
ErrorAction::Terminate(err) => return Err(err),
Expand Down Expand Up @@ -72,7 +76,11 @@ pub async fn rx_task(
let rx_reply_channel = shared_reply_channel.clone();
tokio::spawn(async move {
if let Err(err) = rx_session
.rx_loop(rx_reply_channel, config.session_timeout)
.rx_loop(
rx_reply_channel,
config.session_timeout,
config.max_packet_size,
)
.await
{
error!("RX error for {}: {:?}", source, err);
Expand Down
11 changes: 7 additions & 4 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::{
time::Duration,
};

use log::info;
use log::{info, warn};
use tokio::{
net::UdpSocket,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
time::timeout,
};

use crate::{
MAX_UDP_PACKET_SIZE, ProxyConfig,
ProxyConfig,
error_util::{ErrorAction, handle_io_error},
};

Expand Down Expand Up @@ -79,7 +79,9 @@ impl Session {
Ok(_) => {}
Err(err) => match err.kind() {
// Destination service hasn't started yet
ErrorKind::ConnectionRefused => {}
ErrorKind::ConnectionRefused => {
warn!("Destination service refused connection");
}
_ => match handle_io_error(err) {
ErrorAction::Terminate(cause) => return Err(cause),
ErrorAction::Continue => {}
Expand All @@ -97,10 +99,11 @@ impl Session {
&self,
reply_channel: Arc<UnboundedSender<SessionReply>>,
session_timeout: u64,
max_packet_size: usize,
) -> io::Result<()> {
let duration = Duration::from_secs(session_timeout);
loop {
let mut buf = Vec::with_capacity(MAX_UDP_PACKET_SIZE.into());
let mut buf = Vec::with_capacity(max_packet_size);
match timeout(duration, self.destination_socket.recv_buf(&mut buf)).await {
Ok(result) => {
if let Err(err) = result {
Expand Down