Skip to content

Commit

Permalink
Don't filter outgoing traffic to target (#2588)
Browse files Browse the repository at this point in the history
* Rework

* Use envy

* Fix

* Update

* Fix tests
  • Loading branch information
DmitryDodzin authored Jul 17, 2024
1 parent 652f092 commit 1323a14
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 32 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2572.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update loopback detection to include pod ip's
1 change: 1 addition & 0 deletions mirrord/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ drain.workspace = true
tokio-rustls = "0.26"
x509-parser = "0.16"
rustls.workspace = true
envy = "0.4"

[target.'cfg(target_os = "linux")'.dependencies]
iptables = { git = "https://github.com/metalbear-co/rust-iptables.git", rev = "e66c7332e361df3c61a194f08eefe3f40763d624" }
Expand Down
18 changes: 13 additions & 5 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mirrord_protocol::{
RemoteError::{BadHttpFilterExRegex, BadHttpFilterRegex},
RequestId,
};
use serde::Deserialize;
use tokio::{
net::TcpStream,
sync::mpsc::{Receiver, Sender},
Expand Down Expand Up @@ -260,6 +261,12 @@ impl Client {
}
}

#[derive(Deserialize, Debug, Default)]
struct TcpStealerConfig {
stealer_flush_connections: bool,
pod_ips: Option<String>,
}

/// Created once per agent during initialization.
///
/// Meant to be run (see [`TcpConnectionStealer::start`]) in a separate thread while the agent
Expand Down Expand Up @@ -287,12 +294,13 @@ impl TcpConnectionStealer {
/// You need to call [`TcpConnectionStealer::start`] to do so.
#[tracing::instrument(level = "trace")]
pub(crate) async fn new(command_rx: Receiver<StealerCommand>) -> Result<Self, AgentError> {
let config = envy::prefixed("MIRRORD_AGENT_")
.from_env::<TcpStealerConfig>()
.unwrap_or_default();

let port_subscriptions = {
let flush_connections = std::env::var("MIRRORD_AGENT_STEALER_FLUSH_CONNECTIONS")
.ok()
.and_then(|var| var.parse::<bool>().ok())
.unwrap_or_default();
let redirector = IpTablesRedirector::new(flush_connections).await?;
let redirector =
IpTablesRedirector::new(config.stealer_flush_connections, config.pod_ips).await?;

PortSubscriptions::new(redirector, 4)
};
Expand Down
14 changes: 9 additions & 5 deletions mirrord/agent/src/steal/ip_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,17 @@ impl<IPT> SafeIpTables<IPT>
where
IPT: IPTables + Send + Sync,
{
pub(super) async fn create(ipt: IPT, flush_connections: bool) -> Result<Self> {
pub(super) async fn create(
ipt: IPT,
flush_connections: bool,
pod_ips: Option<&str>,
) -> Result<Self> {
let ipt = Arc::new(ipt);

let mut redirect = if let Some(vendor) = MeshVendor::detect(ipt.as_ref())? {
Redirects::Mesh(MeshRedirect::create(ipt.clone(), vendor)?)
Redirects::Mesh(MeshRedirect::create(ipt.clone(), vendor, pod_ips)?)
} else {
match StandardRedirect::create(ipt.clone()) {
match StandardRedirect::create(ipt.clone(), pod_ips) {
Err(err) => {
warn!("Unable to create StandardRedirect chain: {err}");

Expand Down Expand Up @@ -416,7 +420,7 @@ mod tests {
.times(1)
.returning(|_| Ok(()));

let ipt = SafeIpTables::create(mock, false)
let ipt = SafeIpTables::create(mock, false, None)
.await
.expect("Create Failed");

Expand Down Expand Up @@ -549,7 +553,7 @@ mod tests {
.times(1)
.returning(|_| Ok(()));

let ipt = SafeIpTables::create(mock, false)
let ipt = SafeIpTables::create(mock, false, None)
.await
.expect("Create Failed");

Expand Down
10 changes: 5 additions & 5 deletions mirrord/agent/src/steal/ip_tables/mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ impl<IPT> MeshRedirect<IPT>
where
IPT: IPTables,
{
pub fn create(ipt: Arc<IPT>, vendor: MeshVendor) -> Result<Self> {
pub fn create(ipt: Arc<IPT>, vendor: MeshVendor, pod_ips: Option<&str>) -> Result<Self> {
let prerouteing = PreroutingRedirect::create(ipt.clone())?;

for port in Self::get_skip_ports(&ipt, &vendor)? {
prerouteing.add_rule(&format!("-m multiport -p tcp ! --dports {port} -j RETURN"))?;
}

let output = OutputRedirect::create(ipt, IPTABLE_MESH.to_string())?;
let output = OutputRedirect::create(ipt, IPTABLE_MESH.to_string(), pod_ips)?;

Ok(MeshRedirect {
prerouteing,
Expand Down Expand Up @@ -220,7 +220,7 @@ mod tests {
mock.expect_insert_rule()
.with(
eq(IPTABLE_MESH.as_str()),
eq(format!("-m owner --gid-owner {gid} -p tcp -j RETURN")),
eq(format!("-m owner --gid-owner {gid} -p tcp -j RETURN")),
eq(1),
)
.times(1)
Expand All @@ -245,8 +245,8 @@ mod tests {
.times(1)
.returning(|_| Ok(()));

let prerouting =
MeshRedirect::create(Arc::new(mock), MeshVendor::Linkerd).expect("Unable to create");
let prerouting = MeshRedirect::create(Arc::new(mock), MeshVendor::Linkerd, None)
.expect("Unable to create");

assert!(prerouting.add_redirect(69, 420).await.is_ok());
}
Expand Down
12 changes: 9 additions & 3 deletions mirrord/agent/src/steal/ip_tables/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ where
{
const ENTRYPOINT: &'static str = "OUTPUT";

pub fn create(ipt: Arc<IPT>, chain_name: String) -> Result<Self> {
pub fn create(ipt: Arc<IPT>, chain_name: String, pod_ips: Option<&str>) -> Result<Self> {
let managed = IPTableChain::create(ipt, chain_name)?;

let exclude_source_ips = pod_ips
.map(|pod_ips| format!("! -s {pod_ips}"))
.unwrap_or_default();

let gid = getgid();
managed
.add_rule(&format!("-m owner --gid-owner {gid} -p tcp -j RETURN"))
.add_rule(&format!(
"-m owner --gid-owner {gid} -p tcp {exclude_source_ips} -j RETURN"
))
.inspect_err(|_| {
warn!("Unable to create iptable rule with \"--gid-owner {gid}\" filter")
})?;
Expand All @@ -34,7 +40,7 @@ where
}

pub fn load(ipt: Arc<IPT>, chain_name: String) -> Result<Self> {
let managed = IPTableChain::create(ipt, chain_name)?;
let managed = IPTableChain::load(ipt, chain_name)?;

Ok(OutputRedirect { managed })
}
Expand Down
4 changes: 2 additions & 2 deletions mirrord/agent/src/steal/ip_tables/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ impl<IPT> StandardRedirect<IPT>
where
IPT: IPTables,
{
pub fn create(ipt: Arc<IPT>) -> Result<Self> {
pub fn create(ipt: Arc<IPT>, pod_ips: Option<&str>) -> Result<Self> {
let prerouteing = PreroutingRedirect::create(ipt.clone())?;
let output = OutputRedirect::create(ipt, IPTABLE_STANDARD.to_string())?;
let output = OutputRedirect::create(ipt, IPTABLE_STANDARD.to_string(), pod_ips)?;

Ok(StandardRedirect {
prerouteing,
Expand Down
15 changes: 13 additions & 2 deletions mirrord/agent/src/steal/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub(crate) struct IpTablesRedirector {
redirect_to: Port,
/// Listener to which redirect all connections.
listener: TcpListener,

pod_ips: Option<String>,
}

impl IpTablesRedirector {
Expand All @@ -73,7 +75,10 @@ impl IpTablesRedirector {
///
/// * `flush_connections` - whether exisitng connections should be flushed when adding new
/// redirects
pub(crate) async fn new(flush_connections: bool) -> Result<Self, AgentError> {
pub(crate) async fn new(
flush_connections: bool,
pod_ips: Option<String>,
) -> Result<Self, AgentError> {
let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
let redirect_to = listener.local_addr()?.port();

Expand All @@ -82,6 +87,7 @@ impl IpTablesRedirector {
flush_connections,
redirect_to,
listener,
pod_ips,
})
}
}
Expand All @@ -95,7 +101,12 @@ impl PortRedirector for IpTablesRedirector {
Some(iptables) => iptables,
None => {
let iptables = new_iptables();
let safe = SafeIpTables::create(iptables.into(), self.flush_connections).await?;
let safe = SafeIpTables::create(
iptables.into(),
self.flush_connections,
self.pod_ips.as_deref(),
)
.await?;
self.iptables.insert(safe)
}
};
Expand Down
12 changes: 4 additions & 8 deletions mirrord/kube/src/api/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ pub struct ContainerParams {
/// Value for [`AGENT_OPERATOR_CERT_ENV`](mirrord_protocol::AGENT_OPERATOR_CERT_ENV) set in
/// the agent container.
pub tls_cert: Option<String>,
pub pod_ips: Option<String>,
}

impl ContainerParams {
pub fn new() -> ContainerParams {
pub fn new(tls_cert: Option<String>, pod_ips: Option<String>) -> ContainerParams {
let port: u16 = rand::thread_rng().gen_range(30000..=65535);
let gid: u16 = rand::thread_rng().gen_range(3000..u16::MAX);

Expand All @@ -57,17 +58,12 @@ impl ContainerParams {
name,
gid,
port,
tls_cert: None,
tls_cert,
pod_ips,
}
}
}

impl Default for ContainerParams {
fn default() -> Self {
Self::new()
}
}

pub trait ContainerVariant {
type Update;

Expand Down
3 changes: 3 additions & 0 deletions mirrord/kube/src/api/container/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ mod test {
port: 3000,
gid: 13,
tls_cert: None,
pod_ips: None,
};

let update = JobVariant::new(&agent, &params).as_update();
Expand Down Expand Up @@ -327,6 +328,7 @@ mod test {
port: 3000,
gid: 13,
tls_cert: None,
pod_ips: None,
};

let update = JobTargetedVariant::new(
Expand All @@ -335,6 +337,7 @@ mod test {
&RuntimeData {
mesh: None,
pod_name: "pod".to_string(),
pod_ips: vec![],
pod_namespace: None,
node_name: "foobaz".to_string(),
container_id: "container".to_string(),
Expand Down
4 changes: 4 additions & 0 deletions mirrord/kube/src/api/container/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub(super) fn agent_env(agent: &AgentConfig, params: &&ContainerParams) -> Vec<E
env.push(("MIRRORD_AGENT_DNS_TIMEOUT".to_string(), timeout.to_string()));
};

if let Some(pod_ips) = params.pod_ips.clone() {
env.push(("MIRRORD_AGENT_POD_IPS".to_string(), pod_ips));
}

env.into_iter()
.chain(
params
Expand Down
8 changes: 6 additions & 2 deletions mirrord/kube/src/api/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,12 @@ impl KubernetesAPI {
.into(),
};

let mut params = ContainerParams::new();
params.tls_cert = tls_cert;
let pod_ips = runtime_data
.as_ref()
.filter(|runtime_data| !runtime_data.pod_ips.is_empty())
.map(|runtime_data| runtime_data.pod_ips.join(","));

let params = ContainerParams::new(tls_cert, pod_ips);

Ok((params, runtime_data))
}
Expand Down
12 changes: 12 additions & 0 deletions mirrord/kube/src/api/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl Display for ContainerRuntime {
#[derive(Debug)]
pub struct RuntimeData {
pub pod_name: String,
pub pod_ips: Vec<String>,
pub pod_namespace: Option<String>,
pub node_name: String,
pub container_id: String,
Expand Down Expand Up @@ -109,6 +110,16 @@ impl RuntimeData {
.ok_or_else(|| KubeApiError::missing_field(pod, ".spec.nodeName"))?
.to_owned();

let pod_ips = pod
.status
.as_ref()
.and_then(|spec| spec.pod_ips.as_ref())
.ok_or_else(|| KubeApiError::missing_field(pod, ".status.podIPs"))?
.iter()
.filter_map(|pod_ip| pod_ip.ip.as_ref())
.cloned()
.collect();

let container_statuses = pod
.status
.as_ref()
Expand Down Expand Up @@ -155,6 +166,7 @@ impl RuntimeData {
};

Ok(RuntimeData {
pod_ips,
pod_name,
pod_namespace: pod.metadata.namespace.clone(),
node_name,
Expand Down

0 comments on commit 1323a14

Please sign in to comment.