From 2207fdc11a34c5e2cbc02694488ecb03d337f0c9 Mon Sep 17 00:00:00 2001 From: Mehul <65443164+infiniteregrets@users.noreply.github.com> Date: Wed, 8 Jun 2022 15:57:35 -0400 Subject: [PATCH] Add detours for fcntl/dup system calls (#115) * Add detours for fcntl/dup syscalls --- .github/workflows/ci.yaml | 4 +- CHANGELOG.md | 7 +- Cargo.lock | 18 +++- mirrord-layer/src/lib.rs | 3 +- mirrord-layer/src/sockets.rs | 137 ++++++++++++++++------------ tests/Cargo.toml | 4 +- tests/python-e2e/app.py | 49 ++++++++++ tests/src/sanity.rs | 168 +++++++++++++++++++++++------------ tests/src/utils.rs | 68 +++++++------- 9 files changed, 304 insertions(+), 154 deletions(-) create mode 100644 tests/python-e2e/app.py diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d3a07372d27..ce722abdec6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -128,6 +128,8 @@ jobs: - run: | sudo apt-get update -y sudo apt-get install -y libpcap-dev cmake + - uses: actions/setup-python@v3 + - run: pip3 install flask - name: start minikube uses: medyagh/setup-minikube@master with: @@ -143,7 +145,7 @@ jobs: - run: minikube image load test:latest - name: setup nginx run: kubectl apply -f tests/app.yaml - - name: run node tests + - name: cargo test run: cargo test --package tests --lib -- tests --nocapture --test-threads 1 - name: switch minikube runtime run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index fe53119129c..3a15bf46ddb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,14 +7,15 @@ Check [Keep a Changelog](http://keepachangelog.com/) for recommendations on how ## [Unreleased] ### Added +- File operations are now available behind the `MIRRORD_FILE_OPS` env variable, this means that mirrord now hooks into the following file functions: `open`, `fopen`, `fdopen`, `openat`, `read`, `fread`, `fileno`, `lseek`, and `write` to provide a mirrored file system. - Support for running x64 (Intel) binary on arm (Silicon) macOS using mirrord. This will download and use the x64 mirrord-layer binary when needed. +- Add detours for fcntl/dup system calls, closes [#51](https://github.com/metalbear-co/mirrord/issues/51) ### Changed -- Added graceful exit for library extraction logic in case of error +- Add graceful exit for library extraction logic in case of error. - Refactor the CI by splitting the building of mirrord-agent in a separate job and caching the agent image for E2E tests. -- File operations are now available behind the `MIRRORD_FILE_OPS` env variable, this means that mirrord now hooks into the following file functions: `open`, `fopen`, `fdopen`, `openat`, `read`, `fread`, `fileno`, `lseek`, and `write` to provide a mirrored file system. - Update bug report template to apply to the latest version of mirrord. -- Changed release profile to strip debuginfo and enable LTO. +- Chang release profile to strip debuginfo and enable LTO. - VS Code extension - update dependencies. ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 6f57faa9c63..5ffc793472f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1432,7 +1432,7 @@ dependencies = [ "containerd-client", "futures", "mirrord-protocol", - "nix", + "nix 0.23.1", "num-traits", "pcap", "pnet", @@ -1464,7 +1464,7 @@ dependencies = [ "mirrord-protocol", "multi-map", "nanoid", - "nix", + "nix 0.23.1", "os_socketaddr", "queues", "rand", @@ -1539,6 +1539,18 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nix" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f17df307904acd05aa8e32e97bb20f2a0df1728bbc2d771ae8f9a90463441e9" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset", +] + [[package]] name = "nom" version = "5.1.2" @@ -2682,7 +2694,9 @@ dependencies = [ "futures", "k8s-openapi", "kube", + "lazy_static", "mirrord", + "nix 0.24.1", "reqwest", "serde", "serde_json", diff --git a/mirrord-layer/src/lib.rs b/mirrord-layer/src/lib.rs index d47556562b8..738517923dd 100644 --- a/mirrord-layer/src/lib.rs +++ b/mirrord-layer/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(c_variadic)] #![feature(once_cell)] #![feature(result_option_inspect)] #![feature(const_trait_impl)] @@ -542,7 +543,7 @@ unsafe extern "C" fn close_detour(fd: c_int) -> c_int { .get() .expect("Should be set during initialization!"); - if SOCKETS.lock().unwrap().remove(&fd) { + if SOCKETS.lock().unwrap().remove(&fd).is_some() { libc::close(fd) } else if *enabled_file_ops { let remote_fd = OPEN_FILES.lock().unwrap().remove(&fd); diff --git a/mirrord-layer/src/sockets.rs b/mirrord-layer/src/sockets.rs index 8224840dcab..0b5c1a00675 100644 --- a/mirrord-layer/src/sockets.rs +++ b/mirrord-layer/src/sockets.rs @@ -1,13 +1,11 @@ //! We implement each hook function in a safe function as much as possible, having the unsafe do the //! absolute minimum use std::{ - borrow::Borrow, - collections::{HashMap, HashSet, VecDeque}, - hash::{Hash, Hasher}, + collections::{HashMap, VecDeque}, lazy::SyncLazy, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, os::unix::io::RawFd, - sync::Mutex, + sync::{Arc, Mutex}, }; use errno::{errno, set_errno, Errno}; @@ -22,8 +20,8 @@ use crate::{ HOOK_SENDER, }; -pub(crate) static SOCKETS: SyncLazy>> = - SyncLazy::new(|| Mutex::new(HashSet::new())); +pub(crate) static SOCKETS: SyncLazy>>> = + SyncLazy::new(|| Mutex::new(HashMap::new())); pub static CONNECTION_QUEUE: SyncLazy> = SyncLazy::new(|| Mutex::new(ConnectionQueue::default())); @@ -75,7 +73,7 @@ pub struct Connected { local_address: SocketAddr, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct Bound { address: SocketAddr, } @@ -96,34 +94,13 @@ impl Default for SocketState { #[derive(Debug)] #[allow(dead_code)] -pub(crate) struct Socket { - fd: RawFd, +pub struct Socket { domain: c_int, type_: c_int, protocol: c_int, pub state: SocketState, } -impl PartialEq for Socket { - fn eq(&self, other: &Self) -> bool { - self.fd == other.fd - } -} - -impl Eq for Socket {} - -impl Hash for Socket { - fn hash(&self, state: &mut H) { - self.fd.hash(state); - } -} - -impl Borrow for Socket { - fn borrow(&self) -> &RawFd { - &self.fd - } -} - #[inline] fn is_ignored_port(port: Port) -> bool { port == 0 || (port > 50000 && port < 60000) @@ -144,13 +121,15 @@ fn socket(domain: c_int, type_: c_int, protocol: c_int) -> RawFd { return fd; } let mut sockets = SOCKETS.lock().unwrap(); - sockets.insert(Socket { + sockets.insert( fd, - domain, - type_, - protocol, - state: SocketState::default(), - }); + Arc::new(Socket { + domain, + type_, + protocol, + state: SocketState::default(), + }), + ); fd } @@ -167,7 +146,7 @@ fn bind(sockfd: c_int, addr: *const sockaddr, addrlen: socklen_t) -> c_int { debug!("bind called sockfd: {:?}", sockfd); let mut socket = { let mut sockets = SOCKETS.lock().unwrap(); - match sockets.take(&sockfd) { + match sockets.remove(&sockfd) { Some(socket) if !matches!(socket.state, SocketState::Initialized) => { error!("socket is in invalid state for bind {:?}", socket.state); return libc::EINVAL; @@ -195,12 +174,12 @@ fn bind(sockfd: c_int, addr: *const sockaddr, addrlen: socklen_t) -> c_int { return unsafe { libc::bind(sockfd, addr, addrlen) }; } - socket.state = SocketState::Bound(Bound { + Arc::get_mut(&mut socket).unwrap().state = SocketState::Bound(Bound { address: parsed_addr, }); let mut sockets = SOCKETS.lock().unwrap(); - sockets.insert(socket); + sockets.insert(sockfd, socket); 0 } @@ -220,7 +199,7 @@ fn listen(sockfd: RawFd, _backlog: c_int) -> c_int { debug!("listen called"); let mut socket = { let mut sockets = SOCKETS.lock().unwrap(); - match sockets.take(&sockfd) { + match sockets.remove(&sockfd) { Some(socket) => socket, None => { debug!("listen: no socket found for fd: {}", &sockfd); @@ -228,10 +207,10 @@ fn listen(sockfd: RawFd, _backlog: c_int) -> c_int { } } }; - match socket.state { + match &socket.state { SocketState::Bound(bound) => { let real_port = bound.address.port(); - socket.state = SocketState::Listening(bound); + Arc::get_mut(&mut socket).unwrap().state = SocketState::Listening(*bound); let mut os_addr = match socket.domain { libc::AF_INET => { OsSocketAddr::from(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)) @@ -304,7 +283,7 @@ fn listen(sockfd: RawFd, _backlog: c_int) -> c_int { } debug!("listen: success"); let mut sockets = SOCKETS.lock().unwrap(); - sockets.insert(socket); + sockets.insert(sockfd, socket); 0 } @@ -319,7 +298,7 @@ fn connect(sockfd: RawFd, address: *const sockaddr, len: socklen_t) -> c_int { let socket = { let mut sockets = SOCKETS.lock().unwrap(); - match sockets.take(&sockfd) { + match sockets.remove(&sockfd) { Some(socket) => socket, None => { debug!("connect: no socket found for fd: {}", &sockfd); @@ -329,7 +308,7 @@ fn connect(sockfd: RawFd, address: *const sockaddr, len: socklen_t) -> c_int { }; // We don't handle this socket, so restore state if there was any. (delay execute bind) - if let SocketState::Bound(bound) = socket.state { + if let SocketState::Bound(bound) = &socket.state { let os_addr = OsSocketAddr::from(bound.address); let ret = unsafe { libc::bind(sockfd, os_addr.as_ptr(), os_addr.len()) }; if ret != 0 { @@ -459,16 +438,10 @@ fn accept( address_len: *mut socklen_t, new_fd: RawFd, ) -> RawFd { - let (origin_fd, local_address, domain, protocol, type_) = { + let (local_address, domain, protocol, type_) = { if let Some(socket) = SOCKETS.lock().unwrap().get(&sockfd) { if let SocketState::Listening(bound) = &socket.state { - ( - socket.fd, - bound.address, - socket.domain, - socket.protocol, - socket.type_, - ) + (bound.address, socket.domain, socket.protocol, socket.type_) } else { error!("original socket is not listening"); return new_fd; @@ -478,7 +451,7 @@ fn accept( return new_fd; } }; - let socket_info = { CONNECTION_QUEUE.lock().unwrap().get(&origin_fd) }; + let socket_info = { CONNECTION_QUEUE.lock().unwrap().get(&sockfd) }; let remote_address = match socket_info { Some(socket_info) => socket_info, None => { @@ -488,7 +461,6 @@ fn accept( } .address; let new_socket = Socket { - fd: new_fd, domain, protocol, type_, @@ -499,7 +471,7 @@ fn accept( }; fill_address(address, address_len, remote_address); - SOCKETS.lock().unwrap().insert(new_socket); + SOCKETS.lock().unwrap().insert(new_fd, Arc::new(new_socket)); new_fd } @@ -533,17 +505,72 @@ unsafe extern "C" fn accept4_detour( } } +fn fcntl(orig_fd: c_int, cmd: c_int, fcntl_fd: i32) -> c_int { + if fcntl_fd == -1 { + error!("fcntl failed"); + return fcntl_fd; + } + match cmd { + libc::F_DUPFD | libc::F_DUPFD_CLOEXEC => { + dup(orig_fd, fcntl_fd); + } + _ => (), + } + fcntl_fd +} + +unsafe extern "C" fn fcntl_detour(fd: c_int, cmd: c_int, arg: ...) -> c_int { + let fcntl_fd = libc::fcntl(fd, cmd, arg); + fcntl(fd, cmd, fcntl_fd) +} + +fn dup(fd: c_int, dup_fd: i32) -> c_int { + if dup_fd == -1 { + error!("dup failed"); + return dup_fd; + } + let mut sockets = SOCKETS.lock().unwrap(); + if let Some(socket) = sockets.get(&fd) { + let dup_socket = socket.clone(); + sockets.insert(dup_fd as RawFd, dup_socket); + } + dup_fd +} + +unsafe extern "C" fn dup_detour(fd: c_int) -> c_int { + let dup_fd = libc::dup(fd); + dup(fd, dup_fd) +} + +unsafe extern "C" fn dup2_detour(oldfd: c_int, newfd: c_int) -> c_int { + if oldfd == newfd { + return newfd; + } + let dup2_fd = libc::dup2(oldfd, newfd); + dup(oldfd, dup2_fd) +} + +#[cfg(target_os = "linux")] +unsafe extern "C" fn dup3_detour(oldfd: c_int, newfd: c_int, flags: c_int) -> c_int { + let dup3_fd = libc::dup3(oldfd, newfd, flags); + dup(oldfd, dup3_fd) +} + pub fn enable_socket_hooks(interceptor: &mut Interceptor) { hook!(interceptor, "socket", socket_detour); hook!(interceptor, "bind", bind_detour); hook!(interceptor, "listen", listen_detour); hook!(interceptor, "connect", connect_detour); + hook!(interceptor, "fcntl", fcntl_detour); + hook!(interceptor, "dup", dup_detour); + hook!(interceptor, "dup2", dup2_detour); try_hook!(interceptor, "getpeername", getpeername_detour); try_hook!(interceptor, "getsockname", getsockname_detour); #[cfg(target_os = "linux")] { try_hook!(interceptor, "uv__accept4", accept4_detour); try_hook!(interceptor, "accept4", accept4_detour); + try_hook!(interceptor, "dup3", dup3_detour); } try_hook!(interceptor, "accept", accept_detour); } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index f7b8b4f7f82..c2376b6144f 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -16,4 +16,6 @@ tokio = { version = "1", features = ["macros"] } serde_json = "1.0.79" mirrord = { artifact = "bin", bin = true, path = "../mirrord-cli" } serde = "1.0.137" -futures = "0.3.21" \ No newline at end of file +futures = "0.3.21" +lazy_static = "1.4.0" +nix = "0.24.1" \ No newline at end of file diff --git a/tests/python-e2e/app.py b/tests/python-e2e/app.py new file mode 100644 index 00000000000..6110273cc8e --- /dev/null +++ b/tests/python-e2e/app.py @@ -0,0 +1,49 @@ +from os import getcwd, remove +import click +from flask import Flask, request +import logging +import sys + +log = logging.getLogger('werkzeug') +log.disabled = True + +cli = sys.modules['flask.cli'] + +cli.show_server_banner = lambda *x: click.echo("Server listening on port 80") + +app = Flask(__name__) + +TEXT = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." +PATH = getcwd() + "/test" + + +@app.route('/', methods=['GET']) +def get(): + print("GET: Request completed", flush=True) + return "OK" + + +@app.route('/', methods=['POST']) +def post(): + if str(request.data, 'utf-8') == TEXT: + print("POST: Request completed", flush=True) + return "OK" + + +@app.route('/', methods=['PUT']) +def put(): + with open(PATH, "w") as f: + f.write(TEXT) + print("PUT: Request completed", flush=True) + return "OK" + + +@app.route('/', methods=['DELETE']) +def delete(): + remove(PATH) + print("DELETE: Request completed", flush=True) + return "OK" + + +if __name__ == '__main__': + app.run(host="0.0.0.0", port=80) diff --git a/tests/src/sanity.rs b/tests/src/sanity.rs index 1acfb27c482..d49e6cea6f4 100644 --- a/tests/src/sanity.rs +++ b/tests/src/sanity.rs @@ -6,6 +6,10 @@ mod tests { use k8s_openapi::api::{batch::v1::Job, core::v1::Pod}; use kube::{api::ListParams, Api}; + use nix::{ + sys::signal::{self, Signal}, + unistd::Pid, + }; use tokio::{ io::{AsyncBufReadExt, AsyncReadExt, BufReader}, time::{timeout, Duration}, @@ -13,21 +17,29 @@ mod tests { use crate::utils::*; - // actual test function used with "containerd", "docker" runtimes - // starts the node(express.js) api server, sends four different requests, validates data, - // stops the server and validates if the agent job and pod are deleted #[tokio::test] async fn test_complete_node_api() { + _test_complete_api("node").await; + } + + #[tokio::test] + async fn test_complete_python_api() { + _test_complete_api("python").await; + } + + /// Starts the Node(express.js)/Python(flask) api server, sends four different requests, + /// validates data, stops the server and validates if the agent job and pod are deleted. + async fn _test_complete_api(server: &str) { let client = setup_kube_client().await; let pod_namespace = "default"; let env: HashMap<&str, &str> = HashMap::new(); // for adding more environment variables - let mut server = test_server_init(&client, pod_namespace, env).await; + let mut server = test_server_init(&client, pod_namespace, env, server).await; let service_url = get_service_url(&client, pod_namespace).await.unwrap(); let mut stderr_reader = BufReader::new(server.stderr.take().unwrap()); - let child_stdout = server.stdout.take().unwrap(); + let mut stdout_reader = BufReader::new(server.stdout.take().unwrap()); // Note: to run a task in the background, don't await on it. // spawn returns a JoinHandle @@ -45,27 +57,39 @@ mod tests { } }); - // since we are reading from the stdout, we could block at any point in case the server - // does not write to its stdout, so we need a timeout here - let validation_timeout = Duration::from_secs(20); - timeout( - validation_timeout, - validate_requests(child_stdout, service_url.as_str()), - ) + let mut is_running = String::new(); + let start_timeout = Duration::from_secs(10); + + timeout(start_timeout, async { + stdout_reader.read_line(&mut is_running).await.unwrap(); + assert_eq!(is_running, "Server listening on port 80\n"); + }) .await .unwrap(); - server.kill().await.unwrap(); + + send_requests(service_url.as_str()).await; + + // Note: Sending a SIGTERM adds an EOF to the stdout stream, so we can read it without + // blocking. + signal::kill( + Pid::from_raw(server.id().unwrap().try_into().unwrap()), + Signal::SIGTERM, + ) + .unwrap(); + + server.wait().await.unwrap(); + validate_requests(&mut stdout_reader).await; let jobs_api: Api = Api::namespaced(client.clone(), "default"); let jobs = jobs_api.list(&ListParams::default()).await.unwrap(); // assuming only one job is running - // to make the tests parallel we need to figure a way to get the exact job name - // when len() > 1 - assert_eq!(jobs.items.len(), 1); + // to make the tests parallel we need to figure a way to get the exact job name when len() > + // 1 + assert!(jobs.items.len() > 0); let pods_api: Api = Api::namespaced(client.clone(), "default"); let pods = pods_api.list(&ListParams::default()).await.unwrap(); - assert_eq!(pods.items.len(), 2); + assert!(pods.items.len() > 1); let cleanup_timeout = Duration::from_secs(35); timeout( @@ -90,16 +114,16 @@ mod tests { } #[tokio::test] - // we send a request to a different pod in the cluster (different namespace) and assert - // that no operation is performed as specified in the request by the server - // as the agent pod is impersonating the pod running in the default namespace + /// Sends a request to a different pod in the cluster (different namespace) and asserts + /// that no operation is performed as specified in the request by the server + /// as the agent pod is impersonating the pod running in the default namespace async fn test_different_pod_in_cluster() { let client = setup_kube_client().await; let test_namespace = "test-namespace"; let pod_namespace = "default"; let env: HashMap<&str, &str> = HashMap::new(); - let mut server = test_server_init(&client, pod_namespace, env).await; + let mut server = test_server_init(&client, pod_namespace, env, "node").await; create_namespace(&client, test_namespace).await; create_nginx_pod(&client, test_namespace).await; @@ -107,6 +131,8 @@ mod tests { let service_url = get_service_url(&client, test_namespace).await.unwrap(); let mut stderr_reader = BufReader::new(server.stderr.take().unwrap()); + let mut stdout_reader = BufReader::new(server.stdout.take().unwrap()); + tokio::spawn(async move { loop { let mut error_stream = String::new(); @@ -117,38 +143,41 @@ mod tests { } }); - let child_stdout = server.stdout.take().unwrap(); - let timeout_duration = Duration::from_secs(10); - timeout( - timeout_duration, - validate_no_requests(child_stdout, service_url.as_str()), - ) + let mut is_running = String::new(); + let start_timeout = Duration::from_secs(10); + + timeout(start_timeout, async { + stdout_reader.read_line(&mut is_running).await.unwrap(); + assert_eq!(is_running, "Server listening on port 80\n"); + }) .await .unwrap(); + validate_no_requests(service_url.as_str()).await; + server.kill().await.unwrap(); delete_namespace(&client, test_namespace).await; } // agent namespace tests #[tokio::test] - // creates a new k8s namespace, starts the API server with env: - // MIRRORD_AGENT_NAMESPACE=namespace, asserts that the agent job and pod are created - // validate data through requests to the API server + /// Creates a new k8s namespace, starts the API server with env: + /// MIRRORD_AGENT_NAMESPACE=namespace, asserts that the agent job and pod are created + /// validate data through requests to the API server async fn test_good_agent_namespace() { let client = setup_kube_client().await; let agent_namespace = "test-namespace-agent-good"; let pod_namespace = "default"; let env = HashMap::from([("MIRRORD_AGENT_NAMESPACE", agent_namespace)]); - let mut server = test_server_init(&client, pod_namespace, env).await; + let mut server = test_server_init(&client, pod_namespace, env, "node").await; create_namespace(&client, agent_namespace).await; let service_url = get_service_url(&client, "default").await.unwrap(); let mut stderr_reader = BufReader::new(server.stderr.take().unwrap()); - let child_stdout = server.stdout.take().unwrap(); + let mut stdout_reader = BufReader::new(server.stdout.take().unwrap()); tokio::spawn(async move { loop { @@ -160,14 +189,26 @@ mod tests { } }); - let validation_timeout = Duration::from_secs(20); - timeout( - validation_timeout, - validate_requests(child_stdout, service_url.as_str()), - ) + let mut is_running = String::new(); + let start_timeout = Duration::from_secs(10); + + timeout(start_timeout, async { + stdout_reader.read_line(&mut is_running).await.unwrap(); + assert_eq!(is_running, "Server listening on port 80\n"); + }) .await .unwrap(); - server.kill().await.unwrap(); + + send_requests(service_url.as_str()).await; + + signal::kill( + Pid::from_raw(server.id().unwrap().try_into().unwrap()), + Signal::SIGTERM, + ) + .unwrap(); + + server.wait().await.unwrap(); + validate_requests(&mut stdout_reader).await; let jobs_api: Api = Api::namespaced(client.clone(), agent_namespace); let jobs = jobs_api.list(&ListParams::default()).await.unwrap(); @@ -180,14 +221,14 @@ mod tests { } #[tokio::test] - // starts the API server with env: MIRRORD_AGENT_NAMESPACE=namespace (nonexistent), - // asserts the process crashes: "NotFound" as the namespace does not exist + /// Starts the API server with env: MIRRORD_AGENT_NAMESPACE=namespace (nonexistent), + /// asserts the process crashes: "NotFound" as the namespace does not exist async fn test_nonexistent_agent_namespace() { let client = setup_kube_client().await; let agent_namespace = "nonexistent-namespace"; let pod_namespace = "default"; let env = HashMap::from([("MIRRORD_AGENT_NAMESPACE", agent_namespace)]); - let mut server = test_server_init(&client, pod_namespace, env).await; + let mut server = test_server_init(&client, pod_namespace, env, "node").await; let mut stderr_reader = BufReader::new(server.stderr.take().unwrap()); @@ -215,9 +256,9 @@ mod tests { // pod namespace tests #[tokio::test] - // creates a new k8s namespace, starts the API server with env: - // MIRRORD_AGENT_IMPERSONATED_POD_NAMESPACE=namespace, validates data sent through - // requests + /// Creates a new k8s namespace, starts the API server with env: + /// MIRRORD_AGENT_IMPERSONATED_POD_NAMESPACE=namespace, validates data sent through + /// requests async fn test_good_pod_namespace() { let client = setup_kube_client().await; @@ -226,11 +267,13 @@ mod tests { create_nginx_pod(&client, pod_namespace).await; let env = HashMap::from([("MIRRORD_AGENT_IMPERSONATED_POD_NAMESPACE", pod_namespace)]); - let mut server = test_server_init(&client, pod_namespace, env).await; + let mut server = test_server_init(&client, pod_namespace, env, "node").await; let service_url = get_service_url(&client, pod_namespace).await.unwrap(); let mut stderr_reader = BufReader::new(server.stderr.take().unwrap()); + let mut stdout_reader = BufReader::new(server.stdout.take().unwrap()); + tokio::spawn(async move { loop { let mut error_stream = String::new(); @@ -241,25 +284,34 @@ mod tests { } }); - let child_stdout = server.stdout.take().unwrap(); + let mut is_running = String::new(); + let start_timeout = Duration::from_secs(10); - let validation_timeout = Duration::from_secs(20); - timeout( - validation_timeout, - validate_requests(child_stdout, service_url.as_str()), - ) + timeout(start_timeout, async { + stdout_reader.read_line(&mut is_running).await.unwrap(); + assert_eq!(is_running, "Server listening on port 80\n"); + }) .await .unwrap(); - server.kill().await.unwrap(); + + send_requests(service_url.as_str()).await; + + signal::kill( + Pid::from_raw(server.id().unwrap().try_into().unwrap()), + Signal::SIGTERM, + ) + .unwrap(); + + server.wait().await.unwrap(); + validate_requests(&mut stdout_reader).await; + delete_namespace(&client, pod_namespace).await; } - // TODO: This test fails: errors out with `error_stream.contains(\"NotFound\")` when running - // with docker runtime. #[tokio::test] - // starts the API server with env: MIRRORD_AGENT_IMPERSONATED_POD_NAMESPACE=namespace - // (nonexistent), asserts the process crashes: "NotFound" as the namespace does not - // exist + /// Starts the API server with env: MIRRORD_AGENT_IMPERSONATED_POD_NAMESPACE=namespace + /// (nonexistent), asserts the process crashes: "NotFound" as the namespace does not + /// exist async fn test_bad_pod_namespace() { let client = setup_kube_client().await; @@ -268,7 +320,7 @@ mod tests { "MIRRORD_AGENT_IMPERSONATED_POD_NAMESPACE", "nonexistent-namespace", )]); - let mut server = test_server_init(&client, pod_namespace, env).await; + let mut server = test_server_init(&client, pod_namespace, env, "node").await; let mut stderr_reader = BufReader::new(server.stderr.take().unwrap()); let timeout_duration = Duration::from_secs(5); diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 2aa8e8ede66..20e88f2f32d 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -13,19 +13,27 @@ use kube::{ core::WatchEvent, Api, Client, Config, }; +use lazy_static::lazy_static; use reqwest::{Method, StatusCode}; use serde::de::DeserializeOwned; use serde_json::json; use tokio::{ - io::{AsyncBufReadExt, BufReader}, + io::{AsyncReadExt, BufReader}, process::{Child, ChildStdout, Command}, time::{sleep, Duration}, }; static TEXT: &str = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."; +lazy_static! { + static ref SERVERS: HashMap<&'static str, Vec<&'static str>> = HashMap::from([ + ("python", vec!["python3", "python-e2e/app.py"]), + ("node", vec!["node", "node-e2e/app.js"]) + ]); +} + // target/debug/mirrord exec --pod-name pod_name -c binary command -pub fn start_node_server(pod_name: &str, command: Vec<&str>, env: HashMap<&str, &str>) -> Child { +pub fn start_server(pod_name: &str, command: Vec<&str>, env: HashMap<&str, &str>) -> Child { let path = env!("CARGO_BIN_FILE_MIRRORD"); let args: Vec<&str> = vec!["exec", "--pod-name", pod_name, "-c"] .into_iter() @@ -231,48 +239,41 @@ pub async fn watch_resource_exists(api: Api } } -// to all requests, the express API prints {request_name}: Request completed -// PUT - creates cwd/test, DELETE - deletes cwd/test -// this is verified by reading the stdout of the server -pub async fn validate_requests(stdout: ChildStdout, service_url: &str) { - let mut buffer = BufReader::new(stdout); - let mut stream = String::new(); - buffer.read_line(&mut stream).await.unwrap(); - - println!("validate_requests -> stream is {:#?}", stream); - - assert!(stream.contains("Server listening on port 80")); - +// Sends GET, POST, PUT, and DELETE requests to the given service URL -> Express/Flask server. +// PUT creates a file named "test" in cwd and DELETE deletes it. +pub async fn send_requests(service_url: &str) { http_request(service_url, Method::GET).await; - buffer.read_line(&mut stream).await.unwrap(); - assert!(stream.contains("GET: Request completed")); - http_request(service_url, Method::POST).await; - buffer.read_line(&mut stream).await.unwrap(); - assert!(stream.contains("POST: Request completed")); + + http_request(service_url, Method::PUT).await; let cwd = env::current_dir().unwrap(); let path = cwd.join("test"); // 'test' is created in cwd, by PUT and deleted by DELETE - http_request(service_url, Method::PUT).await; - sleep(Duration::from_secs(2)).await; // Todo: remove this sleep and replace with a filesystem watcher + sleep(Duration::from_secs(5)).await; // Todo: remove this sleep and replace with a filesystem watcher assert!(path.exists()); - buffer.read_line(&mut stream).await.unwrap(); - assert!(stream.contains("PUT: Request completed")); http_request(service_url, Method::DELETE).await; - sleep(Duration::from_secs(2)).await; + + sleep(Duration::from_secs(5)).await; assert!(!path.exists()); - buffer.read_line(&mut stream).await.unwrap(); - assert!(stream.contains("DELETE: Request completed")); } -pub async fn validate_no_requests(stdout: ChildStdout, service_url: &str) { - let mut buffer = BufReader::new(stdout); - let mut stream = String::new(); - buffer.read_line(&mut stream).await.unwrap(); - assert!(stream.contains("Server listening on port 80")); +/// For all requests, the Express/Flask server prints "{request_name}: Request completed", +/// this is verified by reading the stdout of the server +pub async fn validate_requests(stdout: &mut BufReader) { + let mut out = String::new(); + stdout.read_to_string(&mut out).await.unwrap(); + + // Todo: change this assertions to assert_eq! when TCPClose is patched + + assert!(out.contains("GET: Request completed")); + assert!(out.contains("POST: Request completed")); + assert!(out.contains("PUT: Request completed")); + assert!(out.contains("DELETE: Request completed")); +} +pub async fn validate_no_requests(service_url: &str) { let cwd = env::current_dir().unwrap(); let path = cwd.join("test"); @@ -286,15 +287,16 @@ pub async fn test_server_init( client: &Client, pod_namespace: &str, mut env: HashMap<&str, &str>, + server: &str, ) -> Child { let pod_name = get_nginx_pod_name(client, pod_namespace).await.unwrap(); - let command = vec!["node", "node-e2e/app.js"]; + let command = SERVERS.get(server).unwrap().clone(); // used by the CI, to load the image locally: // docker build -t test . -f mirrord-agent/Dockerfile // minikube load image test:latest env.insert("MIRRORD_AGENT_IMAGE", "test"); env.insert("MIRRORD_CHECK_VERSION", "false"); - let server = start_node_server(&pod_name, command, env); + let server = start_server(&pod_name, command, env); setup_panic_hook(); server }