diff --git a/changelog.d/2066.added.md b/changelog.d/2066.added.md new file mode 100644 index 00000000000..cb6f7a7f102 --- /dev/null +++ b/changelog.d/2066.added.md @@ -0,0 +1 @@ +Client side support for the upcoming SQS queue splitting support in *mirrord for Teams*. diff --git a/mirrord-schema.json b/mirrord-schema.json index 3ed7bfe4d7a..e456db15f34 100644 --- a/mirrord-schema.json +++ b/mirrord-schema.json @@ -882,6 +882,18 @@ "type": "null" } ] + }, + "split_queues": { + "title": "feature.split_queues {#feature-split_queues}", + "description": "Define filters to split queues by, and make your local application consume only messages that match those filters. If you don't specify any filter for a queue that is however declared in the `MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter will be used, and your local application will not receive any messages from that queue.", + "anyOf": [ + { + "$ref": "#/definitions/SplitQueuesConfig" + }, + { + "type": "null" + } + ] } }, "additionalProperties": false @@ -1431,6 +1443,33 @@ } ] }, + "QueueFilter": { + "description": "More queue types might be added in the future.", + "oneOf": [ + { + "description": "Amazon Simple Queue Service.", + "type": "object", + "required": [ + "message_filter", + "queue_type" + ], + "properties": { + "message_filter": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "queue_type": { + "type": "string", + "enum": [ + "SQS" + ] + } + } + } + ] + }, "RolloutTarget": { "description": " Mirror the rollout specified by [`RolloutTarget::rollout`].", "type": "object", @@ -1451,6 +1490,16 @@ }, "additionalProperties": false }, + "SplitQueuesConfig": { + "description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very .*\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"*you$\" } }, } } } ```", + "type": 
[ + "object", + "null" + ], + "additionalProperties": { + "$ref": "#/definitions/QueueFilter" + } + }, "StatefulSetTarget": { "type": "object", "required": [ diff --git a/mirrord/cli/src/config.rs b/mirrord/cli/src/config.rs index 799b6de2f62..a1482decd03 100644 --- a/mirrord/cli/src/config.rs +++ b/mirrord/cli/src/config.rs @@ -536,6 +536,26 @@ pub(super) enum OperatorCommand { /// will be able to access) #[arg(short, long, default_value = "mirrord")] namespace: OperatorNamespace, + + /// AWS role ARN for the operator's service account. + /// Necessary for enabling SQS queue splitting. + /// For successfully running an SQS queue splitting operator the given IAM role must be + /// able to create, read from, write to, and delete SQS queues. + /// If the queue messages are encrypted using KMS, the operator also needs the + /// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions. + #[arg(long, visible_alias = "arn")] + aws_role_arn: Option, + + /// Enable SQS queue splitting. + /// When set, some extra CRDs will be installed on the cluster, and the operator will run + /// an SQS splitting component. 
+ #[arg( + long, + visible_alias = "sqs", + default_value_t = false, + requires = "aws_role_arn" + )] + sqs_splitting: bool, }, /// Print operator status Status { diff --git a/mirrord/cli/src/error.rs b/mirrord/cli/src/error.rs index 0958785d712..d0fb6ce11b0 100644 --- a/mirrord/cli/src/error.rs +++ b/mirrord/cli/src/error.rs @@ -342,7 +342,7 @@ impl From for CliError { feature, operator_version, } => Self::FeatureNotSupportedInOperatorError { - feature, + feature: feature.to_string(), operator_version, }, OperatorApiError::CreateKubeClient(e) => Self::CreateKubeApiFailed(e), diff --git a/mirrord/cli/src/operator.rs b/mirrord/cli/src/operator.rs index f7a255dc5b1..45b11ffeb58 100644 --- a/mirrord/cli/src/operator.rs +++ b/mirrord/cli/src/operator.rs @@ -59,6 +59,8 @@ async fn operator_setup( namespace: OperatorNamespace, license_key: Option, license_path: Option, + aws_role_arn: Option, + sqs_splitting: bool, ) -> Result<(), OperatorSetupError> { if !accept_tos { eprintln!("Please note that mirrord operator installation requires an active subscription for the mirrord Operator provided by MetalBear Tech LTD.\nThe service ToS can be read here - https://metalbear.co/legal/terms\nPass --accept-tos to accept the TOS"); @@ -101,6 +103,8 @@ async fn operator_setup( license, namespace, image, + aws_role_arn, + sqs_splitting, }); match file { @@ -299,9 +303,19 @@ pub(crate) async fn operator_command(args: OperatorArgs) -> Result<()> { namespace, license_key, license_path, - } => operator_setup(accept_tos, file, namespace, license_key, license_path) - .await - .map_err(CliError::from), + aws_role_arn, + sqs_splitting, + } => operator_setup( + accept_tos, + file, + namespace, + license_key, + license_path, + aws_role_arn, + sqs_splitting, + ) + .await + .map_err(CliError::from), OperatorCommand::Status { config_file } => operator_status(config_file.as_deref()).await, OperatorCommand::Session(session_command) => { SessionCommandHandler::new(session_command) diff --git 
a/mirrord/cli/src/operator/session.rs b/mirrord/cli/src/operator/session.rs index a89d8448453..dbadb691e27 100644 --- a/mirrord/cli/src/operator/session.rs +++ b/mirrord/cli/src/operator/session.rs @@ -6,7 +6,7 @@ use mirrord_operator::{ error::{OperatorApiError, OperatorOperation}, MaybeClientCert, OperatorApi, }, - crd::SessionCrd, + crd::{NewOperatorFeature, SessionCrd}, }; use mirrord_progress::{Progress, ProgressTracker}; use tracing::Level; @@ -99,7 +99,7 @@ impl SessionCommandHandler { if code == 404 && reason.contains("parse") => { OperatorApiError::UnsupportedFeature { - feature: "session management".to_string(), + feature: NewOperatorFeature::SessionManagement, operator_version: operator_api.operator().spec.operator_version.clone(), } } @@ -135,16 +135,16 @@ impl SessionCommandHandler { ))); progress.success(Some("Session operation is completed.")); - Ok(()) - } - }) - .transpose()? - // We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`), - // meaning that the operation has started, but it might not be finished yet. - .unwrap_or_else(|| { - sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time."))); - progress.success(Some(&format!("`{command}` is done, but the operation might be pending."))); - }); + Ok(()) + } + }) + .transpose()? + // We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`), + // meaning that the operation has started, but it might not be finished yet. 
+ .unwrap_or_else(|| { + sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time."))); + progress.success(Some(&format!("`{command}` is done, but the operation might be pending."))); + }); Ok(()) } diff --git a/mirrord/config/configuration.md b/mirrord/config/configuration.md index a1be92b8808..f247b82cd4d 100644 --- a/mirrord/config/configuration.md +++ b/mirrord/config/configuration.md @@ -1196,6 +1196,36 @@ of regexes specified here. If there is a match, mirrord will connect your applic the target unix socket address on the target pod. Otherwise, it will leave the connection to happen locally on your machine. +## feature.split_queues {#feature-split_queues} + +Define filters to split queues by, and make your local application consume only messages +that match those filters. +If you don't specify any filter for a queue that is however declared in the +`MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter +will be used, and your local application will not receive any messages from that queue. 
+ +```json +{ + "feature": { + "split_queues": { + "first-queue": { + "queue_type": "SQS", + "message_filter": { + "wows": "so wows", + "coolz": "^very .*" + } + }, + "second-queue": { + "queue_type": "SQS", + "message_filter": { + "who": ".*you$" + } + } + } + } +} +``` + ## internal_proxy {#root-internal_proxy} Configuration for the internal proxy mirrord spawns for each local mirrord session diff --git a/mirrord/config/src/feature.rs b/mirrord/config/src/feature.rs index 93f690ead74..68f529c39f0 100644 --- a/mirrord/config/src/feature.rs +++ b/mirrord/config/src/feature.rs @@ -4,12 +4,13 @@ use schemars::JsonSchema; use serde::Serialize; use self::{copy_target::CopyTargetConfig, env::EnvConfig, fs::FsConfig, network::NetworkConfig}; -use crate::config::source::MirrordConfigSource; +use crate::{config::source::MirrordConfigSource, feature::split_queues::SplitQueuesConfig}; pub mod copy_target; pub mod env; pub mod fs; pub mod network; +pub mod split_queues; /// Controls mirrord features. /// @@ -96,6 +97,16 @@ pub struct FeatureConfig { /// Should mirrord return the hostname of the target pod when calling `gethostname` #[config(default = true)] pub hostname: bool, + + /// ## feature.split_queues {#feature-split_queues} + /// + /// Define filters to split queues by, and make your local application consume only messages + /// that match those filters. + /// If you don't specify any filter for a queue that is however declared in the + /// `MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter + /// will be used, and your local application will not receive any messages from that queue.
+ #[config(nested, unstable)] + pub split_queues: SplitQueuesConfig, } impl CollectAnalytics for &FeatureConfig { @@ -105,5 +116,6 @@ impl CollectAnalytics for &FeatureConfig { analytics.add("network", &self.network); analytics.add("copy_target", &self.copy_target); analytics.add("hostname", self.hostname); + analytics.add("split_queues", &self.split_queues); } } diff --git a/mirrord/config/src/feature/split_queues.rs b/mirrord/config/src/feature/split_queues.rs new file mode 100644 index 00000000000..1b25c0cf199 --- /dev/null +++ b/mirrord/config/src/feature/split_queues.rs @@ -0,0 +1,113 @@ +use std::{ + collections::{BTreeMap, HashMap}, + ops::Not, +}; + +use mirrord_analytics::{Analytics, CollectAnalytics}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig}; + +pub type QueueId = String; + +/// ```json +/// { +/// "feature": { +/// "split_queues": { +/// "first-queue": { +/// "queue_type": "SQS", +/// "message_filter": { +/// "wows": "so wows", +/// "coolz": "^very .*" +/// } +/// }, +/// "second-queue": { +/// "queue_type": "SQS", +/// "message_filter": { +/// "who": ".*you$" +/// } +/// } +/// } +/// } +/// } +/// ``` +#[derive(Clone, Debug, Eq, PartialEq, JsonSchema, Serialize, Deserialize, Default)] +pub struct SplitQueuesConfig(pub Option<BTreeMap<QueueId, QueueFilter>>); + +impl SplitQueuesConfig { + pub fn is_set(&self) -> bool { + self.0.is_some() + } + + /// Out of the whole queue splitting config, get only the sqs queues. + pub fn get_sqs_filter(&self) -> Option<HashMap<String, SqsMessageFilter>> { + self.0 + .as_ref() + .map(BTreeMap::iter) + .map(|filters| { + filters + // Keep only the SQS filters; any other `QueueFilter` variant is dropped.
+ .filter_map(|(queue_id, queue_filter)| match queue_filter { + QueueFilter::Sqs(filter_mapping) => { + Some((queue_id.clone(), filter_mapping.clone())) + } + _ => None, + }) + .collect() + }) + .and_then(|filters_map: HashMap| { + filters_map.is_empty().not().then_some(filters_map) + }) + } +} + +impl MirrordConfig for SplitQueuesConfig { + type Generated = Self; + + fn generate_config( + self, + _context: &mut ConfigContext, + ) -> crate::config::Result { + Ok(self) + } +} + +impl FromMirrordConfig for SplitQueuesConfig { + type Generator = Self; +} + +pub type MessageAttributeName = String; +pub type AttributeValuePattern = String; + +/// A filter is a mapping between message attribute names and regexes they should match. +/// The local application will only receive messages that match **all** of the given patterns. +/// This means, only messages that have **all** the `MessageAttributeName`s in the filter, +/// with values of those attributes matching the respective `AttributeValuePattern`. +pub type SqsMessageFilter = BTreeMap; + +/// More queue types might be added in the future. +#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema)] +#[serde(tag = "queue_type", content = "message_filter")] +pub enum QueueFilter { + /// Amazon Simple Queue Service. + #[serde(rename = "SQS")] + Sqs(SqsMessageFilter), + /// When a newer client sends a new filter kind to an older operator, that does not yet know + /// about that filter type, this is what that filter will be deserialized to. 
+ #[schemars(skip)] + #[serde(other, skip_serializing)] + Unknown, +} + +impl CollectAnalytics for &SplitQueuesConfig { + fn collect_analytics(&self, analytics: &mut Analytics) { + analytics.add( + "queue_count", + self.0 + .as_ref() + .map(|mapping| mapping.len()) + .unwrap_or_default(), + ) + } +} diff --git a/mirrord/config/src/lib.rs b/mirrord/config/src/lib.rs index fb1e9c4744a..6ace5e3c200 100644 --- a/mirrord/config/src/lib.rs +++ b/mirrord/config/src/lib.rs @@ -852,6 +852,7 @@ mod tests { })), copy_target: None, hostname: None, + split_queues: None, }), connect_tcp: None, container: None, diff --git a/mirrord/kube/src/api/kubernetes/rollout.rs b/mirrord/kube/src/api/kubernetes/rollout.rs index 5e29894756e..34f610fa0d9 100644 --- a/mirrord/kube/src/api/kubernetes/rollout.rs +++ b/mirrord/kube/src/api/kubernetes/rollout.rs @@ -4,9 +4,9 @@ use k8s_openapi::{ apimachinery::pkg::apis::meta::v1::ObjectMeta, ListableResource, Metadata, NamespaceResourceScope, Resource, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Rollout { metadata: ObjectMeta, pub spec: serde_json::Value, diff --git a/mirrord/operator/Cargo.toml b/mirrord/operator/Cargo.toml index 6ca84a0e65f..5bf3d1a114f 100644 --- a/mirrord/operator/Cargo.toml +++ b/mirrord/operator/Cargo.toml @@ -56,7 +56,7 @@ mirrord-progress = { path = "../progress", optional = true } mirrord-protocol = { path = "../protocol", optional = true } base64 = { workspace = true, optional = true } -bincode = { version = "2.0.0-rc.2", features = ["serde"], optional = true } +bincode = { version = "2.0.0-rc.2", features = ["serde"], optional = true } chrono = { version = "0.4", features = ["clock", "serde"] } http = { version = "1", optional = true } http-body-util = { workspace = true, optional = true } diff --git a/mirrord/operator/src/client.rs b/mirrord/operator/src/client.rs index d8982586074..bd3f5b0b8d6 100644 
--- a/mirrord/operator/src/client.rs +++ b/mirrord/operator/src/client.rs @@ -1,4 +1,4 @@ -use std::fmt; +use std::{fmt, ops::Not}; use base64::{engine::general_purpose, Engine}; use chrono::{DateTime, Utc}; @@ -15,7 +15,11 @@ use mirrord_auth::{ credential_store::{CredentialStoreSync, UserIdentity}, credentials::LicenseValidity, }; -use mirrord_config::{feature::network::incoming::ConcurrentSteal, target::Target, LayerConfig}; +use mirrord_config::{ + feature::{network::incoming::ConcurrentSteal, split_queues::SplitQueuesConfig}, + target::Target, + LayerConfig, +}; use mirrord_kube::{api::kubernetes::create_kube_config, error::KubeApiError}; use mirrord_progress::Progress; use mirrord_protocol::{ClientMessage, DaemonMessage}; @@ -26,7 +30,7 @@ use tracing::Level; use crate::{ crd::{ - CopyTargetCrd, CopyTargetSpec, MirrordOperatorCrd, OperatorFeatures, TargetCrd, + CopyTargetCrd, CopyTargetSpec, MirrordOperatorCrd, NewOperatorFeature, TargetCrd, OPERATOR_STATUS_NAME, }, types::{ @@ -542,16 +546,17 @@ where Ok(client_config) } - /// If `copy_target` feature is enabled in the given [`LayerConfig`], checks that the operator - /// supports it. - fn check_copy_target_feature_support(&self, config: &LayerConfig) -> OperatorApiResult<()> { - let client_wants_copy = config.feature.copy_target.enabled; - let operator_supports_copy = self.operator.spec.copy_target_enabled.unwrap_or(false); - if client_wants_copy && !operator_supports_copy { - return Err(OperatorApiError::UnsupportedFeature { - feature: "copy target".into(), - operator_version: self.operator.spec.operator_version.clone(), - }); + /// Check the operator supports all the operator features required by the user's configuration. + fn check_feature_support(&self, config: &LayerConfig) -> OperatorApiResult<()> { + if config.feature.copy_target.enabled { + self.operator + .spec + .require_feature(NewOperatorFeature::CopyTarget)? 
+ } + if config.feature.split_queues.is_set() { + self.operator + .spec + .require_feature(NewOperatorFeature::SqsQueueSplitting)?; } Ok(()) @@ -642,16 +647,24 @@ impl OperatorApi { where P: Progress, { - self.check_copy_target_feature_support(config)?; - - let target = if config.feature.copy_target.enabled { + self.check_feature_support(config)?; + + let target = if config.feature.copy_target.enabled + // use copy_target for splitting queues + || config.feature.split_queues.is_set() + { + if config.feature.copy_target.enabled.not() { + tracing::info!("Creating a copy-target for queue-splitting (even thought copy_target was not explicitly set).") + } let mut copy_subtask = progress.subtask("copying target"); // We do not validate the `target` here, it's up to the operator. let target = config.target.path.clone().unwrap_or(Target::Targetless); let scale_down = config.feature.copy_target.scale_down; let namespace = self.target_namespace(config); - let copied = self.copy_target(target, scale_down, namespace).await?; + let copied = self + .copy_target(target, scale_down, namespace, &config.feature.split_queues) + .await?; copy_subtask.success(Some("target copied")); @@ -676,10 +689,8 @@ impl OperatorApi { let use_proxy_api = self .operator .spec - .features - .as_ref() - .map(|features| features.contains(&OperatorFeatures::ProxyApi)) - .unwrap_or(false); + .supported_features() + .contains(&NewOperatorFeature::ProxyApi); let connect_url = target.connect_url( use_proxy_api, config.feature.network.incoming.on_concurrent_steal, @@ -718,6 +729,7 @@ impl OperatorApi { target: Target, scale_down: bool, namespace: &str, + split_queues: &SplitQueuesConfig, ) -> OperatorApiResult { let name = TargetCrd::urlfied_name(&target); @@ -727,6 +739,7 @@ impl OperatorApi { target, idle_ttl: Some(Self::COPIED_POD_IDLE_TTL), scale_down, + split_queues: split_queues.clone(), }, ); diff --git a/mirrord/operator/src/client/error.rs b/mirrord/operator/src/client/error.rs index 
83e46eeebaf..1f08661542f 100644 --- a/mirrord/operator/src/client/error.rs +++ b/mirrord/operator/src/client/error.rs @@ -4,7 +4,7 @@ pub use http::Error as HttpError; use mirrord_kube::error::KubeApiError; use thiserror::Error; -use crate::crd::kube_target::UnknownTargetType; +use crate::crd::{kube_target::UnknownTargetType, NewOperatorFeature}; /// Operations performed on the operator via [`kube`] API. #[derive(Debug)] @@ -50,7 +50,7 @@ pub enum OperatorApiError { #[error("mirrord operator {operator_version} does not support feature {feature}")] UnsupportedFeature { - feature: String, + feature: NewOperatorFeature, operator_version: String, }, diff --git a/mirrord/operator/src/crd.rs b/mirrord/operator/src/crd.rs index b61fb5a0e38..2a7cbe0b0ff 100644 --- a/mirrord/operator/src/crd.rs +++ b/mirrord/operator/src/crd.rs @@ -1,10 +1,21 @@ +use std::{ + collections::{BTreeMap, HashMap}, + fmt::{Display, Formatter}, +}; + use kube::CustomResource; use kube_target::{KubeTarget, UnknownTargetType}; -use mirrord_config::target::{Target, TargetConfig}; +pub use mirrord_config::feature::split_queues::QueueId; +use mirrord_config::{ + feature::split_queues::{SplitQueuesConfig, SqsMessageFilter}, + target::{Target, TargetConfig}, +}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use self::label_selector::LabelSelector; +#[cfg(feature = "client")] +use crate::client::error::OperatorApiError; use crate::types::LicenseInfoOwned; pub mod kube_target; @@ -86,10 +97,100 @@ pub static OPERATOR_STATUS_NAME: &str = "operator"; pub struct MirrordOperatorSpec { pub operator_version: String, pub default_namespace: String, - pub features: Option>, + /// Should be removed when we can stop supporting compatibility with versions from before the + /// `supported_features` field was added. 
+ /// "Breaking" that compatibility by removing this field and then running with one old (from + /// before the `supported_features` field) side (client or operator) would make the client + /// think `ProxyApi` is not supported even if it is. + #[deprecated(note = "use supported_features instead")] + features: Option>, + /// Replaces both `features` and `copy_target_enabled`. Operator versions that use a version + /// of this code that has both this and the old fields are expected to populate this field with + /// the full set of features they support, and the old fields with their limited info they + /// support, for old clients. + /// + /// Access this info only via `supported_features()`. + /// Optional for backwards compatibility (new clients can talk to old operators that don't send + /// this field). + supported_features: Option>, pub license: LicenseInfoOwned, pub protocol_version: Option, - pub copy_target_enabled: Option, + /// Should be removed when we can stop supporting compatibility with versions from before the + /// `supported_features` field was added. + /// "Breaking" that compatibility by removing this field and then running with one old (from + /// before the `supported_features` field) side (client or operator) would make the client + /// think copy target is not enabled even if it is. + /// Optional for backwards compatibility (new clients can talk to old operators that don't send + /// this field). 
+ #[deprecated(note = "use supported_features instead")] + copy_target_enabled: Option, +} + +impl MirrordOperatorSpec { + pub fn new( + operator_version: String, + default_namespace: String, + supported_features: Vec, + license: LicenseInfoOwned, + protocol_version: Option, + ) -> Self { + let features = supported_features + .contains(&NewOperatorFeature::ProxyApi) + .then(|| vec![OperatorFeatures::ProxyApi]); + let copy_target_enabled = + Some(supported_features.contains(&NewOperatorFeature::CopyTarget)); + #[allow(deprecated)] // deprecated objects must still be included in construction. + Self { + operator_version, + default_namespace, + supported_features: Some(supported_features), + license, + protocol_version, + features, + copy_target_enabled, + } + } + + /// Get a vector with the features the operator supports. + /// Handles objects sent from old and new operators. + // When the deprecated fields are removed, this can be changed to just return + // `self.supported_features.unwrap_or_default()`. + pub fn supported_features(&self) -> Vec { + self.supported_features + .clone() + // if supported_features was sent, just use that. If not we are dealing with an older + // operator, so we build a vector of new features from the old fields. + .or_else(|| { + // object was sent by an old operator that still uses fields that are now deprecated + #[allow(deprecated)] + self.features.as_ref().map(|features| { + features + .iter() + .map(From::from) + .chain( + self.copy_target_enabled.and_then(|enabled| { + enabled.then_some(NewOperatorFeature::CopyTarget) + }), + ) + .collect() + }) + }) + // Convert `None` to empty vector since we don't expect this to often be + // `None` (although it's ok if it is) and that way the return type is simpler. 
+ .unwrap_or_default() + } + + #[cfg(feature = "client")] + pub fn require_feature(&self, feature: NewOperatorFeature) -> Result<(), OperatorApiError> { + if self.supported_features().contains(&feature) { + Ok(()) + } else { + Err(OperatorApiError::UnsupportedFeature { + feature, + operator_version: self.operator_version.clone(), + }) + } + } } #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] @@ -137,9 +238,49 @@ pub struct Session { )] pub struct SessionSpec; +/// Since this enum does not have a variant marked with `#[serde(other)]`, and is present like that +/// in released clients, existing clients would fail to parse any new variant. This means the +/// operator can never send anything but the one existing variant, otherwise the client will error +/// out. #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] pub enum OperatorFeatures { ProxyApi, + // DON'T ADD VARIANTS - old clients won't be able to deserialize them. + // Add new features in NewOperatorFeature +} + +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +pub enum NewOperatorFeature { + ProxyApi, + CopyTarget, + SqsQueueSplitting, + SessionManagement, + /// This variant is what a client sees when the operator includes a feature the client is not + /// yet aware of, because it was introduced in a version newer than the client's. 
+ #[schemars(skip)] + #[serde(other, skip_serializing)] + Unknown, +} + +impl Display for NewOperatorFeature { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let name = match self { + NewOperatorFeature::ProxyApi => "proxy API", + NewOperatorFeature::CopyTarget => "copy target", + NewOperatorFeature::SqsQueueSplitting => "SQS queue splitting", + NewOperatorFeature::Unknown => "unknown feature", + NewOperatorFeature::SessionManagement => "session management", + }; + f.write_str(name) + } +} + +impl From<&OperatorFeatures> for NewOperatorFeature { + fn from(old_feature: &OperatorFeatures) -> Self { + match old_feature { + OperatorFeatures::ProxyApi => NewOperatorFeature::ProxyApi, + } + } } /// This [`Resource`](kube::Resource) represents a copy pod created from an existing [`Target`] @@ -161,6 +302,8 @@ pub struct CopyTargetSpec { /// Should the operator scale down target deployment to 0 while this pod is alive. /// Ignored if [`Target`] is not [`Target::Deployment`]. pub scale_down: bool, + /// Split queues client side configuration. + pub split_queues: SplitQueuesConfig, } /// Features and operations that can be blocked by a `MirrordPolicy`. @@ -199,3 +342,270 @@ pub struct MirrordPolicySpec { /// List of features and operations blocked by this policy. pub block: Vec, } + +/// Set where the application reads the name of the queue from, so that mirrord can find that queue, +/// split it, and temporarily change the name there to the name of the branch queue when splitting. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] // EnvVar -> envVar in yaml. +pub enum QueueNameSource { + EnvVar(String), +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] // name_source -> nameSource in yaml. +pub struct SqsQueueDetails { + /// Where the application gets the queue name from. 
Will be used to read messages from that + /// queue and distribute them to the output queues. When running with mirrord and splitting + /// this queue, applications will get a modified name from that source. + pub name_source: QueueNameSource, + + /// These tags will be set for all temporary SQS queues created by mirrord for queues defined + /// in this MirrordWorkloadQueueRegistry, alongside the original tags of the respective + /// original queue. In case of a collision, the temporary queue will get the value from the + /// tag passed in here. + pub tags: Option<HashMap<String, String>>, +} + +/// The details of a queue that should be split. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(tag = "queueType")] +pub enum SplitQueue { + /// Amazon SQS + #[serde(rename = "SQS")] + Sqs(SqsQueueDetails), +} + +/// A workload that is a consumer of a queue that is being split. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +pub struct QueueConsumer { + pub name: String, + /// If a container is not specified, the workload queue registry will apply to every run that + /// targets any of the workload's containers. + pub container: Option<String>, + pub workload_type: QueueConsumerType, +} + +/// The kind of workload that consumes a queue that is being split. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +pub enum QueueConsumerType { + Deployment, + + Rollout, + + #[schemars(skip)] + #[serde(other, skip_serializing)] + Unsupported, +} + +impl Display for QueueConsumerType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + QueueConsumerType::Deployment => write!(f, "deployment"), + QueueConsumerType::Rollout => write!(f, "rollout"), + QueueConsumerType::Unsupported => write!(f, "unsupported"), + } + } +} + +impl QueueConsumer { + /// Given that `self` is the queue consumer of a run, check whether the given registry object + /// is the correct registry for this run.
+ pub fn registry_matches(&self, registry: &MirrordWorkloadQueueRegistry) -> bool { + let registry_consumer = &registry.spec.consumer; + self.workload_type == registry_consumer.workload_type + && self.name == registry_consumer.name + && (self.container == registry_consumer.container + // If registry does not specify a container, it applies to all runs with + // this target, regardless of what container they are targeting. + || registry_consumer.container.is_none()) + } +} + +impl Display for QueueConsumer { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(ref container) = self.container { + write!( + f, + "{}/{}/container/{container}", + self.workload_type, self.name + ) + } else { + write!(f, "{}/{}", self.workload_type, self.name) + } + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +pub struct QueueNameUpdate { + pub original_name: String, + pub output_name: String, +} + +/// Details retrieved from K8s resources once the splitter is active, used on filter session +/// creation to determine the required config changes that make the application use the +/// output queues instead of the original. +// This status struct is not optimal in that it contains redundant information. This makes the +// controller's code a bit simpler. +// Some information is present in the spec, but it is organized differently. +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +pub struct ActiveSqsSplits { + /// For each queue_id, the actual queue name as retrieved from the target's pod spec or config + /// map, together with the name of its temporary output queue. + pub queue_names: BTreeMap<QueueId, QueueNameUpdate>, + + /// Names of env vars that contain the queue name directly in the pod template, without config + /// map refs, mapped to their queue id.
+ pub direct_env_vars: HashMap<String, QueueId>, + + pub env_updates: BTreeMap<String, QueueNameUpdate>, +} + +impl ActiveSqsSplits { + pub fn output_queue_names(&self) -> Vec<&str> { + self.queue_names + .values() + .map(|QueueNameUpdate { output_name, .. }| output_name.as_str()) + .collect() + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] // sqs_details -> sqsDetails +pub struct WorkloadQueueRegistryStatus { + /// Optional even though it's currently the only field, because in the future there will be + /// fields for other queue types. + pub sqs_details: Option<ActiveSqsSplits>, +} + +/// Defines a Custom Resource that holds a central configuration for splitting queues for a +/// QueueConsumer (a target workload for which queues should be split). +/// +/// This means there should be 1 such resource per queue splitting target. +#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[kube( + group = "queues.mirrord.metalbear.co", + version = "v1alpha", + kind = "MirrordWorkloadQueueRegistry", + shortname = "qs", + status = "WorkloadQueueRegistryStatus", + namespaced +)] +pub struct MirrordWorkloadQueueRegistrySpec { + /// A map of the queues that should be split. + /// The key is used by users to associate filters to the right queues. + pub queues: BTreeMap<QueueId, SplitQueue>, + + /// The resource (deployment or Argo rollout) that reads from the queues. + pub consumer: QueueConsumer, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +#[serde(rename = "SQSSplitDetails", rename_all = "camelCase")] +pub struct SqsSplitDetails { + /// Queue ID -> old and new queue names. + pub queue_names: BTreeMap<QueueId, QueueNameUpdate>, + + // A bit redundant, because the registry resource status has the mapping from env var name + // to queue id, and `queue_names` has the mapping from queue id to name update, but, saving + // it here in the form that is useful to the reader, for simplicity and readability. + /// Env var name -> old and new queue names.
+ pub env_updates: BTreeMap, +} + +/// Representation of Sqs errors for the status of SQS session resources. +#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct SqsSessionError { + /// HTTP code for operator response. + pub status_code: u16, + + /// Human-readable explanation of what went wrong. + pub reason: String, +} + +impl Display for SqsSessionError { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + // Write strictly the first element into the supplied output + // stream: `f`. Returns `fmt::Result` which indicates whether the + // operation succeeded or failed. Note that `write!` uses syntax which + // is very similar to `println!`. + write!(f, "{}", self.reason) + } +} + +#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[serde(rename = "SQSSessionStatus")] +pub enum SqsSessionStatus { + // kube-rs does not allow mixing unit variants with tuple/struct variants, so this variant + // has to be a tuple/struct too. If we leave the tuple empty, k8s complains about an object + // without any items, and kube-rs does not support internally tagged enums, so we actually + // have to put something in there, even if we don't actually care about that info. + Starting { + start_time_utc: String, + }, + /// SQS operator sets this status before it starts registering filters, so that if anything + /// fails during the registration of filters, we have all the queues we need to delete on + /// cleanup. + RegisteringFilters(SqsSplitDetails), + Ready(SqsSplitDetails), + StartError(SqsSessionError), + CleanupError { + error: SqsSessionError, + details: Option, + }, +} + +impl SqsSessionStatus { + pub fn get_split_details(&self) -> Option<&SqsSplitDetails> { + match self { + SqsSessionStatus::RegisteringFilters(details) | SqsSessionStatus::Ready(details) => { + Some(details) + } + SqsSessionStatus::CleanupError { details, .. 
} => details.as_ref(), + _ => None, + } + } +} + +/// The [`kube::runtime::wait::Condition`] trait is auto-implemented for this function. +/// To be used in [`kube::runtime::wait::await_condition`]. +pub fn is_session_ready(session: Option<&MirrordSqsSession>) -> bool { + session + .and_then(|session| session.status.as_ref()) + .map(|status| { + matches!( + status, + SqsSessionStatus::Ready(..) + | SqsSessionStatus::StartError(..) + | SqsSessionStatus::CleanupError { .. } + ) + }) + .unwrap_or_default() +} + +/// The operator creates this object when a user runs mirrord against a target that is a queue +/// consumer. +#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[kube( +group = "queues.mirrord.metalbear.co", +version = "v1alpha", +kind = "MirrordSQSSession", +root = "MirrordSqsSession", // for Rust naming conventions (Sqs, not SQS) +status = "SqsSessionStatus", +namespaced +)] +#[serde(rename_all = "camelCase")] // queue_filters -> queueFilters +pub struct MirrordSqsSessionSpec { + /// For each queue_id, a mapping from attribute name, to attribute value regex. + /// The queue_id for a queue is determined at the queue registry. It is not (necessarily) + /// The name of the queue on AWS. + pub queue_filters: HashMap, + + /// The target of this session. + pub queue_consumer: QueueConsumer, + + /// The id of the mirrord exec session, from the operator. 
+ // The Kubernetes API can't deal with 64 bit numbers (with most significant bit set) + // so we save that field as a (HEX) string even though its source is a u64 + pub session_id: String, +} diff --git a/mirrord/operator/src/setup.rs b/mirrord/operator/src/setup.rs index 0fc49a8343e..d9e6a7432c3 100644 --- a/mirrord/operator/src/setup.rs +++ b/mirrord/operator/src/setup.rs @@ -26,7 +26,7 @@ use k8s_openapi::{ use kube::{CustomResourceExt, Resource}; use thiserror::Error; -use crate::crd::{MirrordPolicy, TargetCrd}; +use crate::crd::{MirrordPolicy, MirrordSqsSession, MirrordWorkloadQueueRegistry, TargetCrd}; static OPERATOR_NAME: &str = "mirrord-operator"; /// 443 is standard port for APIService, do not change this value @@ -88,6 +88,8 @@ pub struct SetupOptions { pub license: LicenseType, pub namespace: OperatorNamespace, pub image: String, + pub aws_role_arn: Option, + pub sqs_splitting: bool, } #[derive(Debug)] @@ -103,6 +105,7 @@ pub struct Operator { user_cluster_role: OperatorClusterUserRole, client_ca_role: OperatorClientCaRole, client_ca_role_binding: OperatorClientCaRoleBinding, + sqs_splitting: bool, } impl Operator { @@ -111,6 +114,8 @@ impl Operator { license, namespace, image, + aws_role_arn, + sqs_splitting, } = options; let (license_secret, license_key) = match license { @@ -120,9 +125,9 @@ impl Operator { } }; - let service_account = OperatorServiceAccount::new(&namespace); + let service_account = OperatorServiceAccount::new(&namespace, aws_role_arn); - let role = OperatorRole::new(); + let role = OperatorRole::new(sqs_splitting); let role_binding = OperatorRoleBinding::new(&role, &service_account); let user_cluster_role = OperatorClusterUserRole::new(); @@ -136,6 +141,7 @@ impl Operator { license_secret.as_ref(), license_key, image, + sqs_splitting, ); let service = OperatorService::new(&namespace); @@ -154,6 +160,7 @@ impl Operator { user_cluster_role, client_ca_role, client_ca_role_binding, + sqs_splitting, } } } @@ -197,6 +204,14 @@ impl 
OperatorSetup for Operator { writer.write_all(b"---\n")?; MirrordPolicy::crd().to_writer(&mut writer)?; + if self.sqs_splitting { + writer.write_all(b"---\n")?; + MirrordWorkloadQueueRegistry::crd().to_writer(&mut writer)?; + + writer.write_all(b"---\n")?; + MirrordSqsSession::crd().to_writer(&mut writer)?; + } + Ok(()) } } @@ -236,6 +251,7 @@ impl OperatorDeployment { license_secret: Option<&OperatorLicenseSecret>, license_key: Option, image: String, + sqs_splitting: bool, ) -> Self { let mut envs = vec![ EnvVar { @@ -295,6 +311,14 @@ impl OperatorDeployment { }); } + if sqs_splitting { + envs.push(EnvVar { + name: "OPERATOR_SQS_SPLITTING".to_owned(), + value: Some("true".to_string()), + value_from: None, + }); + } + let health_probe = Probe { http_get: Some(HTTPGetAction { path: Some("/health".to_owned()), @@ -379,11 +403,13 @@ impl OperatorDeployment { pub struct OperatorServiceAccount(ServiceAccount); impl OperatorServiceAccount { - pub fn new(namespace: &OperatorNamespace) -> Self { + pub fn new(namespace: &OperatorNamespace, aws_role_arn: Option) -> Self { let sa = ServiceAccount { metadata: ObjectMeta { name: Some(OPERATOR_SERVICE_ACCOUNT_NAME.to_owned()), namespace: Some(namespace.name().to_owned()), + annotations: aws_role_arn + .map(|arn| BTreeMap::from([("eks.amazonaws.com/role-arn".to_string(), arn)])), labels: Some(APP_LABELS.clone()), ..Default::default() }, @@ -411,86 +437,147 @@ impl OperatorServiceAccount { pub struct OperatorRole(ClusterRole); impl OperatorRole { - pub fn new() -> Self { - let role = ClusterRole { - metadata: ObjectMeta { - name: Some(OPERATOR_ROLE_NAME.to_owned()), + pub fn new(sqs_splitting: bool) -> Self { + let mut rules = vec![ + PolicyRule { + api_groups: Some(vec![ + "".to_owned(), + "apps".to_owned(), + "batch".to_owned(), + "argoproj.io".to_owned(), + ]), + resources: Some(vec![ + "nodes".to_owned(), + "pods".to_owned(), + "pods/log".to_owned(), + "pods/ephemeralcontainers".to_owned(), + "deployments".to_owned(), + 
"deployments/scale".to_owned(), + "rollouts".to_owned(), + "rollouts/scale".to_owned(), + "jobs".to_owned(), + "cronjobs".to_owned(), + "statefulsets".to_owned(), + "statefulsets/scale".to_owned(), + ]), + verbs: vec!["get".to_owned(), "list".to_owned(), "watch".to_owned()], ..Default::default() }, - rules: Some(vec![ - PolicyRule { - api_groups: Some(vec![ - "".to_owned(), - "apps".to_owned(), - "batch".to_owned(), - "argoproj.io".to_owned(), - ]), - resources: Some(vec![ - "nodes".to_owned(), - "pods".to_owned(), - "pods/log".to_owned(), - "pods/ephemeralcontainers".to_owned(), - "deployments".to_owned(), - "deployments/scale".to_owned(), - "rollouts".to_owned(), - "rollouts/scale".to_owned(), - "jobs".to_owned(), - "cronjobs".to_owned(), - "statefulsets".to_owned(), - "statefulsets/scale".to_owned(), - ]), - verbs: vec!["get".to_owned(), "list".to_owned(), "watch".to_owned()], - ..Default::default() - }, - PolicyRule { - api_groups: Some(vec!["apps".to_owned(), "argoproj.io".to_owned()]), - resources: Some(vec![ - "deployments/scale".to_owned(), - "rollouts/scale".to_owned(), - "statefulsets/scale".to_owned(), - ]), - verbs: vec!["patch".to_owned()], - ..Default::default() - }, + // For SQS controller to temporarily change deployments to use changed queues. + PolicyRule { + api_groups: Some(vec!["apps".to_owned()]), + resources: Some(vec!["deployments".to_owned()]), + verbs: vec!["patch".to_owned()], + ..Default::default() + }, + // For SQS controller to temporarily change Argo Rollouts to use changed queues. 
+ PolicyRule { + api_groups: Some(vec!["argoproj.io".to_owned()]), + resources: Some(vec!["rollouts".to_owned()]), + verbs: vec!["patch".to_owned()], + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["apps".to_owned(), "argoproj.io".to_owned()]), + resources: Some(vec![ + "deployments/scale".to_owned(), + "rollouts/scale".to_owned(), + "statefulsets/scale".to_owned(), + ]), + verbs: vec!["patch".to_owned()], + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["".to_owned(), "batch".to_owned()]), + resources: Some(vec!["jobs".to_owned(), "pods".to_owned()]), + verbs: vec!["create".to_owned(), "delete".to_owned()], + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["".to_owned()]), + resources: Some(vec!["pods/ephemeralcontainers".to_owned()]), + verbs: vec!["update".to_owned()], + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["".to_owned(), "authentication.k8s.io".to_owned()]), + resources: Some(vec![ + "groups".to_owned(), + "users".to_owned(), + "userextras/accesskeyid".to_owned(), + "userextras/arn".to_owned(), + "userextras/canonicalarn".to_owned(), + "userextras/sessionname".to_owned(), + "userextras/iam.gke.io/user-assertion".to_owned(), + "userextras/user-assertion.cloud.google.com".to_owned(), + "userextras/principalid".to_owned(), + "userextras/oid".to_owned(), + "userextras/username".to_owned(), + "userextras/licensekey".to_owned(), + ]), + verbs: vec!["impersonate".to_owned()], + ..Default::default() + }, + // Allow the operator to list+get mirrord policies. + PolicyRule { + api_groups: Some(vec!["policies.mirrord.metalbear.co".to_owned()]), + resources: Some(vec![MirrordPolicy::plural(&()).to_string()]), + verbs: vec!["list".to_owned(), "get".to_owned()], + ..Default::default() + }, + ]; + if sqs_splitting { + rules.extend([ + // Allow the operator to list mirrord queue registries. 
PolicyRule { - api_groups: Some(vec!["".to_owned(), "batch".to_owned()]), - resources: Some(vec!["jobs".to_owned(), "pods".to_owned()]), - verbs: vec!["create".to_owned(), "delete".to_owned()], + api_groups: Some(vec!["queues.mirrord.metalbear.co".to_owned()]), + resources: Some(vec![MirrordWorkloadQueueRegistry::plural(&()).to_string()]), + verbs: vec!["list".to_owned()], ..Default::default() }, + // Allow the SQS controller to update queue registry status. PolicyRule { - api_groups: Some(vec!["".to_owned()]), - resources: Some(vec!["pods/ephemeralcontainers".to_owned()]), - verbs: vec!["update".to_owned()], + api_groups: Some(vec!["queues.mirrord.metalbear.co".to_owned()]), + resources: Some(vec!["mirrordworkloadqueueregistries/status".to_string()]), + verbs: vec![ + // For setting the status in the SQS controller. + "update".to_owned(), + ], ..Default::default() }, + // Allow the operator to control mirrord SQS session objects. PolicyRule { - api_groups: Some(vec!["".to_owned(), "authentication.k8s.io".to_owned()]), - resources: Some(vec![ - "groups".to_owned(), - "users".to_owned(), - "userextras/accesskeyid".to_owned(), - "userextras/arn".to_owned(), - "userextras/canonicalarn".to_owned(), - "userextras/sessionname".to_owned(), - "userextras/iam.gke.io/user-assertion".to_owned(), - "userextras/user-assertion.cloud.google.com".to_owned(), - "userextras/principalid".to_owned(), - "userextras/oid".to_owned(), - "userextras/username".to_owned(), - "userextras/licensekey".to_owned(), - ]), - verbs: vec!["impersonate".to_owned()], + api_groups: Some(vec!["queues.mirrord.metalbear.co".to_owned()]), + resources: Some(vec![MirrordSqsSession::plural(&()).to_string()]), + verbs: vec![ + "create".to_owned(), + "watch".to_owned(), + "list".to_owned(), + "get".to_owned(), + "delete".to_owned(), + "deletecollection".to_owned(), + "patch".to_owned(), + ], ..Default::default() }, - // Allow the operator to list mirrord policies. 
+ // Allow the SQS controller to update queue registry status. PolicyRule { - api_groups: Some(vec!["policies.mirrord.metalbear.co".to_owned()]), - resources: Some(vec![MirrordPolicy::plural(&()).to_string()]), - verbs: vec!["list".to_owned(), "get".to_owned()], + api_groups: Some(vec!["queues.mirrord.metalbear.co".to_owned()]), + resources: Some(vec!["mirrordsqssessions/status".to_string()]), + verbs: vec![ + // For setting the status in the SQS controller. + "update".to_owned(), + ], ..Default::default() }, - ]), + ]); + } + let role = ClusterRole { + metadata: ObjectMeta { + name: Some(OPERATOR_ROLE_NAME.to_owned()), + ..Default::default() + }, + rules: Some(rules), ..Default::default() }; @@ -508,7 +595,7 @@ impl OperatorRole { impl Default for OperatorRole { fn default() -> Self { - Self::new() + Self::new(false) } }