Skip to content

Commit

Permalink
Last layer refactor I promise (#211)
Browse files Browse the repository at this point in the history
* Refactored `mirrord-layer/lib.rs` - no more passsing many arguments! :)
* Changed `addrinfo` to `VecDeque` - fixes a potential bug (loss of order)
  • Loading branch information
aviramha authored Jul 17, 2022
1 parent 7c15a35 commit 6440e75
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 103 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ Use Kubernetes beta feature `Ephemeral Containers` to mirror traffic with the `-
- Refactored error handling in mirrord-layer.
- E2E: Collect minikube logs and fix collecting container logs
- E2E: macOS use colima instead of minikube.
- Refactored `mirrord-layer/lib.rs` - no more passing many arguments! :)

### Fixed
- Handle unwraps in fileops to gracefully exit and enable python fileops tests.
- Changed `addrinfo` to `VecDeque` - fixes a potential bug (loss of order)

## 2.4.1

Expand Down
204 changes: 102 additions & 102 deletions mirrord-layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
#![feature(const_trait_impl)]

use std::{
collections::HashSet,
sync::{LazyLock, Mutex, OnceLock},
collections::{HashSet, VecDeque},
sync::{LazyLock, OnceLock},
};

use actix_codec::{AsyncRead, AsyncWrite};
use common::{GetAddrInfoHook, ResponseChannel};
use ctor::ctor;
use envconfig::Envconfig;
Expand Down Expand Up @@ -80,128 +79,129 @@ fn init() {
RUNTIME.block_on(start_layer_thread(port_forwarder, receiver, config));
}

async fn handle_hook_message(
hook_message: HookMessage,
tcp_mirror_handler: &mut TcpMirrorHandler,
codec: &mut actix_codec::Framed<impl AsyncRead + AsyncWrite + Unpin + Send, ClientCodec>,
getaddrinfo_handler: &Mutex<Vec<ResponseChannel<Vec<AddrInfoInternal>>>>,
file_handler: &mut FileHandler,
) {
match hook_message {
HookMessage::Tcp(message) => {
tcp_mirror_handler
.handle_hook_message(message, codec)
.await
.unwrap();
}
HookMessage::File(message) => {
file_handler
.handle_hook_message(message, codec)
.await
.unwrap();
}
HookMessage::GetAddrInfoHook(GetAddrInfoHook {
node,
service,
hints,
hook_channel_tx,
}) => {
trace!("HookMessage::GetAddrInfo");
struct Layer<T>
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
{
pub codec: actix_codec::Framed<T, ClientCodec>,
ping: bool,
tcp_mirror_handler: TcpMirrorHandler,
// TODO: Starting to think about a better abstraction over this whole mess. File operations are
// pretty much just `std::fs::File` things, so I think the best approach would be to create
// a `FakeFile`, and implement `std::io` traits on it.
//
// Maybe every `FakeFile` could hold it's own `oneshot` channel, read more about this on the
// `common` module above `XHook` structs.
file_handler: FileHandler,

getaddrinfo_handler.lock().unwrap().push(hook_channel_tx);
// Stores a list of `oneshot`s that communicates with the hook side (send a message from -layer
// to -agent, and when we receive a message from -agent to -layer).
getaddrinfo_handler_queue: VecDeque<ResponseChannel<Vec<AddrInfoInternal>>>,
}

let request = ClientMessage::GetAddrInfoRequest(GetAddrInfoRequest {
impl<T> Layer<T>
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
{
fn new(codec: actix_codec::Framed<T, ClientCodec>) -> Layer<T> {
Self {
codec,
ping: false,
tcp_mirror_handler: TcpMirrorHandler::default(),
file_handler: FileHandler::default(),
getaddrinfo_handler_queue: VecDeque::new(),
}
}

async fn handle_hook_message(&mut self, hook_message: HookMessage) {
match hook_message {
HookMessage::Tcp(message) => {
self.tcp_mirror_handler
.handle_hook_message(message, &mut self.codec)
.await
.unwrap();
}
HookMessage::File(message) => {
self.file_handler
.handle_hook_message(message, &mut self.codec)
.await
.unwrap();
}
HookMessage::GetAddrInfoHook(GetAddrInfoHook {
node,
service,
hints,
});
hook_channel_tx,
}) => {
trace!("HookMessage::GetAddrInfo");

// TODO: Handle this error. We're just ignoring it here and letting -layer crash later.
let _codec_result = codec.send(request).await;
self.getaddrinfo_handler_queue.push_back(hook_channel_tx);

let request = ClientMessage::GetAddrInfoRequest(GetAddrInfoRequest {
node,
service,
hints,
});

self.codec.send(request).await.unwrap();
}
}
}
}

async fn handle_daemon_message(
daemon_message: DaemonMessage,
tcp_mirror_handler: &mut TcpMirrorHandler,
file_handler: &mut FileHandler,
getaddrinfo_handler: &Mutex<Vec<ResponseChannel<Vec<AddrInfoInternal>>>>,
ping: &mut bool,
) -> Result<(), LayerError> {
match daemon_message {
DaemonMessage::Tcp(message) => tcp_mirror_handler.handle_daemon_message(message).await,
DaemonMessage::File(message) => file_handler.handle_daemon_message(message).await,
DaemonMessage::Pong => {
if *ping {
*ping = false;
trace!("Daemon sent pong!");
} else {
Err(LayerError::UnmatchedPong)?;
async fn handle_daemon_message(
&mut self,
daemon_message: DaemonMessage,
) -> Result<(), LayerError> {
match daemon_message {
DaemonMessage::Tcp(message) => {
self.tcp_mirror_handler.handle_daemon_message(message).await
}
DaemonMessage::File(message) => self.file_handler.handle_daemon_message(message).await,
DaemonMessage::Pong => {
if self.ping {
self.ping = false;
trace!("Daemon sent pong!");
} else {
Err(LayerError::UnmatchedPong)?;
}

Ok(())
}
DaemonMessage::GetEnvVarsResponse(_) => {
unreachable!("We get env vars only on initialization right now, shouldn't happen")
}
DaemonMessage::GetAddrInfoResponse(get_addr_info) => {
trace!("DaemonMessage::GetAddrInfoResponse {:#?}", get_addr_info);

getaddrinfo_handler
.lock()?
.pop()
.ok_or(LayerError::SendErrorGetAddrInfoResponse)?
.send(get_addr_info)
.map_err(|_| LayerError::SendErrorGetAddrInfoResponse)
Ok(())
}
DaemonMessage::GetEnvVarsResponse(_) => {
unreachable!("We get env vars only on initialization right now, shouldn't happen")
}
DaemonMessage::GetAddrInfoResponse(get_addr_info) => {
trace!("DaemonMessage::GetAddrInfoResponse {:#?}", get_addr_info);

self.getaddrinfo_handler_queue
.pop_front()
.ok_or(LayerError::SendErrorGetAddrInfoResponse)?
.send(get_addr_info)
.map_err(|_| LayerError::SendErrorGetAddrInfoResponse)
}
DaemonMessage::Close => todo!(),
DaemonMessage::LogMessage(_) => todo!(),
}
DaemonMessage::Close => todo!(),
DaemonMessage::LogMessage(_) => todo!(),
}
}

async fn thread_loop(
mut receiver: Receiver<HookMessage>,
mut codec: actix_codec::Framed<
codec: actix_codec::Framed<
impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
ClientCodec,
>,
) {
// TODO: Starting to think about a better abstraction over this whole mess. File operations are
// pretty much just `std::fs::File` things, so I think the best approach would be to create
// a `FakeFile`, and implement `std::io` traits on it.
//
// Maybe every `FakeFile` could hold it's own `oneshot` channel, read more about this on the
// `common` module above `XHook` structs.

// Stores a list of `oneshot`s that communicates with the hook side (send a message from -layer
// to -agent, and when we receive a message from -agent to -layer).
let getaddrinfo_handler = Mutex::new(Vec::with_capacity(4));

let mut ping = false;
let mut tcp_mirror_handler = TcpMirrorHandler::default();
let mut file_handler = FileHandler::default();

let mut layer = Layer::new(codec);
loop {
select! {
hook_message = receiver.recv() => {
handle_hook_message(
hook_message.unwrap(),
&mut tcp_mirror_handler,
&mut codec,
&getaddrinfo_handler,
&mut file_handler,
).await;
layer.handle_hook_message(hook_message.unwrap()).await;
}
daemon_message = codec.next() => {
daemon_message = layer.codec.next() => {
if let Some(Ok(message)) = daemon_message {
if let Err(err) = handle_daemon_message(
message,
&mut tcp_mirror_handler,
&mut file_handler,
&getaddrinfo_handler,
&mut ping,
).await {
if let Err(err) = layer.handle_daemon_message(
message).await {
error!("Error handling daemon message: {:?}", err);
break;
}
Expand All @@ -211,10 +211,10 @@ async fn thread_loop(
}
},
_ = sleep(Duration::from_secs(60)) => {
if !ping {
codec.send(ClientMessage::Ping).await.unwrap();
if !layer.ping {
layer.codec.send(ClientMessage::Ping).await.unwrap();
trace!("sent ping to daemon");
ping = true;
layer.ping = true;
} else {
panic!("Client: unmatched ping");
}
Expand Down
3 changes: 2 additions & 1 deletion mirrord-layer/src/socket/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,10 @@ unsafe extern "C" fn freeaddrinfo_detour(addrinfo: *mut libc::addrinfo) {
// Iterate over `addrinfo` linked list dropping it.
let mut current = addrinfo;
while !current.is_null() {
Box::from_raw(current);
let current_box = Box::from_raw(current);

current = (*current).ai_next;
drop(current_box);
}
}

Expand Down

0 comments on commit 6440e75

Please sign in to comment.