Skip to content

Commit

Permalink
change to use operator for listing targets when possible (#2437)
Browse files Browse the repository at this point in the history
* change to use operator for listing targets when possible

* add tests

* wip

* this works

* ?
  • Loading branch information
aviramha authored May 13, 2024
1 parent f5987d1 commit 02734c6
Show file tree
Hide file tree
Showing 11 changed files with 440 additions and 276 deletions.
502 changes: 268 additions & 234 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ futures = "0.3"
thiserror = "1"
k8s-openapi = { version = "0.20", features = ["v1_24"] }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "rustls-tls-native-roots", "json", "socks"] }
kube = { git = "https://github.com/metalbear-co/kube", default-features = false, features = ["runtime", "derive", "client", "ws", "rustls-tls", "oidc", "socks5", "http_proxy"] }
kube = { git = "https://github.com/metalbear-co/kube", default-features = false, features = ["runtime", "derive", "client", "ws", "rustls-tls", "oidc", "socks5", "http_proxy"], branch = "fix_mb" }
trust-dns-resolver = { version = "0.22", features = ["serde-config", "tokio-runtime"] }
tokio-util = { version = "0.7", features = ["net", "codec"] }
rand = "0.8"
Expand Down
1 change: 1 addition & 0 deletions changelog.d/1959.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use operator to list targets to avoid inconsistencies
1 change: 0 additions & 1 deletion mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,6 @@
},
"path": {
"description": "<!--${internal}--> Path is optional so that it can also be specified via env var instead of via conf file, but it is not optional in a resulting [`TargetConfig`] object - either there is a path, or the target configuration is `None`.",
"default": null,
"anyOf": [
{
"$ref": "#/definitions/Target"
Expand Down
116 changes: 82 additions & 34 deletions mirrord/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![feature(let_chains)]
#![feature(lazy_cell)]
#![feature(try_blocks)]
#![warn(clippy::indexing_slicing)]

use std::{collections::HashMap, time::Duration};
Expand All @@ -21,6 +22,7 @@ use miette::JSONReportHandler;
use mirrord_analytics::{AnalyticsError, AnalyticsReporter, CollectAnalytics, Reporter};
use mirrord_config::{
config::{ConfigContext, MirrordConfig},
target::TargetDisplay,
LayerConfig, LayerFileConfig,
};
use mirrord_kube::{
Expand All @@ -30,6 +32,7 @@ use mirrord_kube::{
},
error::KubeApiError,
};
use mirrord_operator::client::OperatorApi;
use mirrord_progress::{Progress, ProgressTracker};
use operator::operator_command;
use semver::Version;
Expand Down Expand Up @@ -336,44 +339,27 @@ where
.unwrap_or_else(|_| Vec::new().into_iter())
}

/// Lists all possible target paths for pods.
/// Example: ```[
/// "pod/metalbear-deployment-85c754c75f-982p5",
/// "pod/nginx-deployment-66b6c48dd5-dc9wk",
/// "pod/py-serv-deployment-5c57fbdc98-pdbn4/container/py-serv",
/// ]```
async fn print_pod_targets(args: &ListTargetArgs) -> Result<()> {
let (accept_invalid_certificates, kubeconfig, namespace, kube_context) = if let Some(config) =
&args.config_file
{
let mut cfg_context = ConfigContext::default();
let layer_config = LayerFileConfig::from_path(config)?.generate_config(&mut cfg_context)?;
if !layer_config.use_proxy {
remove_proxy_env();
}
(
layer_config.accept_invalid_certificates,
layer_config.kubeconfig,
layer_config.target.namespace,
layer_config.kube_context,
)
} else {
(false, None, None, None)
};
async fn list_pods(layer_config: &LayerConfig, args: &ListTargetArgs) -> Result<Vec<String>> {
let client = create_kube_api(
layer_config.accept_invalid_certificates,
layer_config.kubeconfig.clone(),
layer_config.kube_context.clone(),
)
.await
.map_err(CliError::KubernetesApiFailed)?;

let client = create_kube_api(accept_invalid_certificates, kubeconfig, kube_context)
.await
.map_err(CliError::KubernetesApiFailed)?;

let namespace = args.namespace.as_deref().or(namespace.as_deref());
let namespace = args
.namespace
.as_deref()
.or(layer_config.target.namespace.as_deref());

let (pods, deployments, rollouts) = futures::try_join!(
get_kube_pods(namespace, &client),
get_kube_deployments(namespace, &client),
get_kube_rollouts(namespace, &client)
)?;

let mut target_vector = pods
Ok(pods
.iter()
.flat_map(|(pod, containers)| {
if containers.len() == 1 {
Expand All @@ -387,11 +373,73 @@ async fn print_pod_targets(args: &ListTargetArgs) -> Result<()> {
})
.chain(deployments.map(|deployment| format!("deployment/{deployment}")))
.chain(rollouts.map(|rollout| format!("rollout/{rollout}")))
.collect::<Vec<String>>();
.collect::<Vec<String>>())
}

/// Lists all possible target paths.
/// Tries to use operator if available, otherwise falls back to k8s API (if operator isn't
/// explicitly true). Example: ```[
/// "pod/metalbear-deployment-85c754c75f-982p5",
/// "pod/nginx-deployment-66b6c48dd5-dc9wk",
/// "pod/py-serv-deployment-5c57fbdc98-pdbn4/container/py-serv",
/// "deployment/nginx-deployment"
/// "deployment/nginx-deployment/container/nginx"
/// "rollout/nginx-rollout"
/// ]```
async fn print_targets(args: &ListTargetArgs) -> Result<()> {
let mut layer_config = if let Some(config) = &args.config_file {
let mut cfg_context = ConfigContext::default();
LayerFileConfig::from_path(config)?.generate_config(&mut cfg_context)?
} else {
LayerConfig::from_env()?
};

if let Some(namespace) = &args.namespace {
layer_config.target.namespace = Some(namespace.clone());
};

if !layer_config.use_proxy {
remove_proxy_env();
}

// Try operator first if relevant
let mut targets = match &layer_config.operator {
Some(true) | None => {
let operator_targets = try {
let operator_api = OperatorApi::new(&layer_config).await?;
operator_api.list_targets().await?
};
match operator_targets {
Ok(targets) => {
// adjust format to match non-operator output
targets
.iter()
.filter_map(|target_crd| {
let target = target_crd.spec.target.as_ref()?;
if let Some(container) = target.container_name() {
if !SKIP_NAMES.contains(container.as_str()) {
return None;
}
}
Some(format!("{target}"))
})
.collect::<Vec<String>>()
}
Err(e) => {
if layer_config.operator.is_some() {
error!(?e, "Operator is enabled but failed to list targets");
return Err(e);
}
list_pods(&layer_config, args).await?
}
}
}
Some(false) => list_pods(&layer_config, args).await?,
};

target_vector.sort();
targets.sort();

let json_obj = json!(target_vector);
let json_obj = json!(targets);
println!("{json_obj}");
Ok(())
}
Expand Down Expand Up @@ -427,7 +475,7 @@ fn main() -> miette::Result<()> {
false,
)?;
}
Commands::ListTargets(args) => print_pod_targets(&args).await?,
Commands::ListTargets(args) => print_targets(&args).await?,
Commands::Operator(args) => operator_command(*args).await?,
Commands::ExtensionExec(args) => {
extension_exec(*args, watch).await?;
Expand Down
30 changes: 29 additions & 1 deletion mirrord/config/src/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl Target {
}
}

trait TargetDisplay {
pub trait TargetDisplay {
fn target_type(&self) -> &str;

fn target_name(&self) -> &str;
Expand Down Expand Up @@ -306,6 +306,34 @@ impl fmt::Display for Target {
}
}

impl TargetDisplay for Target {
fn target_type(&self) -> &str {
match self {
Target::Deployment(x) => x.target_type(),
Target::Pod(x) => x.target_type(),
Target::Rollout(x) => x.target_type(),
Target::Targetless => "targetless",
}
}

fn target_name(&self) -> &str {
match self {
Target::Deployment(x) => x.target_name(),
Target::Pod(x) => x.target_name(),
Target::Rollout(x) => x.target_name(),
Target::Targetless => "targetless",
}
}

fn container_name(&self) -> Option<&String> {
match self {
Target::Deployment(x) => x.container_name(),
Target::Pod(x) => x.container_name(),
Target::Rollout(x) => x.container_name(),
Target::Targetless => None,
}
}
}
/// <!--${internal}-->
/// Mirror the pod specified by [`PodTarget::pod`].
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug, JsonSchema)]
Expand Down
22 changes: 20 additions & 2 deletions mirrord/operator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use base64::{engine::general_purpose, Engine as _};
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt};
use http::request::Request;
use kube::{api::PostParams, Api, Client, Resource};
use kube::{
api::{ListParams, PostParams},
Api, Client, Resource,
};
use mirrord_analytics::{AnalyticsHash, AnalyticsOperatorProperties, Reporter};
use mirrord_auth::{
certificate::Certificate,
Expand Down Expand Up @@ -51,6 +54,7 @@ pub enum OperatorOperation {
CopyingTarget,
GettingStatus,
SessionManagement,
ListingTargets,
}

impl Display for OperatorOperation {
Expand All @@ -62,6 +66,7 @@ impl Display for OperatorOperation {
Self::CopyingTarget => "copying target",
Self::GettingStatus => "getting status",
Self::SessionManagement => "session management",
Self::ListingTargets => "listing targets",
};

f.write_str(as_str)
Expand Down Expand Up @@ -374,7 +379,7 @@ impl OperatorApi {
operator_api.connect_target(session_information).await
}

async fn new(config: &LayerConfig) -> Result<Self> {
pub async fn new(config: &LayerConfig) -> Result<Self> {
let target_config = config.target.clone();
let on_concurrent_steal = config.feature.network.incoming.on_concurrent_steal;

Expand Down Expand Up @@ -613,6 +618,19 @@ impl OperatorApi {
operation: OperatorOperation::CopyingTarget,
})
}

/// List targets using the operator
#[tracing::instrument(level = "trace", skip(self), ret)]
pub async fn list_targets(&self) -> Result<Vec<TargetCrd>> {
self.target_api
.list(&ListParams::default())
.await
.map_err(|error| OperatorApiError::KubeError {
error,
operation: OperatorOperation::ListingTargets,
})
.map(|list| list.items)
}
}

#[derive(Error, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion tests/src/cli/sanity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ mod cli {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn mirrord_ls(#[future] service: KubeService) {
let service = service.await;
let mut process = run_ls(None, None).await;
let mut process = run_ls::<false>(None, None).await;
let res = process.wait().await;
assert!(res.success());
let stdout = process.get_stdout().await;
Expand Down
1 change: 1 addition & 0 deletions tests/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@

mod concurrent_steal;
mod policies;
mod sanity;
mod setup;
27 changes: 27 additions & 0 deletions tests/src/operator/sanity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#![cfg(test)]
#![cfg(feature = "operator")]
use std::time::Duration;

use regex::Regex;
use rstest::rstest;

use crate::utils::{config_dir, run_ls, run_verify_config, service, KubeService};

/// Tests for the `mirrord ls` command with operator
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn mirrord_ls(#[future] service: KubeService) {
let service = service.await;
let mut process = run_ls::<true>(None, None).await;
let res = process.wait().await;
assert!(res.success());
let stdout = process.get_stdout().await;
let targets: Vec<String> = serde_json::from_str(&stdout).unwrap();
let re = Regex::new(r"^(pod|deployment)/.+(/container/.+)?$").unwrap();
targets
.iter()
.for_each(|output| assert!(re.is_match(output)));
assert!(targets
.iter()
.any(|output| output.starts_with(&format!("pod/{}", service.name))));
}
12 changes: 10 additions & 2 deletions tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,10 @@ pub async fn run_exec(
}

/// Runs `mirrord ls` command and asserts if the json matches the expected format
pub async fn run_ls(args: Option<Vec<&str>>, namespace: Option<&str>) -> TestProcess {
pub async fn run_ls<const USE_OPERATOR: bool>(
args: Option<Vec<&str>>,
namespace: Option<&str>,
) -> TestProcess {
let mut mirrord_args = vec!["ls"];
if let Some(args) = args {
mirrord_args.extend(args);
Expand All @@ -518,7 +521,12 @@ pub async fn run_ls(args: Option<Vec<&str>>, namespace: Option<&str>) -> TestPro
mirrord_args.extend(vec!["--namespace", namespace]);
}

run_mirrord(mirrord_args, Default::default()).await
let mut env = HashMap::new();
if USE_OPERATOR {
env.insert("MIRRORD_OPERATOR_ENABLE", "true");
};

run_mirrord(mirrord_args, env).await
}

/// Runs `mirrord verify-config [--ide] "/path/config.json"`.
Expand Down

0 comments on commit 02734c6

Please sign in to comment.