From 0b0f9dd0a249fdc7dfaedf120f6801fdea71e65a Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Tue, 16 Jul 2024 18:09:45 +0300 Subject: [PATCH] Don't filter outgoing traffic to target (#2588) * Rework * Use envy * Fix * Update * Fix tests --- Cargo.lock | 10 ++++++++++ changelog.d/2572.fixed.md | 1 + mirrord/agent/Cargo.toml | 1 + mirrord/agent/src/steal/connection.rs | 18 +++++++++++++----- mirrord/agent/src/steal/ip_tables.rs | 14 +++++++++----- mirrord/agent/src/steal/ip_tables/mesh.rs | 10 +++++----- mirrord/agent/src/steal/ip_tables/output.rs | 12 +++++++++--- mirrord/agent/src/steal/ip_tables/standard.rs | 4 ++-- mirrord/agent/src/steal/subscriptions.rs | 15 +++++++++++++-- mirrord/kube/src/api/container.rs | 12 ++++-------- mirrord/kube/src/api/container/job.rs | 3 +++ mirrord/kube/src/api/container/util.rs | 4 ++++ mirrord/kube/src/api/kubernetes.rs | 8 ++++++-- mirrord/kube/src/api/runtime.rs | 12 ++++++++++++ 14 files changed, 92 insertions(+), 32 deletions(-) create mode 100644 changelog.d/2572.fixed.md diff --git a/Cargo.lock b/Cargo.lock index fb046e7cdb8..002f9b5d833 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1976,6 +1976,15 @@ dependencies = [ "log", ] +[[package]] +name = "envy" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f47e0157f2cb54f5ae1bd371b30a2ae4311e1c028f575cd4e81de7353215965" +dependencies = [ + "serde", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -3866,6 +3875,7 @@ dependencies = [ "dashmap", "drain", "enum_dispatch", + "envy", "faccess", "fancy-regex", "futures", diff --git a/changelog.d/2572.fixed.md b/changelog.d/2572.fixed.md new file mode 100644 index 00000000000..f1a04b92332 --- /dev/null +++ b/changelog.d/2572.fixed.md @@ -0,0 +1 @@ +Update loopback detection to include pod ip's diff --git a/mirrord/agent/Cargo.toml b/mirrord/agent/Cargo.toml index 2628e92610e..65d9891edea 100644 --- a/mirrord/agent/Cargo.toml +++ b/mirrord/agent/Cargo.toml @@ -71,6 +71,7 @@ drain.workspace = true tokio-rustls = "0.26" x509-parser = "0.16" rustls.workspace = true +envy = "0.4" [target.'cfg(target_os = "linux")'.dependencies] iptables = { git = "https://github.com/metalbear-co/rust-iptables.git", rev = "e66c7332e361df3c61a194f08eefe3f40763d624" } diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index b49c61997a2..d175a58936e 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -22,6 +22,7 @@ use mirrord_protocol::{ RemoteError::{BadHttpFilterExRegex, BadHttpFilterRegex}, RequestId, }; +use serde::Deserialize; use tokio::{ net::TcpStream, sync::mpsc::{Receiver, Sender}, @@ -260,6 +261,12 @@ impl Client { } } +#[derive(Deserialize, Debug, Default)] +struct TcpStealerConfig { + stealer_flush_connections: bool, + pod_ips: Option, +} + /// Created once per agent during initialization. /// /// Meant to be run (see [`TcpConnectionStealer::start`]) in a separate thread while the agent @@ -287,12 +294,13 @@ impl TcpConnectionStealer { /// You need to call [`TcpConnectionStealer::start`] to do so. #[tracing::instrument(level = "trace")] pub(crate) async fn new(command_rx: Receiver) -> Result { + let config = envy::prefixed("MIRRORD_AGENT_") + .from_env::() + .unwrap_or_default(); + let port_subscriptions = { - let flush_connections = std::env::var("MIRRORD_AGENT_STEALER_FLUSH_CONNECTIONS") - .ok() - .and_then(|var| var.parse::().ok()) - .unwrap_or_default(); - let redirector = IpTablesRedirector::new(flush_connections).await?; + let redirector = + IpTablesRedirector::new(config.stealer_flush_connections, config.pod_ips).await?; PortSubscriptions::new(redirector, 4) }; diff --git a/mirrord/agent/src/steal/ip_tables.rs b/mirrord/agent/src/steal/ip_tables.rs index 9078169895d..b12440cc792 100644 --- a/mirrord/agent/src/steal/ip_tables.rs +++ b/mirrord/agent/src/steal/ip_tables.rs @@ -236,13 +236,17 @@ impl SafeIpTables where IPT: IPTables + Send + Sync, { - pub(super) async fn create(ipt: IPT, flush_connections: bool) -> Result { + pub(super) async fn create( + ipt: IPT, + flush_connections: bool, + pod_ips: Option<&str>, + ) -> Result { let ipt = Arc::new(ipt); let mut redirect = if let Some(vendor) = MeshVendor::detect(ipt.as_ref())? { - Redirects::Mesh(MeshRedirect::create(ipt.clone(), vendor)?) + Redirects::Mesh(MeshRedirect::create(ipt.clone(), vendor, pod_ips)?) } else { - match StandardRedirect::create(ipt.clone()) { + match StandardRedirect::create(ipt.clone(), pod_ips) { Err(err) => { warn!("Unable to create StandardRedirect chain: {err}"); @@ -416,7 +420,7 @@ mod tests { .times(1) .returning(|_| Ok(())); - let ipt = SafeIpTables::create(mock, false) + let ipt = SafeIpTables::create(mock, false, None) .await .expect("Create Failed"); @@ -549,7 +553,7 @@ mod tests { .times(1) .returning(|_| Ok(())); - let ipt = SafeIpTables::create(mock, false) + let ipt = SafeIpTables::create(mock, false, None) .await .expect("Create Failed"); diff --git a/mirrord/agent/src/steal/ip_tables/mesh.rs b/mirrord/agent/src/steal/ip_tables/mesh.rs index 7ce85e0e0e9..f365aeb8b8e 100644 --- a/mirrord/agent/src/steal/ip_tables/mesh.rs +++ b/mirrord/agent/src/steal/ip_tables/mesh.rs @@ -27,14 +27,14 @@ impl MeshRedirect where IPT: IPTables, { - pub fn create(ipt: Arc, vendor: MeshVendor) -> Result { + pub fn create(ipt: Arc, vendor: MeshVendor, pod_ips: Option<&str>) -> Result { let prerouteing = PreroutingRedirect::create(ipt.clone())?; for port in Self::get_skip_ports(&ipt, &vendor)? { prerouteing.add_rule(&format!("-m multiport -p tcp ! --dports {port} -j RETURN"))?; } - let output = OutputRedirect::create(ipt, IPTABLE_MESH.to_string())?; + let output = OutputRedirect::create(ipt, IPTABLE_MESH.to_string(), pod_ips)?; Ok(MeshRedirect { prerouteing, @@ -220,7 +220,7 @@ mod tests { mock.expect_insert_rule() .with( eq(IPTABLE_MESH.as_str()), - eq(format!("-m owner --gid-owner {gid} -p tcp -j RETURN")), + eq(format!("-m owner --gid-owner {gid} -p tcp -j RETURN")), eq(1), ) .times(1) @@ -245,8 +245,8 @@ mod tests { .times(1) .returning(|_| Ok(())); - let prerouting = - MeshRedirect::create(Arc::new(mock), MeshVendor::Linkerd).expect("Unable to create"); + let prerouting = MeshRedirect::create(Arc::new(mock), MeshVendor::Linkerd, None) + .expect("Unable to create"); assert!(prerouting.add_redirect(69, 420).await.is_ok()); } diff --git a/mirrord/agent/src/steal/ip_tables/output.rs b/mirrord/agent/src/steal/ip_tables/output.rs index 19ef3758753..96f21eccb32 100644 --- a/mirrord/agent/src/steal/ip_tables/output.rs +++ b/mirrord/agent/src/steal/ip_tables/output.rs @@ -20,12 +20,18 @@ where { const ENTRYPOINT: &'static str = "OUTPUT"; - pub fn create(ipt: Arc, chain_name: String) -> Result { + pub fn create(ipt: Arc, chain_name: String, pod_ips: Option<&str>) -> Result { let managed = IPTableChain::create(ipt, chain_name)?; + let exclude_source_ips = pod_ips + .map(|pod_ips| format!("! -s {pod_ips}")) + .unwrap_or_default(); + let gid = getgid(); managed - .add_rule(&format!("-m owner --gid-owner {gid} -p tcp -j RETURN")) + .add_rule(&format!( + "-m owner --gid-owner {gid} -p tcp {exclude_source_ips} -j RETURN" + )) .inspect_err(|_| { warn!("Unable to create iptable rule with \"--gid-owner {gid}\" filter") })?; @@ -34,7 +40,7 @@ where } pub fn load(ipt: Arc, chain_name: String) -> Result { - let managed = IPTableChain::create(ipt, chain_name)?; + let managed = IPTableChain::load(ipt, chain_name)?; Ok(OutputRedirect { managed }) } diff --git a/mirrord/agent/src/steal/ip_tables/standard.rs b/mirrord/agent/src/steal/ip_tables/standard.rs index 15155c2f3bb..79ac0e897ed 100644 --- a/mirrord/agent/src/steal/ip_tables/standard.rs +++ b/mirrord/agent/src/steal/ip_tables/standard.rs @@ -20,9 +20,9 @@ impl StandardRedirect where IPT: IPTables, { - pub fn create(ipt: Arc) -> Result { + pub fn create(ipt: Arc, pod_ips: Option<&str>) -> Result { let prerouteing = PreroutingRedirect::create(ipt.clone())?; - let output = OutputRedirect::create(ipt, IPTABLE_STANDARD.to_string())?; + let output = OutputRedirect::create(ipt, IPTABLE_STANDARD.to_string(), pod_ips)?; Ok(StandardRedirect { prerouteing, diff --git a/mirrord/agent/src/steal/subscriptions.rs b/mirrord/agent/src/steal/subscriptions.rs index 6ca96a81585..1fc0c04c756 100644 --- a/mirrord/agent/src/steal/subscriptions.rs +++ b/mirrord/agent/src/steal/subscriptions.rs @@ -58,6 +58,8 @@ pub(crate) struct IpTablesRedirector { redirect_to: Port, /// Listener to which redirect all connections. listener: TcpListener, + + pod_ips: Option, } impl IpTablesRedirector { @@ -73,7 +75,10 @@ impl IpTablesRedirector { /// /// * `flush_connections` - whether exisitng connections should be flushed when adding new /// redirects - pub(crate) async fn new(flush_connections: bool) -> Result { + pub(crate) async fn new( + flush_connections: bool, + pod_ips: Option, + ) -> Result { let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, 0)).await?; let redirect_to = listener.local_addr()?.port(); @@ -82,6 +87,7 @@ impl IpTablesRedirector { flush_connections, redirect_to, listener, + pod_ips, }) } } @@ -95,7 +101,12 @@ impl PortRedirector for IpTablesRedirector { Some(iptables) => iptables, None => { let iptables = new_iptables(); - let safe = SafeIpTables::create(iptables.into(), self.flush_connections).await?; + let safe = SafeIpTables::create( + iptables.into(), + self.flush_connections, + self.pod_ips.as_deref(), + ) + .await?; self.iptables.insert(safe) } }; diff --git a/mirrord/kube/src/api/container.rs b/mirrord/kube/src/api/container.rs index ad34966a270..289b4aee49a 100644 --- a/mirrord/kube/src/api/container.rs +++ b/mirrord/kube/src/api/container.rs @@ -39,10 +39,11 @@ pub struct ContainerParams { /// Value for [`AGENT_OPERATOR_CERT_ENV`](mirrord_protocol::AGENT_OPERATOR_CERT_ENV) set in /// the agent container. pub tls_cert: Option, + pub pod_ips: Option, } impl ContainerParams { - pub fn new() -> ContainerParams { + pub fn new(tls_cert: Option, pod_ips: Option) -> ContainerParams { let port: u16 = rand::thread_rng().gen_range(30000..=65535); let gid: u16 = rand::thread_rng().gen_range(3000..u16::MAX); @@ -57,17 +58,12 @@ impl ContainerParams { name, gid, port, - tls_cert: None, + tls_cert, + pod_ips, } } } -impl Default for ContainerParams { - fn default() -> Self { - Self::new() - } -} - pub trait ContainerVariant { type Update; diff --git a/mirrord/kube/src/api/container/job.rs b/mirrord/kube/src/api/container/job.rs index 96deb958c0d..c6ebc150cb3 100644 --- a/mirrord/kube/src/api/container/job.rs +++ b/mirrord/kube/src/api/container/job.rs @@ -245,6 +245,7 @@ mod test { port: 3000, gid: 13, tls_cert: None, + pod_ips: None, }; let update = JobVariant::new(&agent, ¶ms).as_update(); @@ -327,6 +328,7 @@ mod test { port: 3000, gid: 13, tls_cert: None, + pod_ips: None, }; let update = JobTargetedVariant::new( @@ -335,6 +337,7 @@ mod test { &RuntimeData { mesh: None, pod_name: "pod".to_string(), + pod_ips: vec![], pod_namespace: None, node_name: "foobaz".to_string(), container_id: "container".to_string(), diff --git a/mirrord/kube/src/api/container/util.rs b/mirrord/kube/src/api/container/util.rs index 80b4a27a6c6..77f917378ce 100644 --- a/mirrord/kube/src/api/container/util.rs +++ b/mirrord/kube/src/api/container/util.rs @@ -63,6 +63,10 @@ pub(super) fn agent_env(agent: &AgentConfig, params: &&ContainerParams) -> Vec, pub pod_namespace: Option, pub node_name: String, pub container_id: String, @@ -109,6 +110,16 @@ impl RuntimeData { .ok_or_else(|| KubeApiError::missing_field(pod, ".spec.nodeName"))? .to_owned(); + let pod_ips = pod + .status + .as_ref() + .and_then(|spec| spec.pod_ips.as_ref()) + .ok_or_else(|| KubeApiError::missing_field(pod, ".status.podIPs"))? + .iter() + .filter_map(|pod_ip| pod_ip.ip.as_ref()) + .cloned() + .collect(); + let container_statuses = pod .status .as_ref() @@ -155,6 +166,7 @@ impl RuntimeData { }; Ok(RuntimeData { + pod_ips, pod_name, pod_namespace: pod.metadata.namespace.clone(), node_name,