Skip to content

Commit

Permalink
Use envy
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryDodzin committed Jul 16, 2024
1 parent 9be23ce commit 828dbbe
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 48 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mirrord/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ drain.workspace = true
tokio-rustls = "0.26"
x509-parser = "0.16"
rustls.workspace = true
envy = "0.4"

[target.'cfg(target_os = "linux")'.dependencies]
iptables = {git = "https://github.com/metalbear-co/rust-iptables.git", rev = "e66c7332e361df3c61a194f08eefe3f40763d624"}
Expand Down
3 changes: 0 additions & 3 deletions mirrord/agent/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ pub struct Args {
/// If not given, the agent will not use TLS.
#[arg(long, env = AGENT_OPERATOR_CERT_ENV)]
pub operator_tls_cert_pem: Option<String>,

#[arg(long)]
pub pod_ips: Option<String>,
}

#[derive(Clone, Debug, Default, Subcommand)]
Expand Down
16 changes: 7 additions & 9 deletions mirrord/agent/src/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,15 +537,13 @@ async fn start_agent(args: Args) -> Result<()> {
let cancellation_token = cancellation_token.clone();
let watched_task = WatchedTask::new(
TcpConnectionStealer::TASK_NAME,
TcpConnectionStealer::new(stealer_command_rx, args.pod_ips).and_then(
|stealer| async move {
let res = stealer.start(cancellation_token).await;
if let Err(err) = res.as_ref() {
error!("Stealer failed: {err}");
}
res
},
),
TcpConnectionStealer::new(stealer_command_rx).and_then(|stealer| async move {
let res = stealer.start(cancellation_token).await;
if let Err(err) = res.as_ref() {
error!("Stealer failed: {err}");
}
res
}),
);
let status = watched_task.status();
let task = run_thread_in_namespace(
Expand Down
23 changes: 14 additions & 9 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mirrord_protocol::{
RemoteError::{BadHttpFilterExRegex, BadHttpFilterRegex},
RequestId,
};
use serde::Deserialize;
use tokio::{
net::TcpStream,
sync::mpsc::{Receiver, Sender},
Expand Down Expand Up @@ -260,6 +261,12 @@ impl Client {
}
}

#[derive(Deserialize, Debug, Default)]
struct TcpStealerConfig {
stealer_flush_connections: bool,
pod_ips: Option<String>,
}

/// Created once per agent during initialization.
///
/// Meant to be run (see [`TcpConnectionStealer::start`]) in a separate thread while the agent
Expand All @@ -286,16 +293,14 @@ impl TcpConnectionStealer {
/// Initializes a new [`TcpConnectionStealer`], but doesn't start the actual work.
/// You need to call [`TcpConnectionStealer::start`] to do so.
#[tracing::instrument(level = "trace")]
pub(crate) async fn new(
command_rx: Receiver<StealerCommand>,
pod_ips: Option<String>,
) -> Result<Self, AgentError> {
pub(crate) async fn new(command_rx: Receiver<StealerCommand>) -> Result<Self, AgentError> {
let config = envy::prefixed("MIRRORD_AGENT_")
.from_env::<TcpStealerConfig>()
.unwrap_or_default();

let port_subscriptions = {
let flush_connections = std::env::var("MIRRORD_AGENT_STEALER_FLUSH_CONNECTIONS")
.ok()
.and_then(|var| var.parse::<bool>().ok())
.unwrap_or_default();
let redirector = IpTablesRedirector::new(flush_connections, pod_ips).await?;
let redirector =
IpTablesRedirector::new(config.stealer_flush_connections, config.pod_ips).await?;

PortSubscriptions::new(redirector, 4)
};
Expand Down
9 changes: 4 additions & 5 deletions mirrord/kube/src/api/container/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub(super) fn agent_env(agent: &AgentConfig, params: &&ContainerParams) -> Vec<E
env.push(("MIRRORD_AGENT_DNS_TIMEOUT".to_string(), timeout.to_string()));
};

if let Some(pod_ips) = params.pod_ips.clone() {
env.push(("MIRRORD_AGENT_POD_IPS".to_string(), pod_ips));
}

env.into_iter()
.chain(
params
Expand Down Expand Up @@ -94,11 +98,6 @@ pub(super) fn base_command_line(agent: &AgentConfig, params: &ContainerParams) -
command_line.push("--test-error".to_owned());
}

if let Some(pod_ips) = params.pod_ips.clone() {
command_line.push("--pod-ips".to_owned());
command_line.push(pod_ips);
}

command_line
}

Expand Down
22 changes: 0 additions & 22 deletions tests/python-e2e/app_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
import sys
import threading
import requests

log = logging.getLogger("werkzeug")
log.disabled = True
Expand All @@ -25,27 +24,6 @@ def kill_thread():
threading.Thread(target=kill_thread).start()


@app.route("/foobar", methods=["GET"])
def get_foobar():
print("GET: Request completed")
return "GET"

@app.route("/foobar", methods=["POST"])
def post_foobar():
print("POST: Request completed")

x = requests.get('http://10.1.62.94/foobar')
return x.text


@app.route("/foobar", methods=["PUT"])
def put_foobar():
print("PUT: Request completed")

x = requests.get('http://10.99.79.117/foobar')
return x.text


@app.route("/", methods=["GET"])
def get():
print("GET: Request completed")
Expand Down

0 comments on commit 828dbbe

Please sign in to comment.