Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send a copy of the SOCKETS to intproxy, and have it initialize a new layer with those SOCKETS. #2566

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions mirrord/intproxy/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
collections::HashMap,
fmt,
net::{IpAddr, SocketAddr},
os::fd::RawFd,
};

use bincode::{Decode, Encode};
Expand All @@ -23,10 +24,12 @@ use mirrord_protocol::{
tcp::StealType,
FileRequest, FileResponse, GetEnvVarsRequest, Port, RemoteResult,
};
use net::UserSocket;

#[cfg(feature = "codec")]
pub mod codec;
mod macros;
pub mod net;

/// An identifier for a message sent from the layer to the internal proxy.
/// The layer uses this to match proxy responses with awaiting requests.
Expand Down Expand Up @@ -58,6 +61,7 @@ pub enum LayerToProxyMessage {
Incoming(IncomingRequest),
/// Fetch environment variables from the target.
GetEnv(GetEnvVarsRequest),
NewSocket(NewSocketRequest),
}

/// Layer process information
Expand Down Expand Up @@ -91,10 +95,17 @@ pub struct LayerId(pub u64);
#[derive(Encode, Decode, Debug)]
pub enum NewSessionRequest {
/// Layer initialized from its constructor, has a fresh state.
New(ProcessInfo),
// TODO(alex) [mid]: Do we need it here?
New {
info: ProcessInfo,
sockets: HashMap<RawFd, UserSocket>,
},
/// Layer re-initialized from a [`fork`](https://man7.org/linux/man-pages/man2/fork.2.html) detour.
/// It inherits state from its parent.
Forked(LayerId),
Forked {
layer: LayerId,
sockets: HashMap<RawFd, UserSocket>,
},
}

/// Supported network protocols when intercepting outgoing connections.
Expand Down Expand Up @@ -143,6 +154,7 @@ pub enum IncomingRequest {
/// A request made by the layer when it accepts a connection on the socket that is listening
/// for mirrored connections.
ConnMetadata(ConnMetadataRequest),
NewSocket(NewSocketRequest),
}

/// A request for additional metadata for accepted connection.
Expand Down Expand Up @@ -213,7 +225,7 @@ pub struct PortUnsubscribe {
pub enum ProxyToLayerMessage {
/// A response to [`NewSessionRequest`]. Contains the identifier of the new `layer <-> proxy`
/// session.
NewSession(LayerId),
NewSession(LayerId, HashMap<RawFd, UserSocket>),
/// A response to layer's [`FileRequest`].
File(FileResponse),
/// A response to layer's [`GetAddrInfoRequest`].
Expand Down Expand Up @@ -281,6 +293,17 @@ pub trait IsLayerRequestWithResponse: IsLayerRequest {
) -> Result<Self::Response, ProxyToLayerMessage>;
}

#[derive(Encode, Decode, Debug, Clone)]
pub struct NewSocketRequest {
pub fd: RawFd,
pub user: UserSocket,
}

// impl_request!(
// req = NewSocketRequest,
// req_path = LayerToProxyMessage::NewSocket => IncomingRequest::NewSocket,
// );

impl_request!(
req = OpenFileRequest,
res = RemoteResult<OpenFileResponse>,
Expand Down
122 changes: 122 additions & 0 deletions mirrord/intproxy/protocol/src/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use std::{ffi::c_int, net::SocketAddr};

use bincode::{Decode, Encode};
use mirrord_protocol::outgoing::SocketAddress;

use crate::NetProtocol;

// TODO(alex): We could treat `sockfd` as being the same as `&self` for socket ops, we currently
// can't do that due to how `dup` interacts directly with our `Arc<UserSocket>`, because we just
// `clone` the arc, we end up with exact duplicates, but `dup` generates a new fd that we have no
// way of putting inside the duplicated `UserSocket`.
#[derive(Encode, Decode, Debug, Clone)]
#[allow(dead_code)]
pub struct UserSocket {
pub domain: c_int,
pub type_: c_int,
pub protocol: c_int,
pub state: SocketState,
pub kind: SocketKind,
}

impl UserSocket {
pub fn new(
domain: c_int,
type_: c_int,
protocol: c_int,
state: SocketState,
kind: SocketKind,
) -> Self {
Self {
domain,
type_,
protocol,
state,
kind,
}
}
}

#[derive(Encode, Decode, Debug, Clone, Copy, PartialEq, Eq)]
pub enum SocketKind {
Tcp(c_int),
Udp(c_int),
}

impl SocketKind {
pub const fn is_udp(self) -> bool {
matches!(self, Self::Udp(..))
}
}

impl From<SocketKind> for NetProtocol {
fn from(kind: SocketKind) -> Self {
match kind {
SocketKind::Tcp(..) => Self::Stream,
SocketKind::Udp(..) => Self::Datagrams,
}
}
}

#[derive(Encode, Decode, Debug, Default, Clone)]
pub enum SocketState {
#[default]
Initialized,
Bound(Bound),
Listening(Bound),
Connected(Connected),
}

/// Represents a [`SocketState`] where the user made a [`libc::bind`] call, and we intercepted it.
///
/// ## Details
///
/// Our [`ops::bind`] hook doesn't bind the address that the user passed to us, instead we call
/// [`hooks::FN_BIND`] with `localhost:0` (or `unspecified:0` for ipv6), and use
/// [`hooks::FN_GETSOCKNAME`] to retrieve this bound address which we assign to `Bound::address`.
///
/// The original user requested address is assigned to `Bound::requested_address`, and used as an
/// illusion for when the user calls [`libc::getsockname`], as if this address was the actual local
/// bound address.
#[derive(Encode, Decode, Debug, Clone, Copy)]
pub struct Bound {
/// Address originally requested by the user for `bind`.
pub requested_address: SocketAddr,

/// Actual bound address that we use to communicate between the user's listener socket and our
/// interceptor socket.
pub address: SocketAddr,
}

/// Contains the addresses of a mirrord connected socket.
///
/// - `layer_address` is only used for the outgoing feature.
#[derive(Encode, Decode, Debug, Clone)]
pub struct Connected {
/// The address requested by the user that we're "connected" to.
///
/// Whenever the user calls [`libc::getpeername`], this is the address we return to them.
///
/// For the _outgoing_ feature, we actually connect to the `layer_address` interceptor socket,
/// but use this address in the [`libc::recvfrom`] handling of [`fill_address`].
pub remote_address: SocketAddress,

/// Local address (pod-wise)
///
/// ## Example
///
/// ```sh
/// $ kubectl get pod -o wide
///
/// NAME READY STATUS IP
/// impersonated-pod 0/1 Running 1.2.3.4
/// ```
///
/// We would set this ip as `1.2.3.4:{port}` in `bind`, where `{port}` is the user requested
/// port.
pub local_address: SocketAddress,

/// The address of the interceptor socket, this is what we're really connected to in the
/// outgoing feature.
pub layer_address: Option<SocketAddress>,
}
21 changes: 16 additions & 5 deletions mirrord/intproxy/src/layer_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,32 @@ impl LayerInitializer {
let id = self.next_layer_id;
self.next_layer_id.0 += 1;

let parent_id = match msg.inner {
LayerToProxyMessage::NewSession(NewSessionRequest::New(process_info)) => {
// TODO(alex) [high]: Deal with the sockets we received from the layer!
let (parent_id, sockets) = match msg.inner {
LayerToProxyMessage::NewSession(NewSessionRequest::New {
info: process_info,
sockets,
}) => {
// TODO(alex) [high]: Looks like the socket is created in another process,
// and we end up here, not in the forked match!
info!(?process_info, "new session");
None
(None, sockets)
}
LayerToProxyMessage::NewSession(NewSessionRequest::Forked(parent)) => Some(parent),
LayerToProxyMessage::NewSession(NewSessionRequest::Forked {
layer: parent,
sockets,
}) => (Some(parent), sockets),
other => return Err(LayerInitializerError::UnexpectedMessage(other)),
};

tracing::info!("lesockets {sockets:?} parent id {parent_id:?}");

let mut encoder: AsyncEncoder<LocalMessage<ProxyToLayerMessage>, _> =
AsyncEncoder::new(decoder.into_inner());
encoder
.send(&LocalMessage {
message_id: msg.message_id,
inner: ProxyToLayerMessage::NewSession(id),
inner: ProxyToLayerMessage::NewSession(id, sockets),
})
.await?;
encoder.flush().await?;
Expand Down
62 changes: 53 additions & 9 deletions mirrord/layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,20 @@ use ctor::ctor;
use error::{LayerError, Result};
use file::OPEN_FILES;
use hooks::HookManager;
use libc::{c_int, pid_t};
use libc::{c_int, pid_t, FD_CLOEXEC};
use load::ExecuteArgs;
#[cfg(target_os = "macos")]
use mirrord_config::feature::fs::FsConfig;
use mirrord_config::{
feature::{fs::FsModeConfig, network::incoming::IncomingMode},
LayerConfig,
};
use mirrord_intproxy_protocol::NewSessionRequest;
use mirrord_intproxy_protocol::{net::UserSocket, NewSessionRequest};
use mirrord_layer_macro::{hook_fn, hook_guard_fn};
use mirrord_protocol::{EnvVars, GetEnvVarsRequest};
use proxy_connection::ProxyConnection;
use setup::LayerSetup;
use socket::SOCKETS;
use socket::{hooks::FN_FCNTL, UserSocketImpl, SOCKETS};
use tracing_subscriber::{fmt::format::FmtSpan, prelude::*};

use crate::{
Expand Down Expand Up @@ -226,14 +226,27 @@ fn load_only_layer_start(config: &LayerConfig) {
.parse()
.expect("failed to parse internal proxy address");

tracing::info!("another sockets {SOCKETS:?}");
let new_connection = ProxyConnection::new(
address,
NewSessionRequest::New(
EXECUTABLE_ARGS
NewSessionRequest::New {
info: EXECUTABLE_ARGS
.get()
.expect("EXECUTABLE_ARGS MUST BE SET")
.to_process_info(config),
),
sockets: {
SOCKETS
.iter()
.filter_map(|inner| {
// if FD_CLOEXEC & unsafe { FN_FCNTL(*inner.key(), 0) } > 0 {
// None
// } else {
Some((*inner.key(), UserSocket::clone(inner.value())))
// }
})
.collect()
},
},
PROXY_CONNECTION_TIMEOUT,
)
.expect("failed to initialize proxy connection");
Expand Down Expand Up @@ -358,9 +371,27 @@ fn layer_start(mut config: LayerConfig) {

unsafe {
let address = setup().proxy_address();

// TODO(alex) [high]: Pass the sockets here?

tracing::info!("sockets in layer init {SOCKETS:?}");
let new_connection = ProxyConnection::new(
address,
NewSessionRequest::New(process_info),
NewSessionRequest::New {
info: process_info,
sockets: {
SOCKETS
.iter()
.filter_map(|inner| {
// if FD_CLOEXEC & FN_FCNTL(*inner.key(), 0) > 0 {
// None
// } else {
Some((*inner.key(), UserSocket::clone(inner.value())))
// }
})
.collect()
},
},
PROXY_CONNECTION_TIMEOUT,
)
.expect("failed to initialize proxy connection");
Expand Down Expand Up @@ -581,7 +612,7 @@ pub(crate) unsafe extern "C" fn close_detour(fd: c_int) -> c_int {
/// on macOS, be wary what we do in this path as we might trigger <https://github.com/metalbear-co/mirrord/issues/1745>
#[hook_guard_fn]
pub(crate) unsafe extern "C" fn fork_detour() -> pid_t {
tracing::debug!("Process {} forking!.", std::process::id());
tracing::info!("Process {} forking!.", std::process::id());

let res = FN_FORK();

Expand All @@ -596,9 +627,22 @@ pub(crate) unsafe extern "C" fn fork_detour() -> pid_t {
}
};

tracing::info!("the final sockets {SOCKETS:?}");
let new_connection = ProxyConnection::new(
parent_connection.proxy_addr(),
NewSessionRequest::Forked(parent_connection.layer_id()),
NewSessionRequest::Forked {
layer: parent_connection.layer_id(),
sockets: SOCKETS
.iter()
.filter_map(|inner| {
// if FD_CLOEXEC & FN_FCNTL(*inner.key(), 0) > 0 {
// None
// } else {
Some((*inner.key(), UserSocket::clone(inner.value())))
// }
})
.collect(),
},
PROXY_CONNECTION_TIMEOUT,
)
.expect("failed to establish proxy connection for child");
Expand Down
Loading
Loading