Queue Splitting Feature (#2173)
* Splitter CRD

Add new features enum and field for operator

The old enum cannot be extended as it would break old clients.
So we define a new extensible enum and a new field for it.
We make all of those fields private so that users of that struct
only access them via a getter that handles objects deserialized
from either old or new versions.

use copy target when splitting
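
A minimal sketch of the extensible-enum pattern described in this first commit, with hypothetical type and field names (the real CRD types in the operator crate differ):

```rust
use serde::{Deserialize, Serialize};

/// Old, closed feature enum: adding a variant here would break old clients.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
enum OldFeature {
    ProxyApi,
}

/// Known values of the new feature enum.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
enum KnownFeature {
    ProxyApi,
    SqsQueueSplitting,
}

/// Extensible wrapper: a value this client does not recognize is kept as a raw
/// string instead of failing deserialization.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(untagged)]
enum NewFeature {
    Known(KnownFeature),
    Unknown(String),
}

#[derive(Serialize, Deserialize, Clone, Debug, Default)]
struct FeaturesSpec {
    // Both fields stay private; everything outside this module goes through
    // the getter below.
    #[serde(default)]
    features: Option<Vec<OldFeature>>,
    #[serde(default)]
    new_features: Option<Vec<NewFeature>>,
}

impl FeaturesSpec {
    /// Works for objects deserialized from either old or new versions:
    /// objects from old versions simply have `new_features: None`.
    fn supports_sqs_splitting(&self) -> bool {
        self.new_features
            .as_deref()
            .unwrap_or_default()
            .iter()
            .any(|f| matches!(f, NewFeature::Known(KnownFeature::SqsQueueSplitting)))
    }
}
```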

* working env

* deletecollection for sqs sessions

* change SqsMessageFilter to BTreeMap for iterating.

* change queue_names to BTreeMap for iterating

* change config_map_updates to BTreeMap for iterating

* change queue_names to BTreeMap for iterating

* change queues to BTreeMap for iterating

* get output queue names from splitter status

* splitter status

* todo

* set -> vec

* tags

* docs

* remove active_sessions from splitter status

* MirrordQueueSplitter -> MirrordWorkloadQueueRegistry, splitters. -> queues.

* merge fix

* lock

* mergefix

* remove pod spec from session status

* remove podspec from registry crd status

* save only updates instead of containers

* docs

* changelog

* remove tests (moved to sqs-tests branch)

* docs and improvements

* cargo fmt

* mirrord-schema.json

* run medschool

* fmt

* oops

* merge

* display Q consumer

* fix merge

* fix feature use

* rename sqs details and make optional

* rename QueueDetails -> ActiveSqsSplits

* move output_q_names to ActiveSqsSplits

* fmt

* CR: lambda var name

* move method call to own .map call

* CR: remove match arm for future variants.

* CR: document SqsMessageFilter

* CR: remove get_target_type

* CR: Operator feature variant names.

* CR: unused type

* CR: docs: "pod spec" -> "pod template"

Co-authored-by: Michał Smolarek <[email protected]>

* CR: remove dead configmaps code

* CR: make session status enum

* CR: remove non_exhaustive

* status enum stuff

* CR: sqs session error - code + reason

* just need to push a commit

* pub sqs details

* pub sqs details

* impl Display for SqsSessionError

* schema

* CR: add container to queue consumer

* QueueConsumer registry matching

* QueueConsumer container getter

* Add RegisteringFilters Status variant

* get_split_details

* CR: registry docs

* CR: doc text formatting suggestion

Co-authored-by: Michał Smolarek <[email protected]>

* CR: individual tags per queue

* missing pub

* mark unstable

* CR: CopyTargetCrd, send queues config

* non non_exhaustive

* CR: CopyTargetCrd, send queues config

* Added unknown variant to QueueFilter

* CR: info log on implicit copy-target

* Change QueueConsumer to have a valid k8s schema

* CRD fixes: skip unknown variants in schema, change CleanupError status for k8s

* starting status has to have items

* docs

* change start time to str because I remember k8s actually can't deal with unsigned integers of 64 bits or more

* rule to patch pods

* api group of pods is empty string

* Can't patch pod container env :(

* CR: out of place serde tag

Co-authored-by: Michał Smolarek <[email protected]>

* Support installing an SQS-enabled operator via `mirrord operator setup` command

* incomplete flag docs

---------

Co-authored-by: Michał Smolarek <[email protected]>
t4lz and Razz4780 authored Aug 16, 2024
1 parent 38407db commit d97df8d
Showing 16 changed files with 867 additions and 117 deletions.
1 change: 1 addition & 0 deletions changelog.d/2066.added.md
@@ -0,0 +1 @@
Client-side support for the upcoming SQS queue splitting feature in *mirrord for Teams*.
49 changes: 49 additions & 0 deletions mirrord-schema.json
@@ -882,6 +882,18 @@
"type": "null"
}
]
},
"split_queues": {
"title": "feature.split_queues {#feature-split_queues}",
"description": "Define filters to split queues by, and make your local application consume only messages that match those filters. If you don't specify any filter for a queue that is however declared in the `MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter will be used, and your local application will not receive any messages from that queue.",
"anyOf": [
{
"$ref": "#/definitions/SplitQueuesConfig"
},
{
"type": "null"
}
]
}
},
"additionalProperties": false
@@ -1431,6 +1443,33 @@
}
]
},
"QueueFilter": {
"description": "More queue types might be added in the future.",
"oneOf": [
{
"description": "Amazon Simple Queue Service.",
"type": "object",
"required": [
"message_filter",
"queue_type"
],
"properties": {
"message_filter": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"queue_type": {
"type": "string",
"enum": [
"SQS"
]
}
}
}
]
},
"RolloutTarget": {
"description": "<!--${internal}--> Mirror the rollout specified by [`RolloutTarget::rollout`].",
"type": "object",
@@ -1451,6 +1490,16 @@
},
"additionalProperties": false
},
"SplitQueuesConfig": {
"description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very .*\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"*you$\" } }, } } } ```",
"type": [
"object",
"null"
],
"additionalProperties": {
"$ref": "#/definitions/QueueFilter"
}
},
"StatefulSetTarget": {
"type": "object",
"required": [
20 changes: 20 additions & 0 deletions mirrord/cli/src/config.rs
@@ -536,6 +536,26 @@ pub(super) enum OperatorCommand {
/// will be able to access)
#[arg(short, long, default_value = "mirrord")]
namespace: OperatorNamespace,

/// AWS role ARN for the operator's service account.
/// Necessary for enabling SQS queue splitting.
/// For successfully running an SQS queue splitting operator, the given IAM role must be
/// able to create, read from, write to, and delete SQS queues.
/// If the queue messages are encrypted using KMS, the operator also needs the
/// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions.
#[arg(long, visible_alias = "arn")]
aws_role_arn: Option<String>,

/// Enable SQS queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// an SQS splitting component.
#[arg(
long,
visible_alias = "sqs",
default_value_t = false,
requires = "aws_role_arn"
)]
sqs_splitting: bool,
},
/// Print operator status
Status {
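The two new flags above gate the SQS-specific manifests. As a rough illustration (not part of this diff; the role ARN is a placeholder, and `--sqs` / `--arn` are the visible aliases declared above), an SQS-enabled setup might be invoked like this:

```sh
# Sketch: render SQS-enabled operator manifests to stdout and apply them.
mirrord operator setup --accept-tos --sqs \
    --arn arn:aws:iam::123456789012:role/mirrord-operator-sqs \
    | kubectl apply -f -
```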
2 changes: 1 addition & 1 deletion mirrord/cli/src/error.rs
@@ -342,7 +342,7 @@ impl From<OperatorApiError> for CliError {
feature,
operator_version,
} => Self::FeatureNotSupportedInOperatorError {
feature,
feature: feature.to_string(),
operator_version,
},
OperatorApiError::CreateKubeClient(e) => Self::CreateKubeApiFailed(e),
20 changes: 17 additions & 3 deletions mirrord/cli/src/operator.rs
@@ -59,6 +59,8 @@ async fn operator_setup(
namespace: OperatorNamespace,
license_key: Option<String>,
license_path: Option<PathBuf>,
aws_role_arn: Option<String>,
sqs_splitting: bool,
) -> Result<(), OperatorSetupError> {
if !accept_tos {
eprintln!("Please note that mirrord operator installation requires an active subscription for the mirrord Operator provided by MetalBear Tech LTD.\nThe service ToS can be read here - https://metalbear.co/legal/terms\nPass --accept-tos to accept the TOS");
@@ -101,6 +103,8 @@
license,
namespace,
image,
aws_role_arn,
sqs_splitting,
});

match file {
@@ -299,9 +303,19 @@ pub(crate) async fn operator_command(args: OperatorArgs) -> Result<()> {
namespace,
license_key,
license_path,
} => operator_setup(accept_tos, file, namespace, license_key, license_path)
.await
.map_err(CliError::from),
aws_role_arn,
sqs_splitting,
} => operator_setup(
accept_tos,
file,
namespace,
license_key,
license_path,
aws_role_arn,
sqs_splitting,
)
.await
.map_err(CliError::from),
OperatorCommand::Status { config_file } => operator_status(config_file.as_deref()).await,
OperatorCommand::Session(session_command) => {
SessionCommandHandler::new(session_command)
24 changes: 12 additions & 12 deletions mirrord/cli/src/operator/session.rs
@@ -6,7 +6,7 @@ use mirrord_operator::{
error::{OperatorApiError, OperatorOperation},
MaybeClientCert, OperatorApi,
},
crd::SessionCrd,
crd::{NewOperatorFeature, SessionCrd},
};
use mirrord_progress::{Progress, ProgressTracker};
use tracing::Level;
@@ -99,7 +99,7 @@ impl SessionCommandHandler {
if code == 404 && reason.contains("parse") =>
{
OperatorApiError::UnsupportedFeature {
feature: "session management".to_string(),
feature: NewOperatorFeature::SessionManagement,
operator_version: operator_api.operator().spec.operator_version.clone(),
}
}
@@ -135,16 +135,16 @@
)));
progress.success(Some("Session operation is completed."));

Ok(())
}
})
.transpose()?
// We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`),
// meaning that the operation has started, but it might not be finished yet.
.unwrap_or_else(|| {
sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time.")));
progress.success(Some(&format!("`{command}` is done, but the operation might be pending.")));
});
Ok(())
}
})
.transpose()?
// We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`),
// meaning that the operation has started, but it might not be finished yet.
.unwrap_or_else(|| {
sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time.")));
progress.success(Some(&format!("`{command}` is done, but the operation might be pending.")));
});

Ok(())
}
30 changes: 30 additions & 0 deletions mirrord/config/configuration.md
@@ -1196,6 +1196,36 @@ of regexes specified here. If there is a match, mirrord will connect your applic
the target unix socket address on the target pod. Otherwise, it will leave the connection
to happen locally on your machine.

## feature.split_queues {#feature-split_queues}

Define filters to split queues by, and make your local application consume only messages
that match those filters.
If you don't specify a filter for a queue that is declared in the
`MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter
will be used, and your local application will not receive any messages from that queue.

```json
{
"feature": {
"split_queues": {
"first-queue": {
"queue_type": "SQS",
"message_filter": {
"wows": "so wows",
"coolz": "^very .*"
}
},
"second-queue": {
"queue_type": "SQS",
"message_filter": {
"who": "*you$"
}
}
}
}
}
```

## internal_proxy {#root-internal_proxy}

Configuration for the internal proxy mirrord spawns for each local mirrord session
14 changes: 13 additions & 1 deletion mirrord/config/src/feature.rs
@@ -4,12 +4,13 @@ use schemars::JsonSchema;
use serde::Serialize;

use self::{copy_target::CopyTargetConfig, env::EnvConfig, fs::FsConfig, network::NetworkConfig};
use crate::config::source::MirrordConfigSource;
use crate::{config::source::MirrordConfigSource, feature::split_queues::SplitQueuesConfig};

pub mod copy_target;
pub mod env;
pub mod fs;
pub mod network;
pub mod split_queues;

/// Controls mirrord features.
///
@@ -96,6 +97,16 @@ pub struct FeatureConfig {
/// Should mirrord return the hostname of the target pod when calling `gethostname`
#[config(default = true)]
pub hostname: bool,

/// ## feature.split_queues {#feature-split_queues}
///
/// Define filters to split queues by, and make your local application consume only messages
/// that match those filters.
/// If you don't specify a filter for a queue that is declared in the
/// `MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter
/// will be used, and your local application will not receive any messages from that queue.
#[config(nested, unstable)]
pub split_queues: SplitQueuesConfig,
}

impl CollectAnalytics for &FeatureConfig {
@@ -105,5 +116,6 @@ impl CollectAnalytics for &FeatureConfig {
analytics.add("network", &self.network);
analytics.add("copy_target", &self.copy_target);
analytics.add("hostname", self.hostname);
analytics.add("split_queues", &self.split_queues);
}
}
113 changes: 113 additions & 0 deletions mirrord/config/src/feature/split_queues.rs
@@ -0,0 +1,113 @@
use std::{
collections::{BTreeMap, HashMap},
ops::Not,
};

use mirrord_analytics::{Analytics, CollectAnalytics};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig};

pub type QueueId = String;

/// ```json
/// {
/// "feature": {
/// "split_queues": {
/// "first-queue": {
/// "queue_type": "SQS",
/// "message_filter": {
/// "wows": "so wows",
/// "coolz": "^very .*"
/// }
/// },
/// "second-queue": {
/// "queue_type": "SQS",
/// "message_filter": {
/// "who": "*you$"
/// }
/// }
/// }
/// }
/// }
/// ```
#[derive(Clone, Debug, Eq, PartialEq, JsonSchema, Serialize, Deserialize, Default)]
pub struct SplitQueuesConfig(pub Option<BTreeMap<QueueId, QueueFilter>>);

impl SplitQueuesConfig {
pub fn is_set(&self) -> bool {
self.0.is_some()
}

/// Out of the whole queue splitting config, get only the SQS queues.
pub fn get_sqs_filter(&self) -> Option<HashMap<String, SqsMessageFilter>> {
self.0
.as_ref()
.map(BTreeMap::iter)
.map(|filters| {
filters
// Keep only SQS filters; `Unknown` (and any future non-SQS variants) are dropped.
.filter_map(|(queue_id, queue_filter)| match queue_filter {
QueueFilter::Sqs(filter_mapping) => {
Some((queue_id.clone(), filter_mapping.clone()))
}
_ => None,
})
.collect()
})
.and_then(|filters_map: HashMap<String, SqsMessageFilter>| {
filters_map.is_empty().not().then_some(filters_map)
})
}
}

impl MirrordConfig for SplitQueuesConfig {
type Generated = Self;

fn generate_config(
self,
_context: &mut ConfigContext,
) -> crate::config::Result<Self::Generated> {
Ok(self)
}
}

impl FromMirrordConfig for SplitQueuesConfig {
type Generator = Self;
}

pub type MessageAttributeName = String;
pub type AttributeValuePattern = String;

/// A filter is a mapping between message attribute names and regexes they should match.
/// The local application will only receive messages that match **all** of the given patterns.
/// This means only messages that have **all** the `MessageAttributeName`s in the filter,
/// with the values of those attributes matching the respective `AttributeValuePattern`,
/// will be received.
pub type SqsMessageFilter = BTreeMap<MessageAttributeName, AttributeValuePattern>;

/// More queue types might be added in the future.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema)]
#[serde(tag = "queue_type", content = "message_filter")]
pub enum QueueFilter {
/// Amazon Simple Queue Service.
#[serde(rename = "SQS")]
Sqs(SqsMessageFilter),
/// When a newer client sends a new filter kind to an older operator that does not yet know
/// about that filter type, this is what the filter will be deserialized to.
#[schemars(skip)]
#[serde(other, skip_serializing)]
Unknown,
}

impl CollectAnalytics for &SplitQueuesConfig {
fn collect_analytics(&self, analytics: &mut Analytics) {
analytics.add(
"queue_count",
self.0
.as_ref()
.map(|mapping| mapping.len())
.unwrap_or_default(),
)
}
}
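
A usage sketch for the new config type (the module path and the `serde_json` dev-dependency are assumptions, not part of this diff):

```rust
use mirrord_config::feature::split_queues::SplitQueuesConfig;

fn example() -> serde_json::Result<()> {
    // The same shape as the doc example above: a map from queue id to filter.
    let config: SplitQueuesConfig = serde_json::from_str(
        r#"{
            "first-queue": {
                "queue_type": "SQS",
                "message_filter": { "wows": "so wows", "coolz": "^very .*" }
            }
        }"#,
    )?;

    // `get_sqs_filter` keeps only SQS entries; a message must match *all*
    // attribute patterns of its queue's filter to be delivered locally.
    let sqs_filters = config.get_sqs_filter().expect("one SQS queue configured");
    assert_eq!(sqs_filters["first-queue"]["coolz"], "^very .*");

    Ok(())
}
```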
1 change: 1 addition & 0 deletions mirrord/config/src/lib.rs
@@ -852,6 +852,7 @@ mod tests {
})),
copy_target: None,
hostname: None,
split_queues: None,
}),
connect_tcp: None,
container: None,