Skip to content

Commit

Permalink
Add SQS to mirrord operator status reporting. (#2928)
Browse files Browse the repository at this point in the history
* Add SQS to mirrord operator status reporting.

* fix docs

* re-add inspect_err when operator not found status

* simplify everything, and use MirrordSqsSession instead of Registry

* add filter to print

* use proper id

* terrible loops

* froot loops

* frootier loops

* Little bit of docs for the sqs_rows

* more docs

* print table after each consumer

* extend the list of sqs

* closure fn

* remove extra into_iter

* proper merge of maps

* changelog

* Docs on session id random vs copy target.
  • Loading branch information
meowjesty authored Dec 3, 2024
1 parent 17ee312 commit 4b0d899
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 168 deletions.
1 change: 1 addition & 0 deletions changelog.d/+629-sqs-operator-status.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add SQS to mirrord operator status reporting.
10 changes: 9 additions & 1 deletion mirrord/cli/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{ffi::NulError, net::SocketAddr, path::PathBuf, str::FromStr};
use std::{ffi::NulError, net::SocketAddr, num::ParseIntError, path::PathBuf, str::FromStr};

use kube::core::ErrorResponse;
use miette::Diagnostic;
Expand Down Expand Up @@ -260,6 +260,10 @@ pub(crate) enum CliError {
#[diagnostic(transparent)]
OperatorSetupError(#[from] OperatorSetupError),

#[error("`mirrord operator status` command failed! Could not retrieve operator status API.")]
#[diagnostic(help("{GENERAL_HELP}"))]
OperatorStatusNotFound,

#[error("Failed to extract mirrord-layer to `{}`: {1}", .0.display())]
#[diagnostic(help("{GENERAL_BUG}"))]
LayerExtractError(PathBuf, std::io::Error),
Expand Down Expand Up @@ -407,6 +411,9 @@ pub(crate) enum CliError {

#[error("Couldn't resolve binary name '{0}': {1}")]
BinaryWhichError(String, String),

#[error(transparent)]
ParseInt(ParseIntError),
}

impl CliError {
Expand Down Expand Up @@ -485,6 +492,7 @@ impl From<OperatorApiError> for CliError {
Self::OperatorReturnedUnknownTargetType(error.0)
}
OperatorApiError::KubeApi(error) => Self::OperatorTargetResolution(error),
OperatorApiError::ParseInt(error) => Self::ParseInt(error),
}
}
}
Expand Down
163 changes: 9 additions & 154 deletions mirrord/cli/src/operator.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
use std::{fs::File, path::Path, time::Duration};
use std::{fs::File, path::Path};

use futures::TryFutureExt;
use kube::{Api, Client};
use mirrord_analytics::NullReporter;
use mirrord_config::{
config::{ConfigContext, MirrordConfig},
LayerConfig, LayerFileConfig,
};
use mirrord_kube::api::kubernetes::create_kube_config;
use mirrord_operator::{
client::OperatorApi,
crd::{MirrordOperatorCrd, MirrordOperatorSpec},
crd::MirrordOperatorCrd,
setup::{LicenseType, Operator, OperatorSetup, SetupOptions},
types::LicenseInfoOwned,
};
use mirrord_progress::{Progress, ProgressTracker};
use prettytable::{row, Table};
use serde::Deserialize;
use status::StatusCommandHandler;
use tokio::fs;
use tracing::{warn, Level};

Expand Down Expand Up @@ -149,158 +145,17 @@ async fn get_status_api(config: Option<&Path>) -> CliResult<Api<MirrordOperatorC
Ok(Api::all(client))
}

#[tracing::instrument(level = Level::TRACE, ret)]
async fn operator_status(config: Option<&Path>) -> CliResult<()> {
let mut progress = ProgressTracker::from_env("Operator Status");

let layer_config = if let Some(config) = config {
let mut cfg_context = ConfigContext::default();
LayerFileConfig::from_path(config)?.generate_config(&mut cfg_context)?
} else {
LayerConfig::from_env()?
};

if !layer_config.use_proxy {
remove_proxy_env();
}

let mut status_progress = progress.subtask("fetching status");
let api = OperatorApi::try_new(&layer_config, &mut NullReporter::default())
.await
.inspect_err(|_| {
status_progress.failure(Some("failed to get status"));
})?;
let Some(api) = api else {
status_progress.failure(Some("operator not found"));
return Err(CliError::OperatorNotInstalled);
};
status_progress.success(Some("fetched status"));

progress.success(None);

let MirrordOperatorSpec {
operator_version,
default_namespace,
license:
LicenseInfoOwned {
name,
organization,
expire_at,
..
},
..
} = &api.operator().spec;

let expire_at = expire_at.format("%e-%b-%Y");

println!(
r#"
Operator version: {operator_version}
Operator default namespace: {default_namespace}
Operator License
name: {name}
organization: {organization}
expire at: {expire_at}
"#
);

let Some(status) = &api.operator().status else {
return Ok(());
};

if let Some(copy_targets) = status.copy_targets.as_ref() {
if copy_targets.is_empty() {
println!("No active copy targets.");
} else {
println!("Active Copy Targets:");
let mut copy_targets_table = Table::new();

copy_targets_table.add_row(row![
"Original Target",
"Namespace",
"Copy Pod Name",
"Scale Down?"
]);

for (pod_name, copy_target_resource) in copy_targets {
copy_targets_table.add_row(row![
copy_target_resource.spec.target.to_string(),
copy_target_resource
.metadata
.namespace
.as_deref()
.unwrap_or_default(),
pod_name,
if copy_target_resource.spec.scale_down {
"*"
} else {
""
},
]);
}

copy_targets_table.printstd();
}
println!();
}

if let Some(statistics) = status.statistics.as_ref() {
println!("Operator Daily Users: {}", statistics.dau);
println!("Operator Monthly Users: {}", statistics.mau);
}

let mut sessions = Table::new();

sessions.add_row(row![
"Session ID",
"Target",
"Namespace",
"User",
"Ports",
"Session Duration"
]);

for session in &status.sessions {
let locked_ports = session
.locked_ports
.as_deref()
.map(|ports| {
ports
.iter()
.map(|(port, type_, filter)| {
format!(
"Port: {port}, Type: {type_}{}",
filter
.as_ref()
.map(|f| format!(", Filter: {}", f))
.unwrap_or_default()
)
})
.collect::<Vec<_>>()
.join("\n")
})
.unwrap_or_default();

sessions.add_row(row![
session.id.as_deref().unwrap_or(""),
&session.target,
session.namespace.as_deref().unwrap_or("N/A"),
&session.user,
locked_ports,
humantime::format_duration(Duration::from_secs(session.duration_secs)),
]);
}

sessions.printstd();

Ok(())
}
pub(super) mod status;

/// Handle commands related to the operator `mirrord operator ...`
pub(crate) async fn operator_command(args: OperatorArgs) -> CliResult<()> {
match args.command {
OperatorCommand::Setup(params) => operator_setup(params).await.map_err(CliError::from),
OperatorCommand::Status { config_file } => operator_status(config_file.as_deref()).await,
OperatorCommand::Status { config_file } => {
StatusCommandHandler::new(config_file)
.and_then(StatusCommandHandler::handle)
.await
}
OperatorCommand::Session(session_command) => {
SessionCommandHandler::new(session_command)
.and_then(SessionCommandHandler::handle)
Expand Down
Loading

0 comments on commit 4b0d899

Please sign in to comment.