From ba2fbb933c666e42ae6a3d847fdd89a296e1133e Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Tue, 19 Mar 2024 12:42:35 +0100 Subject: [PATCH] extra tracing added --- mirrord/intproxy/src/layer_conn.rs | 24 +++++- mirrord/intproxy/src/proxies/outgoing.rs | 4 +- .../src/proxies/outgoing/interceptor.rs | 18 +++- .../src/proxies/outgoing/net_protocol_ext.rs | 86 ++++++++++++++----- 4 files changed, 105 insertions(+), 27 deletions(-) diff --git a/mirrord/intproxy/src/layer_conn.rs b/mirrord/intproxy/src/layer_conn.rs index 91424bd5c9b..9d56b8f6c7e 100644 --- a/mirrord/intproxy/src/layer_conn.rs +++ b/mirrord/intproxy/src/layer_conn.rs @@ -55,11 +55,19 @@ impl BackgroundTask for LayerConnection { tokio::select! { res = self.layer_codec_rx.receive() => match res { Err(e) => { - tracing::error!("layer connection failed with {e:?} when receiving"); + tracing::error!( + layer_id = ?self.layer_id, + "layer connection failed with {e:?} when receiving", + ); + break Err(e); }, Ok(None) => { - tracing::trace!("message bus closed, exiting"); + tracing::trace!( + layer_id = ?self.layer_id, + "message bus closed, exiting", + ); + break Ok(()); } Ok(Some(msg)) => message_bus.send(FromLayer { message: msg.inner, message_id: msg.message_id, layer_id: self.layer_id }).await, @@ -68,12 +76,20 @@ impl BackgroundTask for LayerConnection { msg = message_bus.recv() => match msg { Some(msg) => { if let Err(e) = self.send_and_flush(&msg).await { - tracing::error!("layer connection failed with {e:?} when sending {msg:?}"); + tracing::error!( + layer_id = ?self.layer_id, + "layer connection failed with {e:?} when sending {msg:?}", + ); + break Err(e); } }, None => { - tracing::trace!("no more messages from the proxy, exiting"); + tracing::trace!( + layer_id = ?self.layer_id, + "no more messages from the proxy, exiting", + ); + break Ok(()); }, }, diff --git a/mirrord/intproxy/src/proxies/outgoing.rs b/mirrord/intproxy/src/proxies/outgoing.rs index e9eb59b796b..2ea3b480485 100644 --- a/mirrord/intproxy/src/proxies/outgoing.rs +++ b/mirrord/intproxy/src/proxies/outgoing.rs @@ -164,7 +164,9 @@ impl OutgoingProxy { local_address, } = connect; - let prepared_socket = protocol.prepare_socket(remote_address).await?; + let prepared_socket = protocol + .prepare_socket(remote_address, connection_id) + .await?; let layer_address = prepared_socket.local_address()?; let id = InterceptorId { diff --git a/mirrord/intproxy/src/proxies/outgoing/interceptor.rs b/mirrord/intproxy/src/proxies/outgoing/interceptor.rs index b68b18ea1c3..212b35b20cd 100644 --- a/mirrord/intproxy/src/proxies/outgoing/interceptor.rs +++ b/mirrord/intproxy/src/proxies/outgoing/interceptor.rs @@ -40,13 +40,27 @@ impl BackgroundTask for Interceptor { continue; }, Err(e) => break Err(e), - Ok(bytes) if bytes.is_empty() => break Ok(()), + Ok(bytes) if bytes.is_empty() => { + tracing::trace!( + connection_id = connected_socket.connection_id(), + "layer stopped stopped writing, outgoing interceptor task exits", + ); + + break Ok(()) + }, Ok(bytes) => message_bus.send(bytes).await, }, bytes = message_bus.recv() => match bytes { Some(bytes) => connected_socket.send(&bytes).await?, - None => break Ok(()), + None => { + tracing::trace!( + connection_id = connected_socket.connection_id(), + "message bus closed, outgoing interceptor task exits", + ); + + break Ok(()) + }, }, } } diff --git a/mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs b/mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs index f8b595b7199..018d2e3f51a 100644 --- a/mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs +++ b/mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs @@ -2,7 +2,7 @@ //! [`OutgoingProxy`](super::OutgoingProxy). use std::{ - env, io, + env, fmt, io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, path::PathBuf, }; @@ -39,7 +39,11 @@ pub trait NetProtocolExt: Sized { fn wrap_agent_connect(self, remote_address: SocketAddress) -> ClientMessage; /// Opens a new socket for intercepting a connection to the given remote address. - async fn prepare_socket(self, for_remote_address: SocketAddress) -> io::Result; + async fn prepare_socket( + self, + for_remote_address: SocketAddress, + connection_id: ConnectionId, + ) -> io::Result; } impl NetProtocolExt for NetProtocol { @@ -80,8 +84,12 @@ impl NetProtocolExt for NetProtocol { } } - async fn prepare_socket(self, for_remote_address: SocketAddress) -> io::Result { - let socket = match for_remote_address { + async fn prepare_socket( + self, + for_remote_address: SocketAddress, + connection_id: ConnectionId, + ) -> io::Result { + let inner = match for_remote_address { SocketAddress::Ip(addr) => { let ip_addr = match addr.ip() { IpAddr::V4(..) => IpAddr::V4(Ipv4Addr::LOCALHOST), @@ -90,14 +98,18 @@ impl NetProtocolExt for NetProtocol { let bind_at = SocketAddr::new(ip_addr, 0); match self { - Self::Datagrams => PreparedSocket::UdpSocket(UdpSocket::bind(bind_at).await?), - Self::Stream => PreparedSocket::TcpListener(TcpListener::bind(bind_at).await?), + Self::Datagrams => { + InnerPreparedSocket::UdpSocket(UdpSocket::bind(bind_at).await?) + } + Self::Stream => { + InnerPreparedSocket::TcpListener(TcpListener::bind(bind_at).await?) + } } } SocketAddress::Unix(..) => match self { Self::Stream => { let path = PreparedSocket::generate_uds_path().await?; - PreparedSocket::UnixListener(UnixListener::bind(path)?) + InnerPreparedSocket::UnixListener(UnixListener::bind(path)?) } Self::Datagrams => { tracing::error!("layer requested intercepting outgoing datagrams over unix socket, this is not supported"); @@ -106,18 +118,28 @@ impl NetProtocolExt for NetProtocol { }, }; - Ok(socket) + Ok(PreparedSocket { + inner, + connection_id, + }) } } -/// A socket prepared to accept an intercepted connection. -pub enum PreparedSocket { +#[derive(Debug)] +enum InnerPreparedSocket { /// There is no real listening/accepting here, see [`NetProtocol::Datagrams`] for more info. UdpSocket(UdpSocket), TcpListener(TcpListener), UnixListener(UnixListener), } +/// A socket prepared to accept an intercepted connection. +#[derive(Debug)] +pub struct PreparedSocket { + inner: InnerPreparedSocket, + connection_id: ConnectionId, +} + impl PreparedSocket { /// For unix listeners, relative to the temp dir. const UNIX_STREAMS_DIRNAME: &'static str = "mirrord-unix-sockets"; @@ -139,10 +161,10 @@ impl PreparedSocket { /// Returns the address of this socket. pub fn local_address(&self) -> io::Result { - let address = match self { - Self::TcpListener(listener) => listener.local_addr()?.into(), - Self::UdpSocket(socket) => socket.local_addr()?.into(), - Self::UnixListener(listener) => { + let address = match &self.inner { + InnerPreparedSocket::TcpListener(listener) => listener.local_addr()?.into(), + InnerPreparedSocket::UdpSocket(socket) => socket.local_addr()?.into(), + InnerPreparedSocket::UnixListener(listener) => { let addr = listener.local_addr()?; let pathname = addr.as_pathname().unwrap().to_path_buf(); SocketAddress::Unix(UnixAddr::Pathname(pathname)) @@ -154,27 +176,34 @@ impl PreparedSocket { /// Accepts one connection on this socket and returns a new socket for sending and receiving /// data. + #[tracing::instrument(level = "trace", name = "outgoing_listener_accept", ret, err(Debug))] pub async fn accept(self) -> io::Result { - let (inner, is_really_connected) = match self { - Self::TcpListener(listener) => { - let (stream, _) = listener.accept().await?; + let (inner, is_really_connected) = match self.inner { + InnerPreparedSocket::TcpListener(listener) => { + let (stream, peer) = listener.accept().await?; + tracing::trace!(?peer, "accepted connection"); (InnerConnectedSocket::TcpStream(stream), true) } - Self::UdpSocket(socket) => (InnerConnectedSocket::UdpSocket(socket), false), - Self::UnixListener(listener) => { - let (stream, _) = listener.accept().await?; + InnerPreparedSocket::UdpSocket(socket) => { + (InnerConnectedSocket::UdpSocket(socket), false) + } + InnerPreparedSocket::UnixListener(listener) => { + let (stream, peer) = listener.accept().await?; + tracing::trace!(?peer, "accepted connection"); (InnerConnectedSocket::UnixStream(stream), true) } }; Ok(ConnectedSocket { inner, + connection_id: self.connection_id, is_really_connected, buffer: BytesMut::with_capacity(64 * 1024), }) } } +#[derive(Debug)] enum InnerConnectedSocket { UdpSocket(UdpSocket), TcpStream(TcpStream), @@ -184,13 +213,29 @@ enum InnerConnectedSocket { /// A socket for intercepted connection with the layer. pub struct ConnectedSocket { inner: InnerConnectedSocket, + connection_id: ConnectionId, /// Meaningful only when `inner` is [`InnerConnectedSocket::UdpSocket`]. is_really_connected: bool, buffer: BytesMut, } +impl fmt::Debug for ConnectedSocket { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ConnectedSocket") + .field("inner", &self.inner) + .field("connection_id", &self.connection_id) + .field("is_really_connected", &self.is_really_connected) + .finish() + } +} + impl ConnectedSocket { + pub fn connection_id(&self) -> ConnectionId { + self.connection_id + } + /// Sends all given data to the layer. + #[tracing::instrument(level = "trace", name = "outgoing_socket_send", fields(bytes_len = bytes.len()), err(Debug))] pub async fn send(&mut self, bytes: &[u8]) -> io::Result<()> { match &mut self.inner { InnerConnectedSocket::UdpSocket(socket) => { @@ -215,6 +260,7 @@ impl ConnectedSocket { } /// Receives some data from the layer. + #[tracing::instrument(level = "trace", name = "outgoing_socket_receive", err(Debug))] pub async fn receive(&mut self) -> io::Result> { match &mut self.inner { InnerConnectedSocket::UdpSocket(socket) => {