Skip to content

Commit

Permalink
Traffic steal (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
aviramha authored Aug 31, 2022
1 parent 10fdf1c commit ce9850e
Show file tree
Hide file tree
Showing 27 changed files with 844 additions and 2,760 deletions.
39 changes: 35 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ k8s-openapi = { version = "0.15", features = ["v1_24"] }
reqwest = { version = "0.11", features = ["blocking"] }
kube = { version = "0.73", default-features = false, features = ["runtime", "derive", "client", "ws", "rustls-tls"] }
dns-lookup = "1"
tokio-util = "0.7"
rand = "0.8"
streammap-ext = "0.1"


# latest commits on rustls suppress certificate verification
# https://github.com/rustls/rustls/pull/1032
Expand Down
7 changes: 5 additions & 2 deletions mirrord-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ thiserror.workspace = true
dns-lookup.workspace = true
num-traits = "0.2"
bollard = "0.13"
tokio-util = "0.7"
tokio-util.workspace = true
rand.workspace = true
streammap-ext.workspace = true
iptables = "0.5"
libc = "0.2"
faccess = "0.2"
streammap-ext = "0.1"

[dev-dependencies]
test_bin = "0.4"
4 changes: 3 additions & 1 deletion mirrord-agent/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ COPY .cargo /app/.cargo
RUN cargo +nightly build -Z bindeps --manifest-path /app/mirrord-agent/Cargo.toml --release

FROM debian:stable
RUN apt update && apt install -y libpcap-dev
COPY --from=build-env /app/target/release/mirrord-agent /
RUN apt update && apt install -y libpcap-dev iptables
RUN update-alternatives --set iptables /usr/sbin/iptables-legacy \
&& update-alternatives --set ip6tables /usr/sbin/ip6tables-legacy

CMD ["./mirrord-agent"]
15 changes: 13 additions & 2 deletions mirrord-agent/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use mirrord_protocol::{
tcp::{
outgoing::{DaemonTcpOutgoing, LayerConnect, LayerTcpOutgoing},
DaemonTcp,
DaemonTcp, LayerTcpSteal,
},
FileRequest, FileResponse,
FileRequest, FileResponse, Port,
};
use thiserror::Error;

Expand Down Expand Up @@ -68,6 +68,17 @@ pub enum AgentError {
#[error("Bollard failed with `{0}`")]
Bollard(#[from] bollard::errors::Error),

#[error("Connection received from unexepcted port `{0}`")]
UnexpectedConnection(Port),

#[error("LayerTcpSteal sender failed with `{0}`")]
SendLayerTcpSteal(#[from] tokio::sync::mpsc::error::SendError<LayerTcpSteal>),

#[error("IPTables failed with `{0}`")]
IPTablesError(String),

#[error("Join task failed")]
JoinTask,
}

pub type Result<T> = std::result::Result<T, AgentError>;
41 changes: 33 additions & 8 deletions mirrord-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,34 @@ use futures::{
SinkExt,
};
use mirrord_protocol::{
tcp::LayerTcp, AddrInfoHint, AddrInfoInternal, ClientMessage, DaemonCodec, DaemonMessage,
GetAddrInfoRequest, GetEnvVarsRequest, RemoteResult, ResponseError,
tcp::{DaemonTcp, LayerTcp, LayerTcpSteal},
AddrInfoHint, AddrInfoInternal, ClientMessage, DaemonCodec, DaemonMessage, GetAddrInfoRequest,
GetEnvVarsRequest, RemoteResult, ResponseError,
};
use sniffer::{SnifferCommand, TCPConnectionSniffer, TCPSnifferAPI};
use tcp::outgoing::TcpOutgoingApi;
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
select,
sync::mpsc::{self, Sender},
sync::mpsc::{self, Receiver, Sender},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace};
use tracing_subscriber::prelude::*;
use util::{ClientID, IndexAllocator};

use crate::{runtime::get_container_pid, util::run_thread};
use crate::{
runtime::get_container_pid,
steal::steal_worker,
util::{run_thread, ClientID, IndexAllocator},
};

mod cli;
mod error;
mod file;
mod runtime;
mod sniffer;
mod steal;
mod tcp;
mod util;

Expand Down Expand Up @@ -178,6 +183,8 @@ struct ClientConnectionHandler {
stream: Framed<TcpStream, DaemonCodec>,
pid: Option<u64>,
tcp_sniffer_api: TCPSnifferAPI,
tcp_stealer_sender: Sender<LayerTcpSteal>,
tcp_stealer_receiver: Receiver<DaemonTcp>,
tcp_outgoing_api: TcpOutgoingApi,
}

Expand All @@ -201,6 +208,14 @@ impl ClientConnectionHandler {
let (tcp_sender, tcp_receiver) = mpsc::channel(CHANNEL_SIZE);
let tcp_sniffer_api =
TCPSnifferAPI::new(id, sniffer_command_sender, tcp_receiver, tcp_sender).await?;
let (tcp_steal_layer_sender, tcp_steal_layer_receiver) = mpsc::channel(CHANNEL_SIZE);
let (tcp_steal_daemon_sender, tcp_steal_daemon_receiver) = mpsc::channel(CHANNEL_SIZE);

let _ = run_thread(steal_worker(
tcp_steal_layer_receiver,
tcp_steal_daemon_sender,
pid,
));

let tcp_outgoing_api = TcpOutgoingApi::new(pid);

Expand All @@ -210,11 +225,12 @@ impl ClientConnectionHandler {
stream,
pid,
tcp_sniffer_api,
tcp_stealer_receiver: tcp_steal_daemon_receiver,
tcp_stealer_sender: tcp_steal_layer_sender,
tcp_outgoing_api,
};

client_handler.handle_loop(cancel_token).await?;

Ok(())
}

Expand Down Expand Up @@ -243,10 +259,18 @@ impl ClientConnectionHandler {
error!("tcp sniffer stopped?");
break;
}
}
},
message = self.tcp_stealer_receiver.recv() => {
if let Some(message) = message {
self.stream.send(DaemonMessage::TcpSteal(message)).await?;
} else {
error!("tcp stealer stopped?");
break;
}
},
message = self.tcp_outgoing_api.daemon_message() => {
self.respond(DaemonMessage::TcpOutgoing(message?)).await?;
}
},
_ = token.cancelled() => {
break;
}
Expand Down Expand Up @@ -298,6 +322,7 @@ impl ClientConnectionHandler {
}
ClientMessage::Ping => self.respond(DaemonMessage::Pong).await?,
ClientMessage::Tcp(message) => self.handle_client_tcp(message).await?,
ClientMessage::TcpSteal(message) => self.tcp_stealer_sender.send(message).await?,
ClientMessage::Close => {
return Ok(false);
}
Expand Down
Loading

0 comments on commit ce9850e

Please sign in to comment.