Skip to content

Commit

Permalink
Improve operator detection + errors in CLI (#2520)
Browse files Browse the repository at this point in the history
* errors fixed in CLI, operator resource checked

* Changelog entry

* Progress messages fixed

* Operator rejections handled

* Cargo.lock

* Docs fixed

* Use 'message' instead of 'reason'

* Fixed agent keepalive

* ...

* Forcing agent connection in intproxy

* CR suggestions
  • Loading branch information
Razz4780 authored Jun 19, 2024
1 parent 5a7b846 commit 639d5e4
Show file tree
Hide file tree
Showing 27 changed files with 750 additions and 708 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2487.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mirrord now queries the kube discovery API to confirm that the mirrord operator is not installed (when an attempt to use the operator API fails).
16 changes: 8 additions & 8 deletions mirrord/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ pub(super) enum OperatorCommand {
/// Print operator status
Status {
/// Specify config file to use
#[arg(short = 'f')]
config_file: Option<String>,
#[arg(short = 'f', long, value_hint = ValueHint::FilePath)]
config_file: Option<PathBuf>,
},
/// Operator session management commands.
///
Expand Down Expand Up @@ -289,15 +289,15 @@ pub(super) struct ListTargetArgs {
pub namespace: Option<String>,

/// Specify config file to use
#[arg(short = 'f')]
pub config_file: Option<String>,
#[arg(short = 'f', long, value_hint = ValueHint::FilePath)]
pub config_file: Option<PathBuf>,
}

#[derive(Args, Debug)]
pub(super) struct ExtensionExecArgs {
/// Specify config file to use
#[arg(short = 'f')]
pub config_file: Option<String>,
#[arg(short = 'f', long, value_hint = ValueHint::FilePath)]
pub config_file: Option<PathBuf>,
/// Specify target
#[arg(short = 't')]
pub target: Option<String>,
Expand Down Expand Up @@ -335,7 +335,7 @@ pub(super) enum DiagnoseCommand {
/// Check network connectivity and provide RTT (latency) statistics.
Latency {
/// Specify config file to use
#[arg(short = 'f')]
config_file: Option<String>,
#[arg(short = 'f', long, value_hint = ValueHint::FilePath)]
config_file: Option<PathBuf>,
},
}
110 changes: 62 additions & 48 deletions mirrord/cli/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use std::{collections::HashSet, time::Duration};

use kube::{api::GroupVersionKind, discovery, Resource};
use mirrord_analytics::Reporter;
use mirrord_config::{feature::network::outgoing::OutgoingFilterConfig, LayerConfig};
use mirrord_config::LayerConfig;
use mirrord_intproxy::agent_conn::AgentConnectInfo;
use mirrord_kube::api::{kubernetes::KubernetesAPI, wrap_raw_connection};
use mirrord_operator::client::{OperatorApi, OperatorApiError, OperatorOperation};
use mirrord_kube::{
api::{
kubernetes::{create_kube_api, KubernetesAPI},
wrap_raw_connection,
},
error::KubeApiError,
};
use mirrord_operator::{
client::{OperatorApi, OperatorApiError, OperatorOperation},
crd::MirrordOperatorCrd,
};
use mirrord_progress::{
messages::MULTIPOD_WARNING, IdeAction, IdeMessage, NotificationLevel, Progress,
};
Expand All @@ -18,29 +28,31 @@ pub(crate) struct AgentConnection {
pub receiver: mpsc::Receiver<DaemonMessage>,
}

trait OperatorApiErrorExt {
/// Whether this error should abort the execution, even if the user did not specify whether to
/// use the operator or not.
fn should_abort_cli(&self) -> bool;
}

impl OperatorApiErrorExt for OperatorApiError {
fn should_abort_cli(&self) -> bool {
match self {
// Various kube errors can happen due to RBAC if the operator is not installed.
Self::KubeError {
operation: OperatorOperation::FindingOperator,
..
} => false,
// Fallback to OSS if license is expired
Self::NoLicense => false,
// These should either never happen or can happen only if the operator is installed.
Self::ConnectRequestBuildError(..)
| Self::CreateApiError(..)
| Self::InvalidTarget { .. }
| Self::UnsupportedFeature { .. }
| Self::StatusFailure { .. }
| Self::KubeError { .. } => true,
/// Checks whether the mirrord operator resource is known to the cluster.
///
/// Builds a kube client from `config` (`accept_invalid_certificates`, `kubeconfig` and
/// `kube_context`) and asks the discovery API for the group/version/kind of
/// [`MirrordOperatorCrd`].
///
/// # Returns
///
/// * `Ok(true)` - discovery found the resource.
/// * `Ok(false)` - the API responded with a 404 for the resource.
///
/// # Errors
///
/// Returns [`CliError::OperatorInstallationCheckError`] when the kube client cannot be created
/// or when discovery fails with anything other than a 404 API response.
#[tracing::instrument(level = "trace", skip(config), ret, err)]
async fn check_if_operator_resource_exists(config: &LayerConfig) -> Result<bool> {
let client = create_kube_api(
config.accept_invalid_certificates,
config.kubeconfig.clone(),
config.kube_context.clone(),
)
.await
.map_err(CliError::OperatorInstallationCheckError)?;

// Identify the operator resource by the group/version/kind declared on `MirrordOperatorCrd`.
let gvk = GroupVersionKind {
group: MirrordOperatorCrd::group(&()).into_owned(),
version: MirrordOperatorCrd::version(&()).into_owned(),
kind: MirrordOperatorCrd::kind(&()).into_owned(),
};

// Query discovery for exactly this group/version/kind.
match discovery::oneshot::pinned_kind(&client, &gvk).await {
Ok(..) => Ok(true),
// A 404 API response is treated as "resource does not exist", not as a failure.
Err(kube::Error::Api(response)) if response.code == 404 => Ok(false),
// Any other failure leaves the question unanswered, so it is surfaced to the caller.
Err(error) => {
tracing::trace!(
?error,
"Failed to check if operator is installed in the cluster"
);
Err(CliError::OperatorInstallationCheckError(error.into()))
}
}
}
Expand All @@ -64,18 +76,6 @@ pub(crate) async fn create_and_connect<P, R: Reporter>(
where
P: Progress + Send + Sync,
{
if let Some(outgoing_filter) = &config.feature.network.outgoing.filter {
if matches!(outgoing_filter, OutgoingFilterConfig::Remote(_)) && !config.feature.network.dns
{
progress.warning(
"The mirrord outgoing traffic filter includes host names to be connected remotely,\
but the remote DNS feature is disabled, so the addresses of these hosts will be\
resolved locally!\n\
> Consider enabling the remote DNS resolution feature.",
);
}
}

if config.operator != Some(false) {
let mut subtask = progress.subtask("checking operator");

Expand All @@ -91,16 +91,30 @@ where
},
));
}
Err(e) if config.operator == Some(true) || e.should_abort_cli() => return Err(e.into()),
Err(e) => {
tracing::trace!("{}", CliError::from(e));
subtask.success(Some("operator not found"));

Err(OperatorApiError::NoLicense) if config.operator.is_none() => {
tracing::trace!("operator license expired");
subtask.success(Some("operator license expired"));
}

Err(
e @ OperatorApiError::KubeError {
operation: OperatorOperation::FindingOperator,
..
},
) if config.operator.is_none() => {
// We need to check if the operator is really installed or not.
if check_if_operator_resource_exists(config).await? {
return Err(e.into());
}
}

Err(e) => return Err(e.into()),
}
}

if config.feature.copy_target.enabled {
return Err(CliError::FeatureRequiresOperatorError("copy target".into()));
return Err(CliError::FeatureRequiresOperatorError("copy_target".into()));
}

if matches!(
Expand Down Expand Up @@ -136,18 +150,18 @@ where

let k8s_api = KubernetesAPI::create(config)
.await
.map_err(CliError::KubernetesApiFailed)?;
.map_err(CliError::CreateAgentFailed)?;

let _ = k8s_api.detect_openshift(progress).await.map_err(|err| {
tracing::debug!("couldn't determine OpenShift: {err}");
});
if let Err(error) = k8s_api.detect_openshift(progress).await {
tracing::debug!(?error, "Failed to detect OpenShift");
};

let agent_connect_info = tokio::time::timeout(
Duration::from_secs(config.agent.startup_timeout),
k8s_api.create_agent(progress, &config.target, Some(config), Default::default()),
)
.await
.map_err(|_| CliError::AgentReadyTimeout)?
.unwrap_or(Err(KubeApiError::AgentReadyTimeout))
.map_err(CliError::CreateAgentFailed)?;

let (sender, receiver) = wrap_raw_connection(
Expand Down
55 changes: 29 additions & 26 deletions mirrord/cli/src/diagnose.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{path::Path, time::Duration};

use mirrord_analytics::NullReporter;
use mirrord_config::{
Expand All @@ -10,31 +10,43 @@ use mirrord_protocol::{ClientMessage, DaemonMessage};
use tokio::{sync::mpsc, time::Instant};

use crate::{
connection::create_and_connect, util::remove_proxy_env, DiagnoseArgs, DiagnoseCommand, Result,
connection::create_and_connect, util::remove_proxy_env, CliError, DiagnoseArgs,
DiagnoseCommand, Result,
};

/// Sends a ping over the connection and expects a pong.
async fn ping(
sender: &mpsc::Sender<ClientMessage>,
receiver: &mut mpsc::Receiver<DaemonMessage>,
) -> Result<()> {
sender
.send(ClientMessage::Ping)
.await
.map_err(|_| crate::CliError::CantSendPing)?;
sender.send(ClientMessage::Ping).await.map_err(|_| {
CliError::PingPongFailed(
"failed to send ping message - agent unexpectedly closed connection".to_string(),
)
})?;

loop {
match receiver.recv().await {
Some(DaemonMessage::Pong) => break Ok(()),
Some(DaemonMessage::LogMessage(..)) => {}
_ => break Err(crate::CliError::InvalidPingResponse),
}
let result = match receiver.recv().await {
Some(DaemonMessage::Pong) => Ok(()),
Some(DaemonMessage::LogMessage(..)) => continue,
Some(DaemonMessage::Close(message)) => Err(CliError::PingPongFailed(format!(
"agent closed connection with message: {message}"
))),
Some(message) => Err(CliError::PingPongFailed(format!(
"agent sent an unexpected message: {message:?}"
))),
None => Err(CliError::PingPongFailed(
"agent unexpectedly closed connection".to_string(),
)),
};

return result;
}
}

/// Create a targetless session and run pings to diagnose network latency.
#[tracing::instrument(level = "trace", ret)]
async fn diagnose_latency(config: Option<String>) -> Result<()> {
async fn diagnose_latency(config: Option<&Path>) -> Result<()> {
let mut progress = ProgressTracker::from_env("mirrord network diagnosis");

let mut cfg_context = ConfigContext::default();
Expand Down Expand Up @@ -70,32 +82,23 @@ async fn diagnose_latency(config: Option<String>) -> Result<()> {
statistics.push(elapsed);
}

let min = statistics
.iter()
.min()
.map(|d| d.as_millis().to_string())
.unwrap_or("N/A".to_string());
let max = statistics
.iter()
.max()
.map(|d| d.as_millis().to_string())
.unwrap_or("N/A".to_string());
let avg: String = (statistics.iter().sum::<Duration>() / statistics.len() as u32)
.as_millis()
.to_string();
let min = statistics.iter().min().expect("never empty").as_millis();
let max = statistics.iter().max().expect("never empty").as_millis();
let avg = (statistics.iter().sum::<Duration>() / statistics.len() as u32).as_millis();
progress.success(Some(
format!(
"Latency statistics: min={}ms, max={}ms, avg={}ms",
min, max, avg
)
.as_str(),
));

Ok(())
}

/// Handle `mirrord diagnose ...` commands.
pub(crate) async fn diagnose_command(args: DiagnoseArgs) -> Result<()> {
match args.command {
DiagnoseCommand::Latency { config_file } => diagnose_latency(config_file).await,
DiagnoseCommand::Latency { config_file } => diagnose_latency(config_file.as_deref()).await,
}
}
Loading

0 comments on commit 639d5e4

Please sign in to comment.