diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bfbf28a028..d96f72c1e6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,9 +22,11 @@ Use Kubernetes beta feature `Ephemeral Containers` to mirror traffic with the `- - Refactored error handling in mirrord-layer. - E2E: Collect minikube logs and fix collecting container logs - E2E: macOS use colima instead of minikube. +- Refactored `mirrord-layer/lib.rs` - no more passing many arguments! :) ### Fixed - Handle unwraps in fileops to gracefully exit and enable python fileops tests. +- Changed `addrinfo` to `VecDeque` - fixes a potential bug (loss of order) ## 2.4.1 diff --git a/mirrord-layer/src/lib.rs b/mirrord-layer/src/lib.rs index e5ea283fbbf..f771e6d5283 100644 --- a/mirrord-layer/src/lib.rs +++ b/mirrord-layer/src/lib.rs @@ -4,11 +4,10 @@ #![feature(const_trait_impl)] use std::{ - collections::HashSet, - sync::{LazyLock, Mutex, OnceLock}, + collections::{HashSet, VecDeque}, + sync::{LazyLock, OnceLock}, }; -use actix_codec::{AsyncRead, AsyncWrite}; use common::{GetAddrInfoHook, ResponseChannel}; use ctor::ctor; use envconfig::Envconfig; @@ -80,128 +79,129 @@ fn init() { RUNTIME.block_on(start_layer_thread(port_forwarder, receiver, config)); } -async fn handle_hook_message( - hook_message: HookMessage, - tcp_mirror_handler: &mut TcpMirrorHandler, - codec: &mut actix_codec::Framed, - getaddrinfo_handler: &Mutex>>>, - file_handler: &mut FileHandler, -) { - match hook_message { - HookMessage::Tcp(message) => { - tcp_mirror_handler - .handle_hook_message(message, codec) - .await - .unwrap(); - } - HookMessage::File(message) => { - file_handler - .handle_hook_message(message, codec) - .await - .unwrap(); - } - HookMessage::GetAddrInfoHook(GetAddrInfoHook { - node, - service, - hints, - hook_channel_tx, - }) => { - trace!("HookMessage::GetAddrInfo"); +struct Layer +where + T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send, +{ + pub codec: actix_codec::Framed, + ping: bool, + tcp_mirror_handler: TcpMirrorHandler, + // TODO: Starting to think about a better abstraction over this whole mess. File operations are + // pretty much just `std::fs::File` things, so I think the best approach would be to create + // a `FakeFile`, and implement `std::io` traits on it. + // + // Maybe every `FakeFile` could hold it's own `oneshot` channel, read more about this on the + // `common` module above `XHook` structs. + file_handler: FileHandler, - getaddrinfo_handler.lock().unwrap().push(hook_channel_tx); + // Stores a list of `oneshot`s that communicates with the hook side (send a message from -layer + // to -agent, and when we receive a message from -agent to -layer). + getaddrinfo_handler_queue: VecDeque>>, +} - let request = ClientMessage::GetAddrInfoRequest(GetAddrInfoRequest { +impl Layer +where + T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send, +{ + fn new(codec: actix_codec::Framed) -> Layer { + Self { + codec, + ping: false, + tcp_mirror_handler: TcpMirrorHandler::default(), + file_handler: FileHandler::default(), + getaddrinfo_handler_queue: VecDeque::new(), + } + } + + async fn handle_hook_message(&mut self, hook_message: HookMessage) { + match hook_message { + HookMessage::Tcp(message) => { + self.tcp_mirror_handler + .handle_hook_message(message, &mut self.codec) + .await + .unwrap(); + } + HookMessage::File(message) => { + self.file_handler + .handle_hook_message(message, &mut self.codec) + .await + .unwrap(); + } + HookMessage::GetAddrInfoHook(GetAddrInfoHook { node, service, hints, - }); + hook_channel_tx, + }) => { + trace!("HookMessage::GetAddrInfo"); - // TODO: Handle this error. We're just ignoring it here and letting -layer crash later. - let _codec_result = codec.send(request).await; + self.getaddrinfo_handler_queue.push_back(hook_channel_tx); + + let request = ClientMessage::GetAddrInfoRequest(GetAddrInfoRequest { + node, + service, + hints, + }); + + self.codec.send(request).await.unwrap(); + } } } -} -async fn handle_daemon_message( - daemon_message: DaemonMessage, - tcp_mirror_handler: &mut TcpMirrorHandler, - file_handler: &mut FileHandler, - getaddrinfo_handler: &Mutex>>>, - ping: &mut bool, -) -> Result<(), LayerError> { - match daemon_message { - DaemonMessage::Tcp(message) => tcp_mirror_handler.handle_daemon_message(message).await, - DaemonMessage::File(message) => file_handler.handle_daemon_message(message).await, - DaemonMessage::Pong => { - if *ping { - *ping = false; - trace!("Daemon sent pong!"); - } else { - Err(LayerError::UnmatchedPong)?; + async fn handle_daemon_message( + &mut self, + daemon_message: DaemonMessage, + ) -> Result<(), LayerError> { + match daemon_message { + DaemonMessage::Tcp(message) => { + self.tcp_mirror_handler.handle_daemon_message(message).await } + DaemonMessage::File(message) => self.file_handler.handle_daemon_message(message).await, + DaemonMessage::Pong => { + if self.ping { + self.ping = false; + trace!("Daemon sent pong!"); + } else { + Err(LayerError::UnmatchedPong)?; + } - Ok(()) - } - DaemonMessage::GetEnvVarsResponse(_) => { - unreachable!("We get env vars only on initialization right now, shouldn't happen") - } - DaemonMessage::GetAddrInfoResponse(get_addr_info) => { - trace!("DaemonMessage::GetAddrInfoResponse {:#?}", get_addr_info); - - getaddrinfo_handler - .lock()? - .pop() - .ok_or(LayerError::SendErrorGetAddrInfoResponse)? - .send(get_addr_info) - .map_err(|_| LayerError::SendErrorGetAddrInfoResponse) + Ok(()) + } + DaemonMessage::GetEnvVarsResponse(_) => { + unreachable!("We get env vars only on initialization right now, shouldn't happen") + } + DaemonMessage::GetAddrInfoResponse(get_addr_info) => { + trace!("DaemonMessage::GetAddrInfoResponse {:#?}", get_addr_info); + + self.getaddrinfo_handler_queue + .pop_front() + .ok_or(LayerError::SendErrorGetAddrInfoResponse)? + .send(get_addr_info) + .map_err(|_| LayerError::SendErrorGetAddrInfoResponse) + } + DaemonMessage::Close => todo!(), + DaemonMessage::LogMessage(_) => todo!(), } - DaemonMessage::Close => todo!(), - DaemonMessage::LogMessage(_) => todo!(), } } async fn thread_loop( mut receiver: Receiver, - mut codec: actix_codec::Framed< + codec: actix_codec::Framed< impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send, ClientCodec, >, ) { - // TODO: Starting to think about a better abstraction over this whole mess. File operations are - // pretty much just `std::fs::File` things, so I think the best approach would be to create - // a `FakeFile`, and implement `std::io` traits on it. - // - // Maybe every `FakeFile` could hold it's own `oneshot` channel, read more about this on the - // `common` module above `XHook` structs. - - // Stores a list of `oneshot`s that communicates with the hook side (send a message from -layer - // to -agent, and when we receive a message from -agent to -layer). - let getaddrinfo_handler = Mutex::new(Vec::with_capacity(4)); - - let mut ping = false; - let mut tcp_mirror_handler = TcpMirrorHandler::default(); - let mut file_handler = FileHandler::default(); - + let mut layer = Layer::new(codec); loop { select! { hook_message = receiver.recv() => { - handle_hook_message( - hook_message.unwrap(), - &mut tcp_mirror_handler, - &mut codec, - &getaddrinfo_handler, - &mut file_handler, - ).await; + layer.handle_hook_message(hook_message.unwrap()).await; } - daemon_message = codec.next() => { + daemon_message = layer.codec.next() => { if let Some(Ok(message)) = daemon_message { - if let Err(err) = handle_daemon_message( - message, - &mut tcp_mirror_handler, - &mut file_handler, - &getaddrinfo_handler, - &mut ping, - ).await { + if let Err(err) = layer.handle_daemon_message( + message).await { error!("Error handling daemon message: {:?}", err); break; } @@ -211,10 +211,10 @@ async fn thread_loop( } }, _ = sleep(Duration::from_secs(60)) => { - if !ping { - codec.send(ClientMessage::Ping).await.unwrap(); + if !layer.ping { + layer.codec.send(ClientMessage::Ping).await.unwrap(); trace!("sent ping to daemon"); - ping = true; + layer.ping = true; } else { panic!("Client: unmatched ping"); } diff --git a/mirrord-layer/src/socket/hooks.rs b/mirrord-layer/src/socket/hooks.rs index f2f986f320e..b77cb75804a 100644 --- a/mirrord-layer/src/socket/hooks.rs +++ b/mirrord-layer/src/socket/hooks.rs @@ -200,9 +200,10 @@ unsafe extern "C" fn freeaddrinfo_detour(addrinfo: *mut libc::addrinfo) { // Iterate over `addrinfo` linked list dropping it. let mut current = addrinfo; while !current.is_null() { - Box::from_raw(current); + let current_box = Box::from_raw(current); current = (*current).ai_next; + drop(current_box); } }