Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send a copy of the SOCKETS to intproxy, and have it initialize a new layer with those SOCKETS. #2566

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion mirrord/intproxy/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use std::{
collections::HashMap,
fmt,
net::{IpAddr, SocketAddr},
num::ParseIntError,
os::fd::RawFd,
str::FromStr,
};

use bincode::{Decode, Encode};
Expand All @@ -23,10 +26,12 @@ use mirrord_protocol::{
tcp::StealType,
FileRequest, FileResponse, GetEnvVarsRequest, Port, RemoteResult,
};
use net::UserSocket;

#[cfg(feature = "codec")]
pub mod codec;
mod macros;
pub mod net;

/// An identifier for a message sent from the layer to the internal proxy.
/// The layer uses this to match proxy responses with awaiting requests.
Expand Down Expand Up @@ -78,6 +83,20 @@ pub struct ProcessInfo {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Encode, Decode)]
pub struct LayerId(pub u64);

impl core::fmt::Display for LayerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

/// Parses a [`LayerId`] from the decimal text of its inner `u64`.
impl FromStr for LayerId {
    type Err = ParseIntError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        s.parse::<u64>().map(Self)
    }
}

/// A layer's request to start a new session with the internal proxy.
/// Contains info about layer's state.
/// This should be the first message sent by the layer after opening a new connection to the
Expand All @@ -91,10 +110,25 @@ pub struct LayerId(pub u64);
#[derive(Encode, Decode, Debug)]
pub enum NewSessionRequest {
/// Layer initialized from its constructor, has a fresh state.
// TODO(alex) [mid]: Do we need it here?
New(ProcessInfo),
/// Layer re-initialized from a [`fork`](https://man7.org/linux/man-pages/man2/fork.2.html) detour.
/// It inherits state from its parent.
Forked(LayerId),
/// Sent from the layer's `execve` detour.
/// Carries the id of the sending layer and the sockets it was tracking, see [`ExecveRequest`].
Execve(ExecveRequest),
}

/// Payload of [`NewSessionRequest::Execve`].
///
/// Identifies the layer that issues the request and carries the sockets that layer knows about.
#[derive(Encode, Decode, Debug)]
pub struct ExecveRequest {
/// Id of the layer that sends this request.
pub parent: LayerId,
/// Sockets known to the sending layer, keyed by their file descriptor.
pub shared_sockets: HashMap<RawFd, UserSocket>,
}

/// A parent [`LayerId`] together with a map of sockets keyed by file descriptor.
// NOTE(review): this is a field-for-field duplicate of `ExecveRequest`, and the previous summary
// ("Layer process information") did not describe these fields. Presumably one of the two types
// can be dropped — confirm.
#[derive(Encode, Decode, Debug)]
pub struct Execve {
/// Id of the parent layer.
pub parent: LayerId,
/// Sockets associated with the parent layer, keyed by their file descriptor.
pub shared_sockets: HashMap<RawFd, UserSocket>,
}

/// Supported network protocols when intercepting outgoing connections.
Expand Down Expand Up @@ -213,7 +247,7 @@ pub struct PortUnsubscribe {
pub enum ProxyToLayerMessage {
/// A response to [`NewSessionRequest`]. Contains the identifier of the new `layer <-> proxy`
/// session.
NewSession(LayerId),
NewSession(LayerId, HashMap<RawFd, UserSocket>),
/// A response to layer's [`FileRequest`].
File(FileResponse),
/// A response to layer's [`GetAddrInfoRequest`].
Expand Down Expand Up @@ -281,6 +315,12 @@ pub trait IsLayerRequestWithResponse: IsLayerRequest {
) -> Result<Self::Response, ProxyToLayerMessage>;
}

// TODO(alex) [high]: What am I doing wrong here?
// NOTE(review): unlike the `OpenFileRequest` invocation that follows, no `res` is given here.
// Presumably this registers `ExecveRequest` as a request that expects no response and is wrapped
// as `LayerToProxyMessage::NewSession(NewSessionRequest::Execve(..))` — confirm against the
// `impl_request!` definition.
impl_request!(
req = ExecveRequest,
req_path = LayerToProxyMessage::NewSession => NewSessionRequest::Execve,
);

impl_request!(
req = OpenFileRequest,
res = RemoteResult<OpenFileResponse>,
Expand Down
122 changes: 122 additions & 0 deletions mirrord/intproxy/protocol/src/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use std::{ffi::c_int, net::SocketAddr};

use bincode::{Decode, Encode};
use mirrord_protocol::outgoing::SocketAddress;

use crate::NetProtocol;

// TODO(alex): We could treat `sockfd` as being the same as `&self` for socket ops. We currently
// can't do that due to how `dup` interacts directly with our `Arc<UserSocket>`: because we just
// `clone` the arc, we end up with exact duplicates, but `dup` generates a new fd that we have no
// way of putting inside the duplicated `UserSocket`.
/// Serializable description of a socket: its creation parameters, current [`SocketState`] and
/// [`SocketKind`].
///
/// NOTE(review): `domain`, `type_` and `protocol` presumably mirror the arguments of `socket(2)`
/// — confirm.
#[derive(Encode, Decode, Debug, Clone)]
// NOTE(review): the reason for allowing dead code is not visible here; presumably some fields are
// not read yet — confirm and remove the attribute once they are.
#[allow(dead_code)]
pub struct UserSocket {
/// Socket domain, stored as a raw `c_int`.
pub domain: c_int,
/// Socket type, stored as a raw `c_int`.
pub type_: c_int,
/// Socket protocol, stored as a raw `c_int`.
pub protocol: c_int,
/// Current state of the socket.
pub state: SocketState,
/// Whether this is a TCP or a UDP socket.
pub kind: SocketKind,
}

impl UserSocket {
pub fn new(
domain: c_int,
type_: c_int,
protocol: c_int,
state: SocketState,
kind: SocketKind,
) -> Self {
Self {
domain,
type_,
protocol,
state,
kind,
}
}
}

/// Transport flavour of a [`UserSocket`].
///
/// NOTE(review): the meaning of the wrapped `c_int` is not visible here; presumably it is the raw
/// socket type requested by the user — confirm.
#[derive(Encode, Decode, Debug, Clone, Copy, PartialEq, Eq)]
pub enum SocketKind {
/// TCP socket; converts to [`NetProtocol::Stream`].
Tcp(c_int),
/// UDP socket; converts to [`NetProtocol::Datagrams`].
Udp(c_int),
}

impl SocketKind {
    /// Returns `true` only for the [`SocketKind::Udp`] variant.
    pub const fn is_udp(self) -> bool {
        match self {
            Self::Udp(_) => true,
            Self::Tcp(_) => false,
        }
    }
}

impl From<SocketKind> for NetProtocol {
fn from(kind: SocketKind) -> Self {
match kind {
SocketKind::Tcp(..) => Self::Stream,
SocketKind::Udp(..) => Self::Datagrams,
}
}
}

/// State of a [`UserSocket`].
#[derive(Encode, Decode, Debug, Default, Clone)]
pub enum SocketState {
/// The state produced by [`Default`]; carries no address information.
#[default]
Initialized,
/// Holds the [`Bound`] addresses of the socket.
Bound(Bound),
/// Also holds [`Bound`] addresses; presumably entered once the user calls `listen` — confirm.
Listening(Bound),
/// Holds the [`Connected`] addresses of the socket.
Connected(Connected),
}

/// Represents a [`SocketState`] where the user made a `libc::bind` call, and we intercepted it.
///
/// ## Details
///
/// The layer's `bind` hook (`ops::bind`) doesn't bind the address that the user passed to us,
/// instead it calls `hooks::FN_BIND` with `localhost:0` (or `unspecified:0` for ipv6), and uses
/// `hooks::FN_GETSOCKNAME` to retrieve this bound address which is assigned to `Bound::address`.
///
/// The original user requested address is assigned to `Bound::requested_address`, and used as an
/// illusion for when the user calls `libc::getsockname`, as if this address was the actual local
/// bound address.
// NOTE(review): `ops`, `hooks` and `libc` are not imported in this module, so they are written as
// plain code spans rather than intra-doc links. Presumably they live in the layer crate — confirm.
#[derive(Encode, Decode, Debug, Clone, Copy)]
pub struct Bound {
/// Address originally requested by the user for `bind`.
pub requested_address: SocketAddr,

/// Actual bound address that we use to communicate between the user's listener socket and our
/// interceptor socket.
pub address: SocketAddr,
}

/// Contains the addresses of a mirrord connected socket.
///
/// - `layer_address` is only used for the outgoing feature.
// NOTE(review): `libc` and `fill_address` are not imported in this module, so they are written as
// plain code spans rather than intra-doc links. Presumably they live in the layer crate — confirm.
#[derive(Encode, Decode, Debug, Clone)]
pub struct Connected {
/// The address requested by the user that we're "connected" to.
///
/// Whenever the user calls `libc::getpeername`, this is the address we return to them.
///
/// For the _outgoing_ feature, we actually connect to the `layer_address` interceptor socket,
/// but use this address in the `libc::recvfrom` handling of `fill_address`.
pub remote_address: SocketAddress,

/// Local address (pod-wise)
///
/// ## Example
///
/// ```sh
/// $ kubectl get pod -o wide
///
/// NAME READY STATUS IP
/// impersonated-pod 0/1 Running 1.2.3.4
/// ```
///
/// We would set this ip as `1.2.3.4:{port}` in `bind`, where `{port}` is the user requested
/// port.
pub local_address: SocketAddress,

/// The address of the interceptor socket, this is what we're really connected to in the
/// outgoing feature. `None` when there is no interceptor socket.
pub layer_address: Option<SocketAddress>,
}
24 changes: 19 additions & 5 deletions mirrord/intproxy/src/layer_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::io;

use mirrord_intproxy_protocol::{
codec::{AsyncDecoder, AsyncEncoder, CodecError},
LayerId, LayerToProxyMessage, LocalMessage, NewSessionRequest, ProxyToLayerMessage,
ExecveRequest, LayerId, LayerToProxyMessage, LocalMessage, NewSessionRequest,
ProxyToLayerMessage,
};
use thiserror::Error;
use tokio::net::{TcpListener, TcpStream};
Expand Down Expand Up @@ -58,21 +59,34 @@ impl LayerInitializer {
let id = self.next_layer_id;
self.next_layer_id.0 += 1;

let parent_id = match msg.inner {
// TODO(alex) [high]: Deal with the sockets we received from the layer!
tracing::info!("new stream message: {:?}", msg.inner);
let (parent_id, sockets) = match msg.inner {
LayerToProxyMessage::NewSession(NewSessionRequest::New(process_info)) => {
// TODO(alex) [high]: Looks like the socket is created in another process,
// and we end up here, not in the forked match!
info!(?process_info, "new session");
None
(None, Default::default())
}
LayerToProxyMessage::NewSession(NewSessionRequest::Forked(parent)) => Some(parent),
LayerToProxyMessage::NewSession(NewSessionRequest::Forked(parent)) => {
(Some(parent), Default::default())
}
LayerToProxyMessage::NewSession(NewSessionRequest::Execve(ExecveRequest {
parent,
shared_sockets,
})) => (Some(parent), shared_sockets),

other => return Err(LayerInitializerError::UnexpectedMessage(other)),
};

tracing::info!("lesockets {sockets:?} parent id {parent_id:?}");

let mut encoder: AsyncEncoder<LocalMessage<ProxyToLayerMessage>, _> =
AsyncEncoder::new(decoder.into_inner());
encoder
.send(&LocalMessage {
message_id: msg.message_id,
inner: ProxyToLayerMessage::NewSession(id),
inner: ProxyToLayerMessage::NewSession(id, sockets),
})
.await?;
encoder.flush().await?;
Expand Down
1 change: 1 addition & 0 deletions mirrord/layer/src/exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/// Detours for the `exec*` family of functions.
pub(crate) mod hooks;
127 changes: 127 additions & 0 deletions mirrord/layer/src/exec/hooks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use libc::{c_char, c_int, FD_CLOEXEC};
use mirrord_intproxy_protocol::{net::UserSocket, Execve, ExecveRequest, NewSessionRequest};
use mirrord_layer_macro::hook_guard_fn;
use tracing::Level;

use crate::{
common, detour::Detour, hooks::HookManager, proxy_connection::ProxyConnection, replace,
socket::hooks::FN_FCNTL, PROXY_CONNECTION, PROXY_CONNECTION_TIMEOUT, SOCKETS,
};

/// Detour for `execl`: logs the call and hands the arguments to the original function.
///
/// Not installed at the moment: its `replace!` line in `enable_exec_hooks` is commented out.
// NOTE(review): `args` is passed on as one value; presumably this does not re-expand the
// caller's variadic arguments for `FN_EXECL` — confirm before enabling this hook.
#[hook_guard_fn]
unsafe extern "C" fn execl_detour(
path: *const c_char,
arg0: *const c_char,
mut args: ...
) -> c_int {
tracing::info!("execl");

FN_EXECL(path, arg0, args)
}

/// Detour for `execlp`: logs the call and hands the arguments to the original function.
///
/// Not installed at the moment: its `replace!` line in `enable_exec_hooks` is commented out.
// NOTE(review): `args` is passed on as one value; presumably this does not re-expand the
// caller's variadic arguments for `FN_EXECLP` — confirm before enabling this hook.
#[hook_guard_fn]
unsafe extern "C" fn execlp_detour(
file: *const c_char,
arg0: *const c_char,
mut args: ...
) -> c_int {
tracing::info!("execlp");

FN_EXECLP(file, arg0, args)
}

/// Detour for `execle`: logs the call and hands the arguments to the original function.
///
/// Not installed at the moment: its `replace!` line in `enable_exec_hooks` is commented out.
// NOTE(review): `args` is passed on as one value; presumably this does not re-expand the
// caller's variadic arguments for `FN_EXECLE` — confirm before enabling this hook.
#[hook_guard_fn]
unsafe extern "C" fn execle_detour(
path: *const c_char,
arg0: *const c_char,
mut args: ...
) -> c_int {
tracing::info!("execle");

FN_EXECLE(path, arg0, args)
}

/// Detour for `execv`: traces the call (arguments and return value, at `INFO` level) and forwards
/// it unchanged to the original function.
///
/// Not installed at the moment: its `replace!` line in `enable_exec_hooks` is commented out.
#[hook_guard_fn]
#[tracing::instrument(level = Level::INFO, ret)]
unsafe extern "C" fn execv_detour(prog: *const c_char, argv: *const *const c_char) -> c_int {
FN_EXECV(prog, argv)
}

/// Detour for `execvp`: traces the call (arguments and return value, at `INFO` level) and
/// forwards it unchanged to the original function.
///
/// Not installed at the moment: its `replace!` line in `enable_exec_hooks` is commented out.
#[hook_guard_fn]
#[tracing::instrument(level = Level::INFO, ret)]
unsafe extern "C" fn execvp_detour(c: *const c_char, argv: *const *const c_char) -> c_int {
FN_EXECVP(c, argv)
}

/// Detour for `execvpe`: traces the call (arguments and return value, at `INFO` level) and
/// forwards it unchanged to the original function.
///
/// Not installed at the moment: its `replace!` line in `enable_exec_hooks` is commented out.
#[hook_guard_fn]
#[tracing::instrument(level = Level::INFO, ret)]
unsafe extern "C" fn execvpe_detour(
file: *const c_char,
argv: *const *const c_char,
envp: *const *const c_char,
) -> c_int {
FN_EXECVPE(file, argv, envp)
}

/// Detour for `execve`, the only exec hook currently installed by `enable_exec_hooks`.
///
/// Runs [`execve`] first (which publishes the parent layer id and sends the tracked sockets to
/// the intproxy), then forwards `path`, `argv` and `envp` unchanged to the original function.
#[hook_guard_fn]
#[tracing::instrument(level = Level::INFO, ret)]
pub(crate) unsafe extern "C" fn execve_detour(
path: *const c_char,
argv: *const *const c_char,
envp: *const *const c_char,
) -> c_int {
// The returned `Detour` is intentionally not inspected: the original `execve` is always called.
execve();
FN_EXECVE(path, argv, envp)
}

// TODO(alex) [high]: Set env var and save sockets.
#[mirrord_layer_macro::instrument(level = Level::INFO, ret)]
pub(super) fn execve() -> Detour<()> {
let shared_sockets = SOCKETS
.iter()
.filter_map(|inner| {
// if FD_CLOEXEC & unsafe { FN_FCNTL(*inner.key(), 0) } > 0 {
// None
// } else {
Some((*inner.key(), UserSocket::clone(inner.value())))
// }
})
.collect();
tracing::info!("shared sockets {shared_sockets:?}");

match unsafe { PROXY_CONNECTION.get() } {
Some(conn) => {
let id = conn.layer_id();
std::env::set_var("MIRRORD_PARENT_LAYER_ID", id.to_string());

common::make_proxy_request_no_response(ExecveRequest {
parent: id,
shared_sockets,
})
.unwrap();
}
None => {
tracing::info!("Skipping new inptroxy connection (trace only)");
return Detour::Success(());
}
};

Detour::Success(())
}

/// Installs the exec detours through `hook_manager`.
///
/// Only `execve` is replaced for now; the remaining `exec*` replacements are kept below,
/// commented out.
pub(crate) unsafe fn enable_exec_hooks(hook_manager: &mut HookManager) {
// Disabled hooks (work in progress):
// replace!(hook_manager, "execl", execl_detour, FnExecl, FN_EXECL);
// replace!(hook_manager, "execlp", execlp_detour, FnExeclp, FN_EXECLP);
// replace!(hook_manager, "execle", execle_detour, FnExecle, FN_EXECLE);
// replace!(hook_manager, "execv", execv_detour, FnExecv, FN_EXECV);
// replace!(hook_manager, "execvp", execvp_detour, FnExecvp, FN_EXECVP);
// replace!(
// hook_manager,
// "execvpe",
// execvpe_detour,
// FnExecvpe,
// FN_EXECVPE
// );
replace!(hook_manager, "execve", execve_detour, FnExecve, FN_EXECVE);
// NOTE(review): no `exec_detour` is defined in this file, so the line below cannot be enabled
// as-is.
// replace!(hook_manager, "exec", exec_detour, FnExec, FN_EXEC);
}
Loading
Loading