Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't filter outgoing traffic to target #2588

Merged
merged 6 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2572.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update loopback detection to include pod ip's
1 change: 1 addition & 0 deletions mirrord/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ drain.workspace = true
tokio-rustls = "0.26"
x509-parser = "0.16"
rustls.workspace = true
envy = "0.4"

[target.'cfg(target_os = "linux")'.dependencies]
iptables = { git = "https://github.com/metalbear-co/rust-iptables.git", rev = "e66c7332e361df3c61a194f08eefe3f40763d624" }
Expand Down
18 changes: 13 additions & 5 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mirrord_protocol::{
RemoteError::{BadHttpFilterExRegex, BadHttpFilterRegex},
RequestId,
};
use serde::Deserialize;
use tokio::{
net::TcpStream,
sync::mpsc::{Receiver, Sender},
Expand Down Expand Up @@ -260,6 +261,12 @@ impl Client {
}
}

#[derive(Deserialize, Debug, Default)]
struct TcpStealerConfig {
stealer_flush_connections: bool,
pod_ips: Option<String>,
}

/// Created once per agent during initialization.
///
/// Meant to be run (see [`TcpConnectionStealer::start`]) in a separate thread while the agent
Expand Down Expand Up @@ -287,12 +294,13 @@ impl TcpConnectionStealer {
/// You need to call [`TcpConnectionStealer::start`] to do so.
#[tracing::instrument(level = "trace")]
pub(crate) async fn new(command_rx: Receiver<StealerCommand>) -> Result<Self, AgentError> {
let config = envy::prefixed("MIRRORD_AGENT_")
.from_env::<TcpStealerConfig>()
.unwrap_or_default();

let port_subscriptions = {
let flush_connections = std::env::var("MIRRORD_AGENT_STEALER_FLUSH_CONNECTIONS")
.ok()
.and_then(|var| var.parse::<bool>().ok())
.unwrap_or_default();
let redirector = IpTablesRedirector::new(flush_connections).await?;
let redirector =
IpTablesRedirector::new(config.stealer_flush_connections, config.pod_ips).await?;

PortSubscriptions::new(redirector, 4)
};
Expand Down
14 changes: 9 additions & 5 deletions mirrord/agent/src/steal/ip_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,17 @@ impl<IPT> SafeIpTables<IPT>
where
IPT: IPTables + Send + Sync,
{
pub(super) async fn create(ipt: IPT, flush_connections: bool) -> Result<Self> {
pub(super) async fn create(
ipt: IPT,
flush_connections: bool,
pod_ips: Option<&str>,
) -> Result<Self> {
let ipt = Arc::new(ipt);

let mut redirect = if let Some(vendor) = MeshVendor::detect(ipt.as_ref())? {
Redirects::Mesh(MeshRedirect::create(ipt.clone(), vendor)?)
Redirects::Mesh(MeshRedirect::create(ipt.clone(), vendor, pod_ips)?)
} else {
match StandardRedirect::create(ipt.clone()) {
match StandardRedirect::create(ipt.clone(), pod_ips) {
Err(err) => {
warn!("Unable to create StandardRedirect chain: {err}");

Expand Down Expand Up @@ -416,7 +420,7 @@ mod tests {
.times(1)
.returning(|_| Ok(()));

let ipt = SafeIpTables::create(mock, false)
let ipt = SafeIpTables::create(mock, false, None)
.await
.expect("Create Failed");

Expand Down Expand Up @@ -549,7 +553,7 @@ mod tests {
.times(1)
.returning(|_| Ok(()));

let ipt = SafeIpTables::create(mock, false)
let ipt = SafeIpTables::create(mock, false, None)
.await
.expect("Create Failed");

Expand Down
10 changes: 5 additions & 5 deletions mirrord/agent/src/steal/ip_tables/mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ impl<IPT> MeshRedirect<IPT>
where
IPT: IPTables,
{
pub fn create(ipt: Arc<IPT>, vendor: MeshVendor) -> Result<Self> {
pub fn create(ipt: Arc<IPT>, vendor: MeshVendor, pod_ips: Option<&str>) -> Result<Self> {
let prerouteing = PreroutingRedirect::create(ipt.clone())?;

for port in Self::get_skip_ports(&ipt, &vendor)? {
prerouteing.add_rule(&format!("-m multiport -p tcp ! --dports {port} -j RETURN"))?;
}

let output = OutputRedirect::create(ipt, IPTABLE_MESH.to_string())?;
let output = OutputRedirect::create(ipt, IPTABLE_MESH.to_string(), pod_ips)?;

Ok(MeshRedirect {
prerouteing,
Expand Down Expand Up @@ -220,7 +220,7 @@ mod tests {
mock.expect_insert_rule()
.with(
eq(IPTABLE_MESH.as_str()),
eq(format!("-m owner --gid-owner {gid} -p tcp -j RETURN")),
eq(format!("-m owner --gid-owner {gid} -p tcp -j RETURN")),
eq(1),
)
.times(1)
Expand All @@ -245,8 +245,8 @@ mod tests {
.times(1)
.returning(|_| Ok(()));

let prerouting =
MeshRedirect::create(Arc::new(mock), MeshVendor::Linkerd).expect("Unable to create");
let prerouting = MeshRedirect::create(Arc::new(mock), MeshVendor::Linkerd, None)
.expect("Unable to create");

assert!(prerouting.add_redirect(69, 420).await.is_ok());
}
Expand Down
12 changes: 9 additions & 3 deletions mirrord/agent/src/steal/ip_tables/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ where
{
const ENTRYPOINT: &'static str = "OUTPUT";

pub fn create(ipt: Arc<IPT>, chain_name: String) -> Result<Self> {
pub fn create(ipt: Arc<IPT>, chain_name: String, pod_ips: Option<&str>) -> Result<Self> {
let managed = IPTableChain::create(ipt, chain_name)?;

let exclude_source_ips = pod_ips
.map(|pod_ips| format!("! -s {pod_ips}"))
.unwrap_or_default();
Razz4780 marked this conversation as resolved.
Show resolved Hide resolved

let gid = getgid();
managed
.add_rule(&format!("-m owner --gid-owner {gid} -p tcp -j RETURN"))
.add_rule(&format!(
"-m owner --gid-owner {gid} -p tcp {exclude_source_ips} -j RETURN"
))
.inspect_err(|_| {
warn!("Unable to create iptable rule with \"--gid-owner {gid}\" filter")
})?;
Expand All @@ -34,7 +40,7 @@ where
}

pub fn load(ipt: Arc<IPT>, chain_name: String) -> Result<Self> {
let managed = IPTableChain::create(ipt, chain_name)?;
let managed = IPTableChain::load(ipt, chain_name)?;

Ok(OutputRedirect { managed })
}
Expand Down
4 changes: 2 additions & 2 deletions mirrord/agent/src/steal/ip_tables/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ impl<IPT> StandardRedirect<IPT>
where
IPT: IPTables,
{
pub fn create(ipt: Arc<IPT>) -> Result<Self> {
pub fn create(ipt: Arc<IPT>, pod_ips: Option<&str>) -> Result<Self> {
let prerouteing = PreroutingRedirect::create(ipt.clone())?;
let output = OutputRedirect::create(ipt, IPTABLE_STANDARD.to_string())?;
let output = OutputRedirect::create(ipt, IPTABLE_STANDARD.to_string(), pod_ips)?;

Ok(StandardRedirect {
prerouteing,
Expand Down
15 changes: 13 additions & 2 deletions mirrord/agent/src/steal/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub(crate) struct IpTablesRedirector {
redirect_to: Port,
/// Listener to which redirect all connections.
listener: TcpListener,

pod_ips: Option<String>,
}

impl IpTablesRedirector {
Expand All @@ -73,7 +75,10 @@ impl IpTablesRedirector {
///
/// * `flush_connections` - whether exisitng connections should be flushed when adding new
/// redirects
pub(crate) async fn new(flush_connections: bool) -> Result<Self, AgentError> {
pub(crate) async fn new(
flush_connections: bool,
pod_ips: Option<String>,
) -> Result<Self, AgentError> {
let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
let redirect_to = listener.local_addr()?.port();

Expand All @@ -82,6 +87,7 @@ impl IpTablesRedirector {
flush_connections,
redirect_to,
listener,
pod_ips,
})
}
}
Expand All @@ -95,7 +101,12 @@ impl PortRedirector for IpTablesRedirector {
Some(iptables) => iptables,
None => {
let iptables = new_iptables();
let safe = SafeIpTables::create(iptables.into(), self.flush_connections).await?;
let safe = SafeIpTables::create(
iptables.into(),
self.flush_connections,
self.pod_ips.as_deref(),
)
.await?;
self.iptables.insert(safe)
}
};
Expand Down
12 changes: 4 additions & 8 deletions mirrord/kube/src/api/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ pub struct ContainerParams {
/// Value for [`AGENT_OPERATOR_CERT_ENV`](mirrord_protocol::AGENT_OPERATOR_CERT_ENV) set in
/// the agent container.
pub tls_cert: Option<String>,
pub pod_ips: Option<String>,
}

impl ContainerParams {
pub fn new() -> ContainerParams {
pub fn new(tls_cert: Option<String>, pod_ips: Option<String>) -> ContainerParams {
let port: u16 = rand::thread_rng().gen_range(30000..=65535);
let gid: u16 = rand::thread_rng().gen_range(3000..u16::MAX);

Expand All @@ -57,17 +58,12 @@ impl ContainerParams {
name,
gid,
port,
tls_cert: None,
tls_cert,
pod_ips,
}
}
}

impl Default for ContainerParams {
fn default() -> Self {
Self::new()
}
}

pub trait ContainerVariant {
type Update;

Expand Down
3 changes: 3 additions & 0 deletions mirrord/kube/src/api/container/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ mod test {
port: 3000,
gid: 13,
tls_cert: None,
pod_ips: None,
};

let update = JobVariant::new(&agent, &params).as_update();
Expand Down Expand Up @@ -327,6 +328,7 @@ mod test {
port: 3000,
gid: 13,
tls_cert: None,
pod_ips: None,
};

let update = JobTargetedVariant::new(
Expand All @@ -335,6 +337,7 @@ mod test {
&RuntimeData {
mesh: None,
pod_name: "pod".to_string(),
pod_ips: vec![],
pod_namespace: None,
node_name: "foobaz".to_string(),
container_id: "container".to_string(),
Expand Down
4 changes: 4 additions & 0 deletions mirrord/kube/src/api/container/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub(super) fn agent_env(agent: &AgentConfig, params: &&ContainerParams) -> Vec<E
env.push(("MIRRORD_AGENT_DNS_TIMEOUT".to_string(), timeout.to_string()));
};

if let Some(pod_ips) = params.pod_ips.clone() {
env.push(("MIRRORD_AGENT_POD_IPS".to_string(), pod_ips));
}

env.into_iter()
.chain(
params
Expand Down
8 changes: 6 additions & 2 deletions mirrord/kube/src/api/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,12 @@ impl KubernetesAPI {
.into(),
};

let mut params = ContainerParams::new();
params.tls_cert = tls_cert;
let pod_ips = runtime_data
.as_ref()
.filter(|runtime_data| !runtime_data.pod_ips.is_empty())
.map(|runtime_data| runtime_data.pod_ips.join(","));

let params = ContainerParams::new(tls_cert, pod_ips);

Ok((params, runtime_data))
}
Expand Down
12 changes: 12 additions & 0 deletions mirrord/kube/src/api/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl Display for ContainerRuntime {
#[derive(Debug)]
pub struct RuntimeData {
pub pod_name: String,
pub pod_ips: Vec<String>,
pub pod_namespace: Option<String>,
pub node_name: String,
pub container_id: String,
Expand Down Expand Up @@ -109,6 +110,16 @@ impl RuntimeData {
.ok_or_else(|| KubeApiError::missing_field(pod, ".spec.nodeName"))?
.to_owned();

let pod_ips = pod
.status
.as_ref()
.and_then(|spec| spec.pod_ips.as_ref())
.ok_or_else(|| KubeApiError::missing_field(pod, ".status.podIPs"))?
.iter()
.filter_map(|pod_ip| pod_ip.ip.as_ref())
.cloned()
.collect();

let container_statuses = pod
.status
.as_ref()
Expand Down Expand Up @@ -155,6 +166,7 @@ impl RuntimeData {
};

Ok(RuntimeData {
pod_ips,
pod_name,
pod_namespace: pod.metadata.namespace.clone(),
node_name,
Expand Down
Loading