From 17db4df58628cc8bd0b81640d6a57cae932b545c Mon Sep 17 00:00:00 2001 From: Aviram Hassan Date: Wed, 6 Jul 2022 21:22:26 +0300 Subject: [PATCH] Load environment variables before process starts (#189) * layer: Environment variables now load before process starts, no more race conditions. --- CHANGELOG.md | 1 + mirrord-layer/src/lib.rs | 129 +++++++++++++++++++++------------------ tests/src/sanity.rs | 2 - 3 files changed, 70 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 581301859ab..fa0c122b33a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Check [Keep a Changelog](http://keepachangelog.com/) for recommendations on how - Refactor e2e, enable only Node HTTP mirroring test. - E2E: add macOS to E2E, support using minikube by env var. - E2E: Skip loading to docker before loading to minikube (load directly to minikube..) +- layer: Environment variables now load before process starts, no more race conditions. ### Fixed - Support connections that start with tcp flags in addition to Syn (on macOS CI we saw CWR + NS) diff --git a/mirrord-layer/src/lib.rs b/mirrord-layer/src/lib.rs index 026c084837a..bbd61bc55a5 100644 --- a/mirrord-layer/src/lib.rs +++ b/mirrord-layer/src/lib.rs @@ -82,7 +82,7 @@ fn init() { let enabled_file_ops = ENABLED_FILE_OPS.get_or_init(|| config.enabled_file_ops); enable_hooks(*enabled_file_ops); - RUNTIME.spawn(poll_agent(port_forwarder, receiver, config)); + RUNTIME.block_on(start_layer_thread(port_forwarder, receiver, config)); } #[allow(clippy::too_many_arguments)] @@ -333,42 +333,21 @@ async fn handle_daemon_message( panic!("Daemon: unmatched pong!"); } } - DaemonMessage::GetEnvVarsResponse(remote_env_vars) => { - debug!("DaemonMessage::GetEnvVarsResponse {:#?}!", remote_env_vars); - - match remote_env_vars { - Ok(remote_env_vars) => { - for (key, value) in remote_env_vars.into_iter() { - debug!( - "DaemonMessage::GetEnvVarsResponse set key {:#?} value {:#?}", - key, value - ); - - std::env::set_var(&key, &value); - debug_assert_eq!(std::env::var(key), Ok(value)); - } - } - Err(fail) => error!( - "Loading remote environment variables failed with {:#?}", - fail - ), - } + DaemonMessage::GetEnvVarsResponse(_) => { + unreachable!("We get env vars only on initialization right now, shouldn't happen") } DaemonMessage::Close => todo!(), DaemonMessage::LogMessage(_) => todo!(), } } -async fn poll_agent( - mut pf: Portforwarder, +async fn thread_loop( mut receiver: Receiver, - config: LayerConfig, + mut codec: actix_codec::Framed< + impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send, + ClientCodec, + >, ) { - let port = pf.take_stream(61337).unwrap(); // TODO: Make port configurable - - // `codec` is used to retrieve messages from the daemon (messages that are sent from -agent to - // -layer) - let mut codec = actix_codec::Framed::new(port, ClientCodec::new()); // TODO: Starting to think about a better abstraction over this whole mess. File operations are // pretty much just `std::fs::File` things, so I think the best approach would be to create // a `FakeFile`, and implement `std::io` traits on it. @@ -385,37 +364,6 @@ async fn poll_agent( let close_file_handler = Mutex::new(Vec::with_capacity(4)); let mut ping = false; - - if !config.override_env_vars_exclude.is_empty() && !config.override_env_vars_include.is_empty() - { - panic!( - r#"mirrord-layer encountered an issue: - - mirrord doesn't support specifying both - `OVERRIDE_ENV_VARS_EXCLUDE` and `OVERRIDE_ENV_VARS_INCLUDE` at the same time! - - > Use either `--override_env_vars_exclude` or `--override_env_vars_include`. - >> If you want to include all, use `--override_env_vars_include="*"`."# - ); - } else { - let env_vars_filter = HashSet::from(EnvVars(config.override_env_vars_exclude)); - let env_vars_select = HashSet::from(EnvVars(config.override_env_vars_include)); - - if !env_vars_filter.is_empty() || !env_vars_select.is_empty() { - let codec_result = codec - .send(ClientMessage::GetEnvVarsRequest(GetEnvVarsRequest { - env_vars_filter, - env_vars_select, - })) - .await; - - debug!( - "ClientMessage::GetEnvVarsRequest codec_result {:#?}", - codec_result - ); - } - } - let mut tcp_mirror_handler = TcpMirrorHandler::default(); loop { @@ -460,6 +408,67 @@ async fn poll_agent( } } +async fn start_layer_thread( + mut pf: Portforwarder, + receiver: Receiver, + config: LayerConfig, +) { + let port = pf.take_stream(61337).unwrap(); // TODO: Make port configurable + + // `codec` is used to retrieve messages from the daemon (messages that are sent from -agent to + // -layer) + let mut codec = actix_codec::Framed::new(port, ClientCodec::new()); + + if !config.override_env_vars_exclude.is_empty() && !config.override_env_vars_include.is_empty() + { + panic!( + r#"mirrord-layer encountered an issue: + + mirrord doesn't support specifying both + `OVERRIDE_ENV_VARS_EXCLUDE` and `OVERRIDE_ENV_VARS_INCLUDE` at the same time! + + > Use either `--override_env_vars_exclude` or `--override_env_vars_include`. + >> If you want to include all, use `--override_env_vars_include="*"`."# + ); + } else { + let env_vars_filter = HashSet::from(EnvVars(config.override_env_vars_exclude)); + let env_vars_select = HashSet::from(EnvVars(config.override_env_vars_include)); + + if !env_vars_filter.is_empty() || !env_vars_select.is_empty() { + let codec_result = codec + .send(ClientMessage::GetEnvVarsRequest(GetEnvVarsRequest { + env_vars_filter, + env_vars_select, + })) + .await; + + debug!( + "ClientMessage::GetEnvVarsRequest codec_result {:#?}", + codec_result + ); + + let msg = codec.next().await; + if let Some(Ok(DaemonMessage::GetEnvVarsResponse(Ok(remote_env_vars)))) = msg { + debug!("DaemonMessage::GetEnvVarsResponse {:#?}!", remote_env_vars); + + for (key, value) in remote_env_vars.into_iter() { + debug!( + "DaemonMessage::GetEnvVarsResponse set key {:#?} value {:#?}", + key, value + ); + + std::env::set_var(&key, &value); + debug_assert_eq!(std::env::var(key), Ok(value)); + } + } else { + panic!("unexpected response - expected env vars response {msg:?}"); + } + } + } + + let _ = tokio::spawn(thread_loop(receiver, codec)); +} + /// Enables file (behind `MIRRORD_FILE_OPS` option) and socket hooks. fn enable_hooks(enabled_file_ops: bool) { let mut interceptor = Interceptor::obtain(&GUM); diff --git a/tests/src/sanity.rs b/tests/src/sanity.rs index 0aceca0334c..a5dba45e1ee 100644 --- a/tests/src/sanity.rs +++ b/tests/src/sanity.rs @@ -524,7 +524,6 @@ mod tests { #[rstest] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - #[ignore] // race condition pub async fn test_remote_env_vars_exclude_works(#[future] service: EchoService) { let service = service.await; let node_command = vec![ @@ -541,7 +540,6 @@ mod tests { #[rstest] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - #[ignore] // race condition pub async fn test_remote_env_vars_include_works(#[future] service: EchoService) { let service = service.await; let node_command = vec![