Skip to content

Commit

Permalink
extra tracing added
Browse files Browse the repository at this point in the history
  • Loading branch information
Razz4780 committed Mar 19, 2024
1 parent 53cce9c commit ba2fbb9
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 27 deletions.
24 changes: 20 additions & 4 deletions mirrord/intproxy/src/layer_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,19 @@ impl BackgroundTask for LayerConnection {
tokio::select! {
res = self.layer_codec_rx.receive() => match res {
Err(e) => {
tracing::error!("layer connection failed with {e:?} when receiving");
tracing::error!(
layer_id = ?self.layer_id,
"layer connection failed with {e:?} when receiving",
);

break Err(e);
},
Ok(None) => {
tracing::trace!("message bus closed, exiting");
tracing::trace!(
layer_id = ?self.layer_id,
"message bus closed, exiting",
);

break Ok(());
}
Ok(Some(msg)) => message_bus.send(FromLayer { message: msg.inner, message_id: msg.message_id, layer_id: self.layer_id }).await,
Expand All @@ -68,12 +76,20 @@ impl BackgroundTask for LayerConnection {
msg = message_bus.recv() => match msg {
Some(msg) => {
if let Err(e) = self.send_and_flush(&msg).await {
tracing::error!("layer connection failed with {e:?} when sending {msg:?}");
tracing::error!(
layer_id = ?self.layer_id,
"layer connection failed with {e:?} when sending {msg:?}",
);

break Err(e);
}
},
None => {
tracing::trace!("no more messages from the proxy, exiting");
tracing::trace!(
layer_id = ?self.layer_id,
"no more messages from the proxy, exiting",
);

break Ok(());
},
},
Expand Down
4 changes: 3 additions & 1 deletion mirrord/intproxy/src/proxies/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ impl OutgoingProxy {
local_address,
} = connect;

let prepared_socket = protocol.prepare_socket(remote_address).await?;
let prepared_socket = protocol
.prepare_socket(remote_address, connection_id)
.await?;
let layer_address = prepared_socket.local_address()?;

let id = InterceptorId {
Expand Down
18 changes: 16 additions & 2 deletions mirrord/intproxy/src/proxies/outgoing/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,27 @@ impl BackgroundTask for Interceptor {
continue;
},
Err(e) => break Err(e),
Ok(bytes) if bytes.is_empty() => break Ok(()),
Ok(bytes) if bytes.is_empty() => {
tracing::trace!(
connection_id = connected_socket.connection_id(),
"layer stopped stopped writing, outgoing interceptor task exits",
);

break Ok(())
},
Ok(bytes) => message_bus.send(bytes).await,
},

bytes = message_bus.recv() => match bytes {
Some(bytes) => connected_socket.send(&bytes).await?,
None => break Ok(()),
None => {
tracing::trace!(
connection_id = connected_socket.connection_id(),
"message bus closed, outgoing interceptor task exits",
);

break Ok(())
},
},
}
}
Expand Down
86 changes: 66 additions & 20 deletions mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! [`OutgoingProxy`](super::OutgoingProxy).
use std::{
env, io,
env, fmt, io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
path::PathBuf,
};
Expand Down Expand Up @@ -39,7 +39,11 @@ pub trait NetProtocolExt: Sized {
fn wrap_agent_connect(self, remote_address: SocketAddress) -> ClientMessage;

/// Opens a new socket for intercepting a connection to the given remote address.
async fn prepare_socket(self, for_remote_address: SocketAddress) -> io::Result<PreparedSocket>;
async fn prepare_socket(
self,
for_remote_address: SocketAddress,
connection_id: ConnectionId,
) -> io::Result<PreparedSocket>;
}

impl NetProtocolExt for NetProtocol {
Expand Down Expand Up @@ -80,8 +84,12 @@ impl NetProtocolExt for NetProtocol {
}
}

async fn prepare_socket(self, for_remote_address: SocketAddress) -> io::Result<PreparedSocket> {
let socket = match for_remote_address {
async fn prepare_socket(
self,
for_remote_address: SocketAddress,
connection_id: ConnectionId,
) -> io::Result<PreparedSocket> {
let inner = match for_remote_address {
SocketAddress::Ip(addr) => {
let ip_addr = match addr.ip() {
IpAddr::V4(..) => IpAddr::V4(Ipv4Addr::LOCALHOST),
Expand All @@ -90,14 +98,18 @@ impl NetProtocolExt for NetProtocol {
let bind_at = SocketAddr::new(ip_addr, 0);

match self {
Self::Datagrams => PreparedSocket::UdpSocket(UdpSocket::bind(bind_at).await?),
Self::Stream => PreparedSocket::TcpListener(TcpListener::bind(bind_at).await?),
Self::Datagrams => {
InnerPreparedSocket::UdpSocket(UdpSocket::bind(bind_at).await?)
}
Self::Stream => {
InnerPreparedSocket::TcpListener(TcpListener::bind(bind_at).await?)
}
}
}
SocketAddress::Unix(..) => match self {
Self::Stream => {
let path = PreparedSocket::generate_uds_path().await?;
PreparedSocket::UnixListener(UnixListener::bind(path)?)
InnerPreparedSocket::UnixListener(UnixListener::bind(path)?)
}
Self::Datagrams => {
tracing::error!("layer requested intercepting outgoing datagrams over unix socket, this is not supported");
Expand All @@ -106,18 +118,28 @@ impl NetProtocolExt for NetProtocol {
},
};

Ok(socket)
Ok(PreparedSocket {
inner,
connection_id,
})
}
}

/// A socket prepared to accept an intercepted connection.
pub enum PreparedSocket {
#[derive(Debug)]
enum InnerPreparedSocket {
/// There is no real listening/accepting here, see [`NetProtocol::Datagrams`] for more info.
UdpSocket(UdpSocket),
TcpListener(TcpListener),
UnixListener(UnixListener),
}

/// A socket prepared to accept an intercepted connection.
#[derive(Debug)]
pub struct PreparedSocket {
inner: InnerPreparedSocket,
connection_id: ConnectionId,
}

impl PreparedSocket {
/// For unix listeners, relative to the temp dir.
const UNIX_STREAMS_DIRNAME: &'static str = "mirrord-unix-sockets";
Expand All @@ -139,10 +161,10 @@ impl PreparedSocket {

/// Returns the address of this socket.
pub fn local_address(&self) -> io::Result<SocketAddress> {
let address = match self {
Self::TcpListener(listener) => listener.local_addr()?.into(),
Self::UdpSocket(socket) => socket.local_addr()?.into(),
Self::UnixListener(listener) => {
let address = match &self.inner {
InnerPreparedSocket::TcpListener(listener) => listener.local_addr()?.into(),
InnerPreparedSocket::UdpSocket(socket) => socket.local_addr()?.into(),
InnerPreparedSocket::UnixListener(listener) => {
let addr = listener.local_addr()?;
let pathname = addr.as_pathname().unwrap().to_path_buf();
SocketAddress::Unix(UnixAddr::Pathname(pathname))
Expand All @@ -154,27 +176,34 @@ impl PreparedSocket {

/// Accepts one connection on this socket and returns a new socket for sending and receiving
/// data.
#[tracing::instrument(level = "trace", name = "outgoing_listener_accept", ret, err(Debug))]
pub async fn accept(self) -> io::Result<ConnectedSocket> {
let (inner, is_really_connected) = match self {
Self::TcpListener(listener) => {
let (stream, _) = listener.accept().await?;
let (inner, is_really_connected) = match self.inner {
InnerPreparedSocket::TcpListener(listener) => {
let (stream, peer) = listener.accept().await?;
tracing::trace!(?peer, "accepted connection");
(InnerConnectedSocket::TcpStream(stream), true)
}
Self::UdpSocket(socket) => (InnerConnectedSocket::UdpSocket(socket), false),
Self::UnixListener(listener) => {
let (stream, _) = listener.accept().await?;
InnerPreparedSocket::UdpSocket(socket) => {
(InnerConnectedSocket::UdpSocket(socket), false)
}
InnerPreparedSocket::UnixListener(listener) => {
let (stream, peer) = listener.accept().await?;
tracing::trace!(?peer, "accepted connection");
(InnerConnectedSocket::UnixStream(stream), true)
}
};

Ok(ConnectedSocket {
inner,
connection_id: self.connection_id,
is_really_connected,
buffer: BytesMut::with_capacity(64 * 1024),
})
}
}

#[derive(Debug)]
enum InnerConnectedSocket {
UdpSocket(UdpSocket),
TcpStream(TcpStream),
Expand All @@ -184,13 +213,29 @@ enum InnerConnectedSocket {
/// A socket for intercepted connection with the layer.
pub struct ConnectedSocket {
inner: InnerConnectedSocket,
connection_id: ConnectionId,
/// Meaningful only when `inner` is [`InnerConnectedSocket::UdpSocket`].
is_really_connected: bool,
buffer: BytesMut,
}

impl fmt::Debug for ConnectedSocket {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnectedSocket")
.field("inner", &self.inner)
.field("connection_id", &self.connection_id)
.field("is_really_connected", &self.is_really_connected)
.finish()
}
}

impl ConnectedSocket {
pub fn connection_id(&self) -> ConnectionId {
self.connection_id
}

/// Sends all given data to the layer.
#[tracing::instrument(level = "trace", name = "outgoing_socket_send", fields(bytes_len = bytes.len()), err(Debug))]
pub async fn send(&mut self, bytes: &[u8]) -> io::Result<()> {
match &mut self.inner {
InnerConnectedSocket::UdpSocket(socket) => {
Expand All @@ -215,6 +260,7 @@ impl ConnectedSocket {
}

/// Receives some data from the layer.
#[tracing::instrument(level = "trace", name = "outgoing_socket_receive", err(Debug))]
pub async fn receive(&mut self) -> io::Result<Vec<u8>> {
match &mut self.inner {
InnerConnectedSocket::UdpSocket(socket) => {
Expand Down

0 comments on commit ba2fbb9

Please sign in to comment.