From 0b42ec9e8ccf96a60834b7358a878e44ada69e5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Smolarek?= <34063647+Razz4780@users.noreply.github.com> Date: Tue, 14 May 2024 13:34:49 +0200 Subject: [PATCH] Fix outgoing connect on bound sockets (#2440) * Integration test * Fix case 1 * Fix case 2 * Changelog entry * Fix CI workflow * Doc + comment --- .github/workflows/ci.yaml | 6 ++ Cargo.lock | 7 +++ Cargo.toml | 1 + changelog.d/2438.fixed.md | 1 + mirrord/layer/src/detour.rs | 3 - mirrord/layer/src/socket/ops.rs | 51 ++++++++-------- mirrord/layer/tests/apps/issue2438/Cargo.toml | 7 +++ .../layer/tests/apps/issue2438/src/main.rs | 49 +++++++++++++++ mirrord/layer/tests/common/mod.rs | 12 +++- mirrord/layer/tests/outgoing.rs | 59 +++++++++++++++++++ 10 files changed, 167 insertions(+), 29 deletions(-) create mode 100644 changelog.d/2438.fixed.md create mode 100644 mirrord/layer/tests/apps/issue2438/Cargo.toml create mode 100644 mirrord/layer/tests/apps/issue2438/src/main.rs diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6e0c56f2787..e2b4455ffe2 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -309,6 +309,9 @@ jobs: - run: | cd mirrord/layer/tests/apps/issue2001 cargo build + - run: | + cd mirrord/layer/tests/apps/issue2438 + cargo build - run: ./scripts/build_c_apps.sh - run: cargo test --target x86_64-unknown-linux-gnu -p mirrord-layer - name: mirrord protocol UT @@ -408,6 +411,9 @@ jobs: - run: | cd mirrord/layer/tests/apps/issue2001 cargo build + - run: | + cd mirrord/layer/tests/apps/issue2438 + cargo build - run: ./scripts/build_c_apps.sh # For the `java_temurin_sip` test. - uses: sdkman/sdkman-action@b1f9b696c79148b66d3d3a06f7ea801820318d0f diff --git a/Cargo.lock b/Cargo.lock index bf606853d4c..c2b01004610 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2815,6 +2815,13 @@ dependencies = [ "libc", ] +[[package]] +name = "issue2438" +version = "0.1.0" +dependencies = [ + "nix 0.24.3", +] + [[package]] name = "itertools" version = "0.10.5" diff --git a/Cargo.toml b/Cargo.toml index 861e7d05491..4d64582a6c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "mirrord/layer/tests/apps/issue1776portnot53", "mirrord/layer/tests/apps/issue1899", "mirrord/layer/tests/apps/issue2001", + "mirrord/layer/tests/apps/issue2438", "sample/rust", "medschool", "tests", diff --git a/changelog.d/2438.fixed.md b/changelog.d/2438.fixed.md new file mode 100644 index 00000000000..b2a25ab5e99 --- /dev/null +++ b/changelog.d/2438.fixed.md @@ -0,0 +1 @@ +Fixed a bug where outgoing connections where not intercepted from bound sockets. \ No newline at end of file diff --git a/mirrord/layer/src/detour.rs b/mirrord/layer/src/detour.rs index 88b32376a56..e2f3f8c3165 100644 --- a/mirrord/layer/src/detour.rs +++ b/mirrord/layer/src/detour.rs @@ -195,9 +195,6 @@ pub(crate) enum Bypass { /// is not exposed by a service, so bind locally. BindWhenTargetless, - /// Hooked connect from a bound mirror socket. - MirrorConnect, - /// Hooked a `connect` to a target that is disabled in the configuration. DisabledOutgoing, diff --git a/mirrord/layer/src/socket/ops.rs b/mirrord/layer/src/socket/ops.rs index 6314d7a1e11..20d26b99b1f 100644 --- a/mirrord/layer/src/socket/ops.rs +++ b/mirrord/layer/src/socket/ops.rs @@ -23,6 +23,7 @@ use mirrord_protocol::{ dns::{GetAddrInfoRequest, LookupRecord}, file::{OpenFileResponse, OpenOptionsInternal, ReadFileResponse}, }; +use nix::sys::socket::{sockopt, SockaddrLike, SockaddrStorage}; use socket2::SockAddr; use tracing::{error, trace}; @@ -202,8 +203,9 @@ fn is_ignored_tcp_port(addr: &SocketAddr, config: &IncomingConfig) -> bool { is_ignored_port(addr) || (not_stolen_with_filter && not_whitelisted) } -/// Check if the socket is managed by us, if it's managed by us and it's not an ignored port, -/// update the socket state. +/// If the socket is not found in [`SOCKETS`], bypass. +/// Otherwise, if it's not an ignored port, bind (possibly with a fallback to random port) and +/// update socket state in [`SOCKETS`]. If it's an ignored port, remove the socket from [`SOCKETS`]. #[mirrord_layer_macro::instrument(level = "trace", ret, skip(raw_address))] pub(super) fn bind( sockfd: c_int, @@ -538,10 +540,27 @@ pub(super) fn connect( trace!("in connect {:#?}", SOCKETS); - let (_, user_socket_info) = { - SOCKETS - .remove(&sockfd) - .ok_or(Bypass::LocalFdNotFound(sockfd))? + let user_socket_info = match SOCKETS.remove(&sockfd) { + Some((_, socket)) => socket, + None => { + // Socket was probably removed from `SOCKETS` in `bind` detour (as not interesting in + // terms of `incoming` feature). + // Here we just recreate `UserSocket` using domain and type fetched from the descriptor + // we have. + let domain = nix::sys::socket::getsockname::(sockfd) + .map_err(io::Error::from)? + .family() + .map(|family| family as i32) + .unwrap_or(-1); + if domain != libc::AF_INET && domain != libc::AF_UNIX { + return Detour::Bypass(Bypass::Domain(domain)); + } + let type_ = nix::sys::socket::getsockopt(sockfd, sockopt::SockType) + .map_err(io::Error::from)? as i32; + let kind = SocketKind::try_from(type_)?; + + Arc::new(UserSocket::new(domain, type_, 0, Default::default(), kind)) + } }; if let Some(ip_address) = optional_ip_address { @@ -598,7 +617,7 @@ pub(super) fn connect( ), NetProtocol::Stream => match user_socket_info.state { - SocketState::Initialized + SocketState::Initialized | SocketState::Bound(..) if (optional_ip_address.is_some() && enabled_tcp_outgoing) || (remote_address.is_unix() && !unix_streams.is_empty()) => { @@ -610,24 +629,6 @@ pub(super) fn connect( ) } - SocketState::Bound(Bound { address, .. }) => { - trace!("connect -> SocketState::Bound {:#?}", user_socket_info); - - let address = SockAddr::from(address); - let bind_result = unsafe { FN_BIND(sockfd, address.as_ptr(), address.len()) }; - - if bind_result != 0 { - error!( - "connect -> Failed to bind socket result {:?}, address: {:?}, sockfd: {:?}!", - bind_result, address, sockfd - ); - - Err(io::Error::last_os_error())? - } else { - Detour::Bypass(Bypass::MirrorConnect) - } - } - _ => Detour::Bypass(Bypass::DisabledOutgoing), }, diff --git a/mirrord/layer/tests/apps/issue2438/Cargo.toml b/mirrord/layer/tests/apps/issue2438/Cargo.toml new file mode 100644 index 00000000000..99fe15d595f --- /dev/null +++ b/mirrord/layer/tests/apps/issue2438/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "issue2438" +version = "0.1.0" +edition = "2021" + +[dependencies] +nix.workspace = true diff --git a/mirrord/layer/tests/apps/issue2438/src/main.rs b/mirrord/layer/tests/apps/issue2438/src/main.rs new file mode 100644 index 00000000000..af0bbfac855 --- /dev/null +++ b/mirrord/layer/tests/apps/issue2438/src/main.rs @@ -0,0 +1,49 @@ +//! Attempts to connect to in-cluster peer `1.1.1.1:4567` from bound TCP sockets. Tests 2 cases: +//! 1. Socket is first bound to an address `0.0.0.0:0`, that triggers a bypass in `bind` detour +//! (socket **could** later be used for port subscription) +//! 2. Socket is first bound to an address `0.0.0.0:80`, that goes through `bind` detour completely +//! (socket **could not** later be used for port subscription) + +use std::{ + io::{Read, Write}, + net::{SocketAddr, TcpStream}, + os::fd::FromRawFd, +}; + +use nix::sys::socket::{self, AddressFamily, SockFlag, SockType, SockaddrStorage}; +fn main() { + let bind_addresses: [SocketAddr; 2] = + ["0.0.0.0:0".parse().unwrap(), "0.0.0.0:80".parse().unwrap()]; + + let peer_address = "1.1.1.1:4567".parse::().unwrap(); + + for bind_address in bind_addresses { + let sockfd = socket::socket( + AddressFamily::Inet, + SockType::Stream, + SockFlag::empty(), + None, + ) + .unwrap(); + println!("SOCKET CREATED: {sockfd}"); + + socket::bind(sockfd, &SockaddrStorage::from(bind_address)).unwrap(); + println!("SOCKET BOUND TO {bind_address}"); + + socket::connect(sockfd, &SockaddrStorage::from(peer_address)).unwrap(); + println!("SOCKET CONNECTED TO {peer_address}"); + + let mut stream = unsafe { TcpStream::from_raw_fd(sockfd) }; + assert_eq!(stream.peer_addr().unwrap(), peer_address); + println!("`TcpStream::peer_addr()` RESULT AS EXPECTED"); + + let message = b"hello there"; + let bytes_written = stream.write(message).unwrap(); + assert_eq!(bytes_written, message.len(), "partial write"); + + let mut buf = vec![]; + stream.read_to_end(&mut buf).unwrap(); + assert_eq!(buf.as_slice(), message); + println!("RECEIVED ECHO"); + } +} diff --git a/mirrord/layer/tests/common/mod.rs b/mirrord/layer/tests/common/mod.rs index 4c708ff3415..1389b2071f8 100644 --- a/mirrord/layer/tests/common/mod.rs +++ b/mirrord/layer/tests/common/mod.rs @@ -657,6 +657,7 @@ pub enum Application { Realpath, NodeIssue2283, RustIssue2204, + RustIssue2438, // For running applications with the executable and arguments determined at runtime. DynamicApp(String, Vec), } @@ -781,6 +782,13 @@ impl Application { "../../target/debug/issue1899" ) } + Application::RustIssue2438 => { + format!( + "{}/{}", + env!("CARGO_MANIFEST_DIR"), + "../../target/debug/issue2438" + ) + } Application::RustIssue2001 => { format!( "{}/{}", @@ -902,7 +910,8 @@ impl Application { | Application::OpenFile | Application::CIssue2055 | Application::CIssue2178 - | Application::RustIssue2204 => vec![], + | Application::RustIssue2204 + | Application::RustIssue2438 => vec![], Application::RustOutgoingUdp => ["--udp", RUST_OUTGOING_LOCAL, RUST_OUTGOING_PEERS] .into_iter() .map(Into::into) @@ -973,6 +982,7 @@ impl Application { | Application::CIssue2178 | Application::NodeIssue2283 | Application::RustIssue2204 + | Application::RustIssue2438 | Application::DynamicApp(..) => unimplemented!("shouldn't get here"), Application::PythonSelfConnect => 1337, Application::RustIssue2058 => 1234, diff --git a/mirrord/layer/tests/outgoing.rs b/mirrord/layer/tests/outgoing.rs index 1af552dcbd7..72e7fdfdc56 100644 --- a/mirrord/layer/tests/outgoing.rs +++ b/mirrord/layer/tests/outgoing.rs @@ -182,3 +182,62 @@ async fn outgoing_tcp_from_the_local_app_broken( ) { outgoing_tcp_logic(with_config, dylib_path, config_dir).await; } + +/// Tests that outgoing connections are properly handled on sockets that were bound by the user +/// application. +#[rstest] +#[tokio::test] +#[timeout(Duration::from_secs(10))] +async fn outgoing_tcp_bound_socket(dylib_path: &PathBuf) { + let (mut test_process, mut intproxy) = Application::RustIssue2438 + .start_process_with_layer(dylib_path, vec![], None) + .await; + + let expected_peer_address = "1.1.1.1:4567".parse::().unwrap(); + + // Test apps runs logic twice for 2 bind addresses. + for _ in 0..2 { + let msg = intproxy.recv().await; + let ClientMessage::TcpOutgoing(LayerTcpOutgoing::Connect(LayerConnect { + remote_address: SocketAddress::Ip(addr), + })) = msg + else { + panic!("Invalid message received from layer: {msg:?}"); + }; + assert_eq!(addr, expected_peer_address); + + intproxy + .send(DaemonMessage::TcpOutgoing(DaemonTcpOutgoing::Connect(Ok( + DaemonConnect { + connection_id: 0, + remote_address: addr.into(), + local_address: "1.2.3.4:6000".parse::().unwrap().into(), + }, + )))) + .await; + + let msg = intproxy.recv().await; + let ClientMessage::TcpOutgoing(LayerTcpOutgoing::Write(LayerWrite { + connection_id: 0, + bytes, + })) = msg + else { + panic!("Invalid message received from layer: {msg:?}"); + }; + + intproxy + .send(DaemonMessage::TcpOutgoing(DaemonTcpOutgoing::Read(Ok( + DaemonRead { + connection_id: 0, + bytes, + }, + )))) + .await; + + intproxy + .send(DaemonMessage::TcpOutgoing(DaemonTcpOutgoing::Close(0))) + .await; + } + + test_process.wait_assert_success().await; +}