Skip to content

Commit 0e25899

Browse files
committed
udpconn file renamed to udp. UDP proxy related codes removed from main.rs to udp.rs
1 parent f8aafec commit 0e25899

File tree

3 files changed

+382
-375
lines changed

3 files changed

+382
-375
lines changed

src/main.rs

+5-202
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
11
//! Command line arguments.
22
use anyhow::Context;
3-
use bytes::Bytes;
43
use clap::{Parser, Subcommand};
54
use dumbpipe::NodeTicket;
65
use iroh::{
76
endpoint::{get_remote_node_id, Connecting},
87
Endpoint, NodeAddr, SecretKey,
98
};
10-
use quinn::Connection;
119
use std::{
12-
collections::HashMap, io, net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, str::FromStr, sync::Arc
10+
io, net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, str::FromStr
1311
};
1412
use tokio::{
15-
io::{AsyncRead, AsyncWrite, AsyncWriteExt}, net::UdpSocket, select, signal
13+
io::{AsyncRead, AsyncWrite, AsyncWriteExt}, select
1614
};
1715
use tokio_util::sync::CancellationToken;
18-
mod udpconn;
16+
mod udp;
1917

2018
/// Create a dumb pipe between two machines, using an iroh magicsocket.
2119
///
@@ -482,113 +480,6 @@ async fn connect_tcp(args: ConnectTcpArgs) -> anyhow::Result<()> {
482480
Ok(())
483481
}
484482

485-
// 1- Receives request message from socket
486-
// 2- Forwards it to the connection datagram
487-
// 3- Receives response message back from connection datagram
488-
// 4- Forwards it back to the socket
489-
async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> {
490-
let addrs = args
491-
.addr
492-
.to_socket_addrs()
493-
.context(format!("invalid host string {}", args.addr))?;
494-
let secret_key = get_or_create_secret()?;
495-
let mut builder = Endpoint::builder().secret_key(secret_key).alpns(vec![]);
496-
if let Some(addr) = args.common.magic_ipv4_addr {
497-
builder = builder.bind_addr_v4(addr);
498-
}
499-
if let Some(addr) = args.common.magic_ipv6_addr {
500-
builder = builder.bind_addr_v6(addr);
501-
}
502-
let endpoint = builder.bind().await.context("unable to bind magicsock")?;
503-
tracing::info!("udp listening on {:?}", addrs);
504-
let socket = Arc::new(UdpSocket::bind(addrs.as_slice()).await?);
505-
506-
let node_addr = args.ticket.node_addr();
507-
let mut buf: Vec<u8> = vec![0u8; 65535];
508-
let conns = Arc::new(tokio::sync::Mutex::new(
509-
HashMap::<SocketAddr, Connection>::new(),
510-
));
511-
loop {
512-
tokio::select! {
513-
_ = signal::ctrl_c() => {
514-
tracing::info!("Received CTRL-C, shutting down...");
515-
break;
516-
}
517-
result = socket.recv_from(&mut buf) => {
518-
match result {
519-
Ok((size, sock_addr)) => {
520-
// Check if we already have a connection for this socket address
521-
let mut cnns = conns.lock().await;
522-
let connection = match cnns.get_mut(&sock_addr) {
523-
Some(conn) => conn,
524-
None => {
525-
// If we don't have a connection, drop the previous lock to create a new one later on
526-
drop(cnns);
527-
528-
// Create a new connection since this address is not in the hashmap
529-
let endpoint = endpoint.clone();
530-
let addr = node_addr.clone();
531-
let handshake = !args.common.is_custom_alpn();
532-
let alpn = args.common.alpn()?;
533-
534-
let remote_node_id = addr.node_id;
535-
tracing::info!("creating a connection to be forwarding UDP to {}", remote_node_id);
536-
537-
// connect to the node, try only once
538-
let connection = endpoint
539-
.connect(addr.clone(), &alpn)
540-
.await
541-
.context(format!("error connecting to {}", remote_node_id))?;
542-
tracing::info!("connected to {}", remote_node_id);
543-
544-
// send the handshake unless we are using a custom alpn
545-
if handshake {
546-
connection.send_datagram(Bytes::from_static(&dumbpipe::HANDSHAKE))?;
547-
}
548-
549-
let sock_send = socket.clone();
550-
let conn_clone = connection.clone();
551-
let conns_clone = conns.clone();
552-
// Spawn a task for listening the connection datagram, and forward the data to the UDP socket
553-
tokio::spawn(async move {
554-
// 3- Receives response message back from connection datagram
555-
// 4- Forwards it back to the socket
556-
if let Err(cause) = udpconn::handle_udp_accept(sock_addr, sock_send, conn_clone).await {
557-
// log error at warn level
558-
//
559-
// we should know about it, but it's not fatal
560-
tracing::warn!("error handling connection: {}", cause);
561-
}
562-
// Cleanup resources for this connection since it's `Connection` is closed or errored out
563-
let mut cn = conns_clone.lock().await;
564-
cn.remove(&sock_addr);
565-
});
566-
567-
// Store the connection and return
568-
let mut cn = conns.lock().await;
569-
cn.insert(sock_addr, connection.clone());
570-
&mut connection.clone()
571-
}
572-
};
573-
574-
// 1- Receives request message from socket
575-
// 2- Forwards it to the connection datagram
576-
if let Err(e) = connection.send_datagram(Bytes::copy_from_slice(&buf[..size])) { // Is Bytes::copy_from_slice most efficient way to do this?. Investigate.
577-
tracing::error!("Error writing to connection datagram: {}", e);
578-
return Err(e.into());
579-
}
580-
}
581-
Err(e) => {
582-
tracing::warn!("error receiving from UDP socket: {}", e);
583-
break;
584-
}
585-
}
586-
}
587-
}
588-
}
589-
Ok(())
590-
}
591-
592483
/// Listen on a magicsocket and forward incoming connections to a tcp socket.
593484
async fn listen_tcp(args: ListenTcpArgs) -> anyhow::Result<()> {
594485
let addrs = match args.host.to_socket_addrs() {
@@ -682,105 +573,17 @@ async fn listen_tcp(args: ListenTcpArgs) -> anyhow::Result<()> {
682573
Ok(())
683574
}
684575

685-
/// Listen on a magicsocket and forward incoming connections to a udp socket.
686-
async fn listen_udp(args: ListenUdpArgs) -> anyhow::Result<()> {
687-
let addrs = match args.host.to_socket_addrs() {
688-
Ok(addrs) => addrs.collect::<Vec<_>>(),
689-
Err(e) => anyhow::bail!("invalid host string {}: {}", args.host, e),
690-
};
691-
let secret_key = get_or_create_secret()?;
692-
let mut builder = Endpoint::builder()
693-
.alpns(vec![args.common.alpn()?])
694-
.secret_key(secret_key);
695-
if let Some(addr) = args.common.magic_ipv4_addr {
696-
builder = builder.bind_addr_v4(addr);
697-
}
698-
if let Some(addr) = args.common.magic_ipv6_addr {
699-
builder = builder.bind_addr_v6(addr);
700-
}
701-
let endpoint = builder.bind().await?;
702-
// wait for the endpoint to figure out its address before making a ticket
703-
endpoint.home_relay().initialized().await?;
704-
let node_addr = endpoint.node_addr().await?;
705-
let mut short = node_addr.clone();
706-
let ticket = NodeTicket::new(node_addr);
707-
short.direct_addresses.clear();
708-
let short = NodeTicket::new(short);
709-
710-
// print the ticket on stderr so it doesn't interfere with the data itself
711-
//
712-
// note that the tests rely on the ticket being the last thing printed
713-
eprintln!("Forwarding incoming requests to '{}'.", args.host);
714-
eprintln!("To connect, use e.g.:");
715-
eprintln!("dumbpipe connect-udp {ticket}");
716-
if args.common.verbose > 0 {
717-
eprintln!("or:\ndumbpipe connect-udp {}", short);
718-
}
719-
tracing::info!("node id is {}", ticket.node_addr().node_id);
720-
tracing::info!("derp url is {:?}", ticket.node_addr().relay_url);
721-
722-
// handle a new incoming connection on the magic endpoint
723-
async fn handle_magic_accept(
724-
connecting: Connecting,
725-
addrs: Vec<std::net::SocketAddr>,
726-
handshake: bool,
727-
) -> anyhow::Result<()> {
728-
let connection = connecting.await.context("error accepting connection")?;
729-
let remote_node_id = get_remote_node_id(&connection)?;
730-
tracing::info!("got connection from {}", remote_node_id);
731-
if handshake {
732-
// read the handshake and verify it
733-
let bytes = connection.read_datagram().await?;
734-
anyhow::ensure!(*bytes == dumbpipe::HANDSHAKE, "invalid handshake");
735-
}
736-
737-
// 1- Receives request message from quinn stream
738-
// 2- Forwards it to the (addrs) via UDP socket
739-
// 3- Receives response message back from UDP socket
740-
// 4- Forwards it back to the quinn stream
741-
udpconn::handle_udp_listen(addrs.as_slice(), connection).await?;
742-
Ok(())
743-
}
744-
745-
loop {
746-
let incoming = select! {
747-
incoming = endpoint.accept() => incoming,
748-
_ = tokio::signal::ctrl_c() => {
749-
eprintln!("got ctrl-c, exiting");
750-
break;
751-
}
752-
};
753-
let Some(incoming) = incoming else {
754-
break;
755-
};
756-
let Ok(connecting) = incoming.accept() else {
757-
break;
758-
};
759-
let addrs = addrs.clone();
760-
let handshake = !args.common.is_custom_alpn();
761-
tokio::spawn(async move {
762-
if let Err(cause) = handle_magic_accept(connecting, addrs, handshake).await {
763-
// log error at warn level
764-
//
765-
// we should know about it, but it's not fatal
766-
tracing::warn!("error handling connection: {}", cause);
767-
}
768-
});
769-
}
770-
Ok(())
771-
}
772-
773576
#[tokio::main]
774577
async fn main() -> anyhow::Result<()> {
775578
tracing_subscriber::fmt::init();
776579
let args = Args::parse();
777580
let res = match args.command {
778581
Commands::Listen(args) => listen_stdio(args).await,
779582
Commands::ListenTcp(args) => listen_tcp(args).await,
780-
Commands::ListenUdp(args) => listen_udp(args).await,
583+
Commands::ListenUdp(args) => udp::listen_udp(args).await,
781584
Commands::Connect(args) => connect_stdio(args).await,
782585
Commands::ConnectTcp(args) => connect_tcp(args).await,
783-
Commands::ConnectUdp(args) => connect_udp(args).await,
586+
Commands::ConnectUdp(args) => udp::connect_udp(args).await,
784587
};
785588
match res {
786589
Ok(()) => std::process::exit(0),

0 commit comments

Comments
 (0)