Skip to content

Commit

Permalink
QueueFilter CRD
Browse files Browse the repository at this point in the history
  • Loading branch information
t4lz committed Feb 21, 2024
1 parent af2b48b commit 4ba6c6d
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 10 deletions.
2 changes: 1 addition & 1 deletion mirrord/config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod copy_target;
pub mod env;
pub mod fs;
pub mod network;
mod split_queues;
pub mod split_queues;

/// Controls mirrord features.
///
Expand Down
13 changes: 10 additions & 3 deletions mirrord/config/src/feature/split_queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};

use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig};

pub type QueueId = String;

/// ```json
/// {
/// "feature": {
Expand All @@ -29,7 +31,7 @@ use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig};
/// }
/// ```
#[derive(Clone, Debug, Eq, PartialEq, JsonSchema, Deserialize, Default)]
pub struct SplitQueuesConfig(Option<HashMap<String, QueueFilter>>);
pub struct SplitQueuesConfig(Option<HashMap<QueueId, QueueFilter>>);

impl SplitQueuesConfig {
pub fn is_set(&self) -> bool {
Expand Down Expand Up @@ -66,12 +68,17 @@ impl FromMirrordConfig for SplitQueuesConfig {
type Generator = Self;
}

pub type MessageAttributeName = String;
pub type AttributeValuePattern = String;

pub type SqsMessageFilter = HashMap<MessageAttributeName, AttributeValuePattern>;

/// More queue types might be added in the future.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema)]
#[serde(tag = "queue_type", content = "message_filter")]
enum QueueFilter {
pub enum QueueFilter {
#[serde(rename = "SQS")]
Sqs(HashMap<String, String>),
Sqs(SqsMessageFilter),
}

impl CollectAnalytics for &SplitQueuesConfig {
Expand Down
40 changes: 36 additions & 4 deletions mirrord/operator/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use std::{

use chrono::NaiveDate;
use kube::CustomResource;
use mirrord_config::target::{Target, TargetConfig};
pub use mirrord_config::feature::split_queues::QueueId;
use mirrord_config::{
feature::split_queues::SqsMessageFilter,
target::{Target, TargetConfig},
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -350,11 +354,17 @@ pub enum QueueNameSource {
EnvVar(String),
}

pub type OutputQueueName = String;

/// The details of a queue that should be split.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)]
#[serde(tag = "queueType")]
pub enum SplitQueue {
/// Amazon SQS
///
/// Where the application gets the queue name from. Will be used to read messages from that
/// queue and distribute them to the output queues. When running with mirrord and splitting
/// this queue, applications will get a modified name from that source.
#[serde(rename = "SQS")]
Sqs(QueueNameSource),
}
Expand All @@ -368,12 +378,10 @@ pub enum QueueConsumer {

#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)]
pub struct QueueSplitterStatus {
// TODO: this is just for now. later it will be something like a set of filters.
// TODO: ?
pub active: bool,
}

pub type QueueId = String;

/// Defines a Custom Resource that holds a central configuration for splitting a queue. mirrord
/// users specify a splitter by name in their configuration. mirrord then starts splitting according
/// to the spec and the user's filter.
Expand All @@ -394,3 +402,27 @@ pub struct MirrordQueueSplitterSpec {
/// The resource (deployment or Argo rollout) that reads from the queues.
pub consumer: QueueConsumer,
}

pub struct SQSFilterStatus {
// TODO: ?
pub active: bool,
}

// TODO: docs
#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[kube(
group = "splitters.mirrord.metalbear.co",
version = "v1alpha",
kind = "MirrordSQSFilter",
root = "MirrordSqsFilter", // for Rust naming conventions (Sqs, not SQS)
shortname = "qs",
status = "SQSFilterStatus",
namespaced
)]
pub struct MirrordSqsFilterSpec {
// TODO: docs
pub output_queue_name: OutputQueueName,

// TODO: docs
pub filter: SqsMessageFilter,
}
20 changes: 18 additions & 2 deletions mirrord/operator/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use k8s_openapi::{
use kube::{CustomResourceExt, Resource};
use thiserror::Error;

use crate::crd::{MirrordPolicy, MirrordQueueSplitter, TargetCrd};
use crate::crd::{MirrordPolicy, MirrordQueueSplitter, MirrordSqsFilter, TargetCrd};

static OPERATOR_NAME: &str = "mirrord-operator";
static OPERATOR_PORT: i32 = 3000;
Expand Down Expand Up @@ -180,6 +180,9 @@ impl OperatorSetup for Operator {
writer.write_all(b"---\n")?;
MirrordQueueSplitter::crd().to_writer(&mut writer)?;

writer.write_all(b"---\n")?;
MirrordSqsFilter::crd().to_writer(&mut writer)?;

Ok(())
}
}
Expand Down Expand Up @@ -464,7 +467,20 @@ impl OperatorRole {
PolicyRule {
api_groups: Some(vec!["splitters.mirrord.metalbear.co".to_owned()]),
resources: Some(vec![MirrordQueueSplitter::plural(&()).to_string()]),
verbs: vec!["list".to_owned(), "get".to_owned()],
verbs: vec!["list".to_owned()],
..Default::default()
},
// Allow the operator to control mirrord queue filters.
PolicyRule {
api_groups: Some(vec!["splitters.mirrord.metalbear.co".to_owned()]),
resources: Some(vec![MirrordSqsFilter::plural(&()).to_string()]),
verbs: vec![
"create".to_owned(),
"watch".to_owned(),
"list".to_owned(),
"get".to_owned(),
"delete".to_owned(),
],
..Default::default()
},
]),
Expand Down

0 comments on commit 4ba6c6d

Please sign in to comment.