Skip to content

Commit

Permalink
Extend operator headers (#2550)
Browse files Browse the repository at this point in the history
* Reworked OperatorApi to be more reusable, sending extra headers with each request

* Added x-client-name and x-client-hostname headers

* Changelog entry

* Improved tracing

* is it nice?

* Fmt

* Fixed error name

* Improved progress messages

* Try to add client cert in ls

* copy_target check function renamed

* OperatorApi::try_new reads config.operator setting

* Type state machine

* Removed extra subtask
  • Loading branch information
Razz4780 authored Jul 2, 2024
1 parent 2121901 commit 63b3eed
Show file tree
Hide file tree
Showing 17 changed files with 1,252 additions and 933 deletions.
15 changes: 5 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ reqwest = { version = "0.12", default-features = false, features = [
"socks",
"http2",
] }
kube = { version = "0.92", default-features = false, features = [
kube = { git = "https://github.com/kube-rs/kube", rev = "f9902f1439b3c0baafc2ece1680644c2bfade742", default-features = false, features = [
"runtime",
"derive",
"client",
Expand All @@ -83,7 +83,7 @@ kube = { version = "0.92", default-features = false, features = [
"oidc",
"socks5",
"http-proxy",
]}
] }
hickory-resolver = { version = "0.24", features = [
"serde-config",
"tokio-runtime",
Expand Down
1 change: 1 addition & 0 deletions changelog.d/2466.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CLI now sends additional headers with each request to the mirrord operator.
154 changes: 71 additions & 83 deletions mirrord/cli/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
use std::{collections::HashSet, time::Duration};

use kube::{api::GroupVersionKind, discovery, Resource};
use mirrord_analytics::Reporter;
use mirrord_config::LayerConfig;
use mirrord_intproxy::agent_conn::AgentConnectInfo;
use mirrord_kube::{
api::{
kubernetes::{create_kube_api, KubernetesAPI},
wrap_raw_connection,
},
api::{kubernetes::KubernetesAPI, wrap_raw_connection},
error::KubeApiError,
};
use mirrord_operator::{
client::{OperatorApi, OperatorApiError, OperatorOperation},
crd::MirrordOperatorCrd,
};
use mirrord_operator::client::{OperatorApi, OperatorSessionConnection};
use mirrord_progress::{
messages::MULTIPOD_WARNING, IdeAction, IdeMessage, NotificationLevel, Progress,
};
Expand All @@ -28,36 +21,73 @@ pub(crate) struct AgentConnection {
pub receiver: mpsc::Receiver<DaemonMessage>,
}

#[tracing::instrument(level = "trace", skip(config), ret, err)]
async fn check_if_operator_resource_exists(config: &LayerConfig) -> Result<bool, KubeApiError> {
let client = create_kube_api(
config.accept_invalid_certificates,
config.kubeconfig.clone(),
config.kube_context.clone(),
)
.await?;
/// 1. If mirrord-operator is explicitly enabled in the given [`LayerConfig`], makes a connection
/// with the target using the mirrord-operator.
/// 2. If mirrord-operator is explicitly disabled in the given [`LayerConfig`], returns [`None`].
/// 3. Otherwise, attempts to use the mirrord-operator and returns [`None`] in case mirrord-operator
/// is not found or its license is invalid.
async fn try_connect_using_operator<P, R>(
config: &LayerConfig,
progress: &P,
analytics: &mut R,
) -> Result<Option<OperatorSessionConnection>>
where
P: Progress,
R: Reporter,
{
let mut operator_subtask = progress.subtask("checking operator");
if config.operator == Some(false) {
operator_subtask.success(Some("operator disabled"));
return Ok(None);
}

let gvk = GroupVersionKind {
group: MirrordOperatorCrd::group(&()).into_owned(),
version: MirrordOperatorCrd::version(&()).into_owned(),
kind: MirrordOperatorCrd::kind(&()).into_owned(),
let Some(api) = OperatorApi::try_new(config, analytics).await? else {
operator_subtask.success(Some("operator not found"));
return Ok(None);
};

match discovery::oneshot::pinned_kind(&client, &gvk).await {
Ok(..) => Ok(true),
Err(kube::Error::Api(response)) if response.code == 404 => Ok(false),
Err(error) => Err(error.into()),
let mut version_cmp_subtask = operator_subtask.subtask("checking version compatibility");
let compatible = api.check_operator_version(&version_cmp_subtask);
if compatible {
version_cmp_subtask.success(Some("operator version compatible"));
} else {
version_cmp_subtask.failure(Some("operator version may not be compatible"));
}

let mut license_subtask = operator_subtask.subtask("checking license");
match api.check_license_validity(&license_subtask) {
Ok(()) => license_subtask.success(Some("operator license valid")),
Err(error) => {
license_subtask.failure(Some("operator license expired"));

if config.operator == Some(true) {
return Err(error.into());
} else {
operator_subtask.failure(Some("proceeding without operator"));
return Ok(None);
}
}
}

let mut user_cert_subtask = operator_subtask.subtask("preparing user credentials");
let api = api.prepare_client_cert(analytics).await.into_certified()?;
user_cert_subtask.success(Some("user credentials prepared"));

let mut session_subtask = operator_subtask.subtask("starting session");
let connection = api.connect_in_new_session(config, &session_subtask).await?;
session_subtask.success(Some("session started"));

operator_subtask.success(Some("using operator"));

Ok(Some(connection))
}

/// Creates an agent if needed then connects to it.
///
/// First it checks if we have an `operator` in the [`config`](LayerConfig), which we do if the
/// user has installed the mirrord-operator in their cluster, even without a valid license. And
/// then we create a session with the operator with [`OperatorApi::create_session`].
///
/// If there is no operator, or the license is not good enough for starting an operator session,
/// then we create the mirrord-agent and run mirrord by itself, without the operator.
/// 1. If mirrord-operator is explicitly enabled in the given [`LayerConfig`], makes a connection
/// with the target using the mirrord-operator.
/// 2. If mirrord-operator is explicitly disabled in the given [`LayerConfig`], creates a
/// mirrord-agent and runs session without the mirrord-operator.
/// 3. Otherwise, attempts to use the mirrord-operator and falls back to OSS flow in case
/// mirrord-operator is not found or its license is invalid.
///
/// Here is where we start interactions with the kubernetes API.
#[tracing::instrument(level = "trace", skip_all)]
Expand All @@ -69,56 +99,14 @@ pub(crate) async fn create_and_connect<P, R: Reporter>(
where
P: Progress + Send + Sync,
{
if config.operator != Some(false) {
let mut subtask = progress.subtask("checking operator");

match OperatorApi::create_session(config, &subtask, analytics).await {
Ok(session) => {
subtask.success(Some("connected to the operator"));

return Ok((
AgentConnectInfo::Operator(session.info),
AgentConnection {
sender: session.tx,
receiver: session.rx,
},
));
}

Err(OperatorApiError::NoLicense) if config.operator.is_none() => {
tracing::trace!("operator license expired");
subtask.success(Some("operator license expired"));
}

Err(
e @ OperatorApiError::KubeError {
operation: OperatorOperation::FindingOperator,
..
},
) if config.operator.is_none() => {
// We need to check if the operator is really installed or not.
match check_if_operator_resource_exists(config).await {
// Operator is installed yet we failed to use it, abort
Ok(true) => {
return Err(e.into());
}
// Operator is not installed, fallback to OSS
Ok(false) => {
subtask.success(Some("operator not found"));
}
// We don't know if operator is installed or not,
// prompt a warning and fallback to OSS
Err(error) => {
let message = "failed to check if operator is installed";
tracing::debug!(%error, message);
subtask.warning(message);
subtask.success(Some("operator not found"));
}
}
}

Err(e) => return Err(e.into()),
}
if let Some(connection) = try_connect_using_operator(config, progress, analytics).await? {
return Ok((
AgentConnectInfo::Operator(connection.session),
AgentConnection {
sender: connection.tx,
receiver: connection.rx,
},
));
}

if config.feature.copy_target.enabled {
Expand Down
9 changes: 7 additions & 2 deletions mirrord/cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use mirrord_config::config::ConfigError;
use mirrord_console::error::ConsoleError;
use mirrord_intproxy::error::IntProxyError;
use mirrord_kube::error::KubeApiError;
use mirrord_operator::client::{HttpError, OperatorApiError, OperatorOperation};
use mirrord_operator::client::error::{HttpError, OperatorApiError, OperatorOperation};
use reqwest::StatusCode;
use thiserror::Error;

Expand Down Expand Up @@ -232,6 +232,10 @@ pub(crate) enum CliError {
"This usually means that connectivity was lost while pinging.{GENERAL_HELP}"
))]
PingPongFailed(String),

#[error("Failed to prepare mirrord operator client certificate: {0}")]
#[diagnostic(help("{GENERAL_BUG}"))]
OperatorClientCertError(String),
}

impl From<OperatorApiError> for CliError {
Expand All @@ -244,7 +248,7 @@ impl From<OperatorApiError> for CliError {
feature,
operator_version,
},
OperatorApiError::CreateApiError(e) => Self::CreateKubeApiFailed(e),
OperatorApiError::CreateKubeClient(e) => Self::CreateKubeApiFailed(e),
OperatorApiError::ConnectRequestBuildError(e) => Self::ConnectRequestBuildError(e),
OperatorApiError::KubeError {
error: kube::Error::Api(ErrorResponse { message, code, .. }),
Expand All @@ -269,6 +273,7 @@ impl From<OperatorApiError> for CliError {
Self::OperatorApiFailed(operation, error)
}
OperatorApiError::NoLicense => Self::OperatorLicenseExpired,
OperatorApiError::ClientCertError(error) => Self::OperatorClientCertError(error),
}
}
}
Loading

0 comments on commit 63b3eed

Please sign in to comment.