Skip to content

Commit 847d264

Browse files
committed
implements owns_stream and owns_stream_with for Controller
Signed-off-by: David Herberth <[email protected]>
1 parent aed07be commit 847d264

File tree

2 files changed

+59
-1
lines changed

2 files changed

+59
-1
lines changed

kube-runtime/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ rust-version = "1.63.0"
1515
edition = "2021"
1616

1717
[features]
18-
unstable-runtime = ["unstable-runtime-subscribe"]
18+
unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-owns-stream"]
1919
unstable-runtime-subscribe = []
20+
unstable-runtime-owns-stream = []
2021

2122
[package.metadata.docs.rs]
2223
features = ["k8s-openapi/v1_26", "unstable-runtime"]

kube-runtime/src/controller/mod.rs

+57
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,63 @@ where
584584
self
585585
}
586586

587+
/// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
588+
///
589+
/// Same as [`Controller::owns`], but instad of a resource a stream of resources is used.
590+
/// This allows for customized and pre-filtered watch streams to be used as a trigger.
591+
///
592+
/// # Example:
593+
///
594+
/// ```no_run
595+
/// # use futures::StreamExt;
596+
/// # use k8s_openapi::api::core::v1::ConfigMap;
597+
/// # use k8s_openapi::api::apps::v1::StatefulSet;
598+
/// # use kube::runtime::controller::Action;
599+
/// # use kube::runtime::{watcher, Controller, WatchStreamExt};
600+
/// # use kube::{Api, Client, Error, ResourceExt};
601+
/// # use std::sync::Arc;
602+
/// # type CustomResource = ConfigMap;
603+
/// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
604+
/// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
605+
/// # async fn doc(client: kube::Client) {
606+
/// let sts_stream = watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
607+
/// .touched_objects()
608+
/// .predicate_filter(predicates::generation);
609+
///
610+
/// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
611+
/// .owns_stream(sts_stream)
612+
/// .run(reconcile, error_policy, Arc::new(()))
613+
/// .for_each(|_| std::future::ready(()))
614+
/// .await;
615+
/// # }
616+
/// ```
617+
#[cfg(feature = "unstable-runtime-owns-stream")]
618+
#[must_use]
619+
pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
620+
self,
621+
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
622+
) -> Self {
623+
self.owns_stream_with(trigger, ())
624+
}
625+
626+
/// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
627+
///
628+
/// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
629+
#[cfg(feature = "unstable-runtime-owns-stream")]
630+
#[must_use]
631+
pub fn owns_stream_with<Child: Resource + Send + 'static>(
632+
mut self,
633+
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
634+
dyntype: Child::DynamicType,
635+
) -> Self
636+
where
637+
Child::DynamicType: Debug + Eq + Hash + Clone,
638+
{
639+
let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype);
640+
self.trigger_selector.push(child_watcher.boxed());
641+
self
642+
}
643+
587644
/// Specify `Watched` object which `K` has a custom relation to and should be watched
588645
///
589646
/// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,

0 commit comments

Comments
 (0)