Skip to content

Commit

Permalink
CR: CopyTargetCrd, send queues config
Browse files Browse the repository at this point in the history
  • Loading branch information
t4lz committed Aug 7, 2024
1 parent 571b5a2 commit a920d00
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
29 changes: 20 additions & 9 deletions mirrord/config/src/feature/split_queues.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::{BTreeMap, HashMap};
use std::{
collections::{BTreeMap, HashMap},
ops::Not,
};

use mirrord_analytics::{Analytics, CollectAnalytics};
use schemars::JsonSchema;
Expand Down Expand Up @@ -39,14 +42,22 @@ impl SplitQueuesConfig {

/// Out of the whole queue splitting config, get only the sqs queues.
pub fn get_sqs_filter(&self) -> Option<HashMap<String, SqsMessageFilter>> {
self.0.as_ref().map(BTreeMap::iter).map(|filters| {
filters
// When there are more variants of QueueFilter, change this to a `filter_map`.
.map(|(queue_id, queue_filter)| match queue_filter {
QueueFilter::Sqs(filter_mapping) => (queue_id.clone(), filter_mapping.clone()),
})
.collect()
})
self.0
.as_ref()
.map(BTreeMap::iter)
.map(|filters| {
filters
// When there are more variants of QueueFilter, change this to a `filter_map`.
.map(|(queue_id, queue_filter)| match queue_filter {
QueueFilter::Sqs(filter_mapping) => {
(queue_id.clone(), filter_mapping.clone())
}
})
.collect()
})
.and_then(|filters_map: HashMap<String, SqsMessageFilter>| {
filters_map.is_empty().not().then_some(filters_map)
})
}
}

Expand Down
13 changes: 4 additions & 9 deletions mirrord/operator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use mirrord_auth::{
use mirrord_config::{
feature::{
network::incoming::ConcurrentSteal,
split_queues::{QueueFilter, QueueId},
split_queues::{QueueFilter, QueueId, SplitQueuesConfig},
},
target::Target,
LayerConfig,
Expand Down Expand Up @@ -663,12 +663,7 @@ impl OperatorApi<PreparedClientCert> {
let scale_down = config.feature.copy_target.scale_down;
let namespace = self.target_namespace(config);
let copied = self
.copy_target(
target,
scale_down,
namespace,
config.feature.split_queues.0.clone(),
)
.copy_target(target, scale_down, namespace, &config.feature.split_queues)
.await?;

copy_subtask.success(Some("target copied"));
Expand Down Expand Up @@ -734,7 +729,7 @@ impl OperatorApi<PreparedClientCert> {
target: Target,
scale_down: bool,
namespace: &str,
split_queues: Option<BTreeMap<QueueId, QueueFilter>>,
split_queues: &SplitQueuesConfig,
) -> OperatorApiResult<CopyTargetCrd> {
let name = TargetCrd::urlfied_name(&target);

Expand All @@ -744,7 +739,7 @@ impl OperatorApi<PreparedClientCert> {
target,
idle_ttl: Some(Self::COPIED_POD_IDLE_TTL),
scale_down,
split_queues,
split_queues: split_queues.clone(),
},
);

Expand Down
4 changes: 2 additions & 2 deletions mirrord/operator/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use kube::CustomResource;
use kube_target::{KubeTarget, UnknownTargetType};
pub use mirrord_config::feature::split_queues::QueueId;
use mirrord_config::{
feature::split_queues::{QueueFilter, SqsMessageFilter},
feature::split_queues::{QueueFilter, SplitQueuesConfig, SqsMessageFilter},
target::{Target, TargetConfig},
};
use schemars::JsonSchema;
Expand Down Expand Up @@ -302,7 +302,7 @@ pub struct CopyTargetSpec {
/// Ignored if [`Target`] is not [`Target::Deployment`].
pub scale_down: bool,
/// Split queues client side configuration.
pub split_queues: Option<BTreeMap<QueueId, QueueFilter>>,
pub split_queues: SplitQueuesConfig,
}

/// Features and operations that can be blocked by a `MirrordPolicy`.
Expand Down

0 comments on commit a920d00

Please sign in to comment.