Skip to content

Commit

Permalink
Add support for port forwarding (#2649)
Browse files Browse the repository at this point in the history
* Implement parsing of port-forward specific arg

* Add test for port mapping parsing

* Create and run PortForwarder from main CLI loop

* Add new and run methods to PortForwarder

* Improve PortForwardError

* Add variant to CliError containing PortForwardError

* Add BiMap for connection IDs and sockets, restructure message type matching

* Add ping/pong routine

* Complete match arm for received DaemonTcpOutgoing::Connect

* Restructure port forwarding to spawn tasks

* Spawn new task when stream received from local socket

* Add matching on internal messages from tasks to main PortForwarder loop

* Add matching on messages from agent connection

* Warn user that port forwarding is unstable

* Add basic unit test for port forwarding

* Add unit test for forwarding multiple ports

* Add analytics ExecutionKind for port forwarding

* Add changelog file

* Move complexity to functions and fix server reply handling

* Fix test race condition

* Update changelog

* Fix clippy warnings

* Apply suggestions (first batch)

* Apply suggestions (second batch)

* Move peripheral tasks to LocalConnectionTask struct

* Update tests for lazy connections

* Extract shared parameters for target to TargetParams

* Apply suggestions (third batch)

* Restructure fields in PortForwarder, add cleanup to end of each LocalConnectionTask
  • Loading branch information
gememma authored Aug 16, 2024
1 parent 9d9257b commit 38407db
Show file tree
Hide file tree
Showing 8 changed files with 1,130 additions and 17 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions changelog.d/567.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Add port forwarding feature which can be used to proxy data from a local port to a remote one -
if the local port is not specified, it will default to the same as the remote
```
mirrord port-forward [options] -L [local_port:]remote_ip:remote_port
```
2 changes: 2 additions & 0 deletions mirrord/analytics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub enum ExecutionKind {
Container = 1,
#[default]
Exec = 2,
PortForward = 3,
Other = 0,
}

Expand All @@ -43,6 +44,7 @@ impl From<u32> for ExecutionKind {
match kind {
1 => ExecutionKind::Container,
2 => ExecutionKind::Exec,
3 => ExecutionKind::PortForward,
_ => ExecutionKind::Other,
}
}
Expand Down
16 changes: 12 additions & 4 deletions mirrord/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
mirrord-operator = { path = "../operator", features = ["client", "license-fetch", "setup"] }
mirrord-operator = { path = "../operator", features = [
"client",
"license-fetch",
"setup",
] }
mirrord-progress = { path = "../progress" }
mirrord-kube = { path = "../kube" }
mirrord-config = { path = "../config" }
Expand All @@ -40,13 +44,13 @@ semver.workspace = true
exec.workspace = true
reqwest.workspace = true
const-random = "0.1.15"
tokio = { workspace = true, features = ["rt", "net", "macros", "process"]}
tokio = { workspace = true, features = ["rt", "net", "macros", "process"] }
kube.workspace = true
k8s-openapi.workspace = true
miette = { version = "7", features = ["fancy"] }
thiserror.workspace = true
humantime = "2"
nix = {workspace = true, features = ["process", "resource"]}
nix = { workspace = true, features = ["process", "resource"] }
tokio-util.workspace = true
socket2.workspace = true
drain.workspace = true
Expand All @@ -58,11 +62,15 @@ tempfile = "3"
rcgen = "0.13"
rustls-pemfile = "2"
tokio-rustls = "0.26"
tokio-stream = { workspace = true, features = ["net"] }


[target.'cfg(target_os = "macos")'.dependencies]
mirrord-sip = { path = "../sip" }


[build-dependencies]
mirrord-layer = { artifact = "cdylib", path="../layer" }
mirrord-layer = { artifact = "cdylib", path = "../layer" }

[dev-dependencies]
rstest = "0.21"
224 changes: 212 additions & 12 deletions mirrord/cli/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
#![deny(missing_docs)]

use std::{collections::HashMap, ffi::OsString, fmt::Display, path::PathBuf};
use std::{
collections::HashMap,
ffi::OsString,
fmt::Display,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
str::FromStr,
};

use clap::{ArgGroup, Args, Parser, Subcommand, ValueEnum, ValueHint};
use clap_complete::Shell;
use mirrord_config::MIRRORD_CONFIG_FILE_ENV;
use mirrord_operator::setup::OperatorNamespace;
use thiserror::Error;

use crate::error::CliError;

Expand Down Expand Up @@ -58,6 +66,10 @@ pub(super) enum Commands {
#[command(hide = true, name = "intproxy")]
InternalProxy,

/// Port forwarding - UNSTABLE FEATURE
#[command(name = "port-forward")]
PortForward(Box<PortForwardArgs>),

/// Verify config file without starting mirrord.
#[command(hide = true)]
VerifyConfig(VerifyConfigArgs),
Expand Down Expand Up @@ -96,15 +108,9 @@ impl Display for FsMode {
#[derive(Args, Debug)]
/// Parameters to override any values from mirrord-config as part of `exec` or `container` commands.
pub(super) struct ExecParams {
/// Target name to mirror.
/// Target can either be a deployment or a pod.
/// Valid formats: deployment/name, pod/name, pod/name/container/name
#[arg(short = 't', long)]
pub target: Option<String>,

/// Namespace of the pod to mirror. Defaults to "default".
#[arg(short = 'n', long)]
pub target_namespace: Option<String>,
/// Parameters for the target
#[clap(flatten)]
pub target: TargetParams,

/// Namespace to place agent in.
#[arg(short = 'a', long)]
Expand Down Expand Up @@ -191,7 +197,7 @@ impl ExecParams {
pub fn as_env_vars(&self) -> Result<HashMap<String, OsString>, CliError> {
let mut envs: HashMap<String, OsString> = HashMap::new();

if let Some(target) = &self.target {
if let Some(target) = &self.target.target {
envs.insert("MIRRORD_IMPERSONATED_TARGET".into(), target.into());
}

Expand All @@ -203,7 +209,7 @@ impl ExecParams {
envs.insert("MIRRORD_SKIP_PROCESSES".into(), skip_processes.into());
}

if let Some(namespace) = &self.target_namespace {
if let Some(namespace) = &self.target.target_namespace {
envs.insert("MIRRORD_TARGET_NAMESPACE".into(), namespace.into());
}

Expand Down Expand Up @@ -303,6 +309,200 @@ pub(super) struct ExecArgs {
pub(super) binary_args: Vec<String>,
}

#[derive(Args, Debug)]
pub(super) struct TargetParams {
/// Target name to mirror.
/// Target can either be a deployment or a pod.
/// Valid formats: deployment/name, pod/name, pod/name/container/name
#[arg(short = 't', long)]
pub target: Option<String>,

/// Namespace of the pod to mirror. Defaults to "default".
#[arg(short = 'n', long)]
pub target_namespace: Option<String>,
}

impl TargetParams {
pub fn as_env_vars(&self) -> Result<HashMap<String, OsString>, CliError> {
let mut envs: HashMap<String, OsString> = HashMap::new();

if let Some(target) = &self.target {
envs.insert("MIRRORD_IMPERSONATED_TARGET".into(), target.into());
}
if let Some(namespace) = &self.target_namespace {
envs.insert("MIRRORD_TARGET_NAMESPACE".into(), namespace.into());
}

Ok(envs)
}
}

#[derive(Args, Debug)]
#[command(group(ArgGroup::new("port-forward")))]
pub(super) struct PortForwardArgs {
/// Parameters for the target
#[clap(flatten)]
pub target: TargetParams,

/// Namespace to place agent in
#[arg(short = 'a', long)]
pub agent_namespace: Option<String>,

/// Agent log level
#[arg(short = 'l', long)]
pub agent_log_level: Option<String>,

/// Agent image
#[arg(short = 'i', long)]
pub agent_image: Option<String>,

/// Agent TTL
#[arg(long)]
pub agent_ttl: Option<u16>,

/// Agent Startup Timeout seconds
#[arg(long)]
pub agent_startup_timeout: Option<u16>,

/// Accept/reject invalid certificates
#[arg(short = 'c', long)]
pub accept_invalid_certificates: bool,

/// Use an Ephemeral Container to mirror traffic
#[arg(short, long)]
pub ephemeral_container: bool,

/// Disable telemetry - see <https://github.com/metalbear-co/mirrord/blob/main/TELEMETRY.md>
#[arg(long)]
pub no_telemetry: bool,

#[arg(long)]
/// Disable version check on startup
pub disable_version_check: bool,

/// Load config from config file
#[arg(short = 'f', long, value_hint = ValueHint::FilePath)]
pub config_file: Option<PathBuf>,

/// Kube context to use from Kubeconfig
#[arg(long)]
pub context: Option<String>,

/// Mappings for port forwarding
#[arg(short = 'L', long)]
pub port_mappings: Vec<PortMapping>,
}

#[derive(Clone, Debug, PartialEq)]
pub struct PortMapping {
pub local: SocketAddr,
pub remote: SocketAddr,
}

impl FromStr for PortMapping {
type Err = PortMappingParseErr;

fn from_str(string: &str) -> Result<Self, Self::Err> {
fn parse_port(string: &str, original: &str) -> Result<u16, PortMappingParseErr> {
match string.parse::<u16>() {
Ok(0) => Err(PortMappingParseErr::PortZeroInvalid(string.to_string())),
Ok(port) => Ok(port),
Err(_error) => Err(PortMappingParseErr::PortParseErr(
string.to_string(),
original.to_string(),
)),
}
}

fn parse_ip(string: &str, original: &str) -> Result<Ipv4Addr, PortMappingParseErr> {
match string.parse::<Ipv4Addr>() {
Ok(ip) => Ok(ip),
Err(_error) => Err(PortMappingParseErr::IpParseErr(
string.to_string(),
original.to_string(),
)),
}
}

// expected format = local_port:dest_server:remote_port
// alternatively, = dest_server:remote_port
let vec: Vec<&str> = string.split(':').collect();
let (local_port, remote_ip, remote_port) = match vec.as_slice() {
[local_port, remote_ip, remote_port] => {
let local_port = parse_port(local_port, string)?;
let remote_port = parse_port(remote_port, string)?;
let remote_ip = parse_ip(remote_ip, string)?;
(local_port, remote_ip, remote_port)
}
[remote_ip, remote_port] => {
let remote_port = parse_port(remote_port, string)?;
let remote_ip = parse_ip(remote_ip, string)?;
(remote_port, remote_ip, remote_port)
}
_ => {
return Err(PortMappingParseErr::InvalidFormat(string.to_string()));
}
};

Ok(Self {
local: SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), local_port),
remote: SocketAddr::new(IpAddr::V4(remote_ip), remote_port),
})
}
}

#[derive(Error, Debug, PartialEq)]
pub enum PortMappingParseErr {
#[error("Invalid format of argument `{0}`, expected `[local-port:]remote-ipv4:remote-port`")]
InvalidFormat(String),

#[error("Failed to parse port `{0}` in argument `{1}`")]
PortParseErr(String, String),

#[error("Failed to parse IPv4 address `{0}` in argument `{1}`")]
IpParseErr(String, String),

#[error("Port `0` is not allowed in argument `{0}`")]
PortZeroInvalid(String),
}

#[cfg(test)]
mod test {
use std::str::FromStr;

use rstest::rstest;

use super::PortMapping;

#[rstest]
#[case("3030:152.37.110.132:3038", "127.0.0.1:3030", "152.37.110.132:3038")]
#[case("152.37.110.132:3038", "127.0.0.1:3038", "152.37.110.132:3038")]
fn parse_valid_mapping(
#[case] input: &str,
#[case] expected_local: &str,
#[case] expected_remote: &str,
) {
let expected = PortMapping {
local: expected_local.parse().unwrap(),
remote: expected_remote.parse().unwrap(),
};
assert_eq!(PortMapping::from_str(input).unwrap(), expected);
}

#[rstest]
#[case("3030:152.37.110.132:3038:2027")]
#[case("152.37.110.132:3030:3038")]
#[case("3030:152.37.110.132:0")]
#[case("3o3o:152.37.11o.132:3o38")]
#[case("3030:152110.132:3038")]
#[case("30303030:152.37.110.132:3038")]
#[case("")]
#[should_panic]
fn parse_invalid_mapping(#[case] input: &str) {
PortMapping::from_str(input).unwrap();
}
}

#[derive(Args, Debug)]
pub(super) struct OperatorArgs {
#[command(subcommand)]
Expand Down
6 changes: 6 additions & 0 deletions mirrord/cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use mirrord_operator::client::error::{HttpError, OperatorApiError, OperatorOpera
use reqwest::StatusCode;
use thiserror::Error;

use crate::port_forward::PortForwardError;

pub(crate) type Result<T, E = CliError> = core::result::Result<T, E>;

const GENERAL_HELP: &str = r#"
Expand Down Expand Up @@ -327,6 +329,10 @@ pub(crate) enum CliError {
#[error("mirrord returned a target resource of unknown type: {0}")]
#[diagnostic(help("{GENERAL_BUG}"))]
OperatorReturnedUnknownTargetType(String),

#[error("An error occurred in the port-forwarding process: {0}")]
#[diagnostic(help("{GENERAL_BUG}"))]
PortForwardingError(#[from] PortForwardError),
}

impl From<OperatorApiError> for CliError {
Expand Down
Loading

0 comments on commit 38407db

Please sign in to comment.