diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index fa317472b..171019a86 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -15,10 +15,10 @@ rust-version = "1.63.0" edition = "2021" [features] -unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates", "unstable-runtime-owns-stream"] +unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates", "unstable-runtime-stream-control"] unstable-runtime-subscribe = [] unstable-runtime-predicates = [] -unstable-runtime-owns-stream = [] +unstable-runtime-stream-control = [] [package.metadata.docs.rs] features = ["k8s-openapi/v1_26", "unstable-runtime"] diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 454d62f1a..0c5d80a70 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -118,6 +118,37 @@ where }) } +/// Enqueues any mapper returned `K` types for reconciliation +fn trigger_others( + stream: S, + mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static, + dyntype: ::DynamicType, +) -> impl Stream, S::Error>> +where + // Input stream has items as some Resource (via Controller::watches) + S: TryStream, + S::Ok: Resource, + ::DynamicType: Clone, + // Output stream is requests for the root type K + K: Resource, + K::DynamicType: Clone, + // but the mapper can produce many of them + I: 'static + IntoIterator>, + I::IntoIter: Send, +{ + trigger_with(stream, move |obj| { + let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase(); + mapper(obj) + .into_iter() + .map(move |mapped_obj_ref| ReconcileRequest { + obj_ref: mapped_obj_ref, + reason: ReconcileReason::RelatedObjectUpdated { + obj_ref: Box::new(watch_ref.clone()), + }, + }) + }) +} + /// Enqueues any owners of type `KOwner` for reconciliation pub fn trigger_owners( stream: S, @@ -131,22 +162,16 @@ where KOwner: Resource, KOwner::DynamicType: Clone, { - trigger_with(stream, move |obj| { + let mapper = move |obj: S::Ok| { let meta = obj.meta().clone(); let ns = meta.namespace; let owner_type = owner_type.clone(); - let child_ref = ObjectRef::from_obj_with(&obj, child_type.clone()).erase(); meta.owner_references .into_iter() .flatten() .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone())) - .map(move |owner_ref| ReconcileRequest { - obj_ref: owner_ref, - reason: ReconcileReason::RelatedObjectUpdated { - obj_ref: Box::new(child_ref.clone()), - }, - }) - }) + }; + trigger_others(stream, mapper, child_type) } /// A request to reconcile an object, annotated with why that request was made. @@ -380,32 +405,32 @@ where } } -/// Controller +/// Controller for a Resource `K` /// -/// A controller is made up of: -/// - 1 `reflector` (for the core object) -/// - N `watcher` objects for each object child object -/// - user defined `reconcile` + `error_policy` callbacks -/// - a generated input stream considering all sources +/// A controller is an infinite stream of objects to be reconciled. /// -/// And all reconcile requests through an internal scheduler +/// Once `run` and continuously awaited, it continuously calls out to user provided +/// `reconcile` and `error_policy` callbacks whenever relevant changes are detected +/// or if errors are seen from `reconcile`. /// -/// Pieces: +/// Reconciles are generally requested for all changes on your root objects. +/// Changes to managed child resources will also trigger the reconciler for the +/// managing object by travirsing owner references (for `Controller::owns`), +/// or traverse a custom mapping (for `Controller::watches`). +/// +/// This mapping mechanism ultimately hides the reason for the reconciliation request, +/// and forces you to write an idempotent reconciler. +/// +/// General setup: /// ```no_run -/// use kube::{ -/// Client, CustomResource, -/// api::{Api, ListParams}, -/// runtime::{ -/// controller::{Controller, Action}, -/// watcher, -/// }, -/// }; -/// use serde::{Deserialize, Serialize}; -/// use tokio::time::Duration; +/// use kube::{Api, Client, CustomResource}; +/// use kube::runtime::{controller::{Controller, Action}, watcher, reflector}; +/// # use serde::{Deserialize, Serialize}; +/// # use tokio::time::Duration; /// use futures::StreamExt; /// use k8s_openapi::api::core::v1::ConfigMap; /// use schemars::JsonSchema; -/// use std::sync::Arc; +/// # use std::sync::Arc; /// use thiserror::Error; /// /// #[derive(Debug, Error)] @@ -475,7 +500,7 @@ where K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static, K::DynamicType: Eq + Hash + Clone, { - /// Create a Controller on a type `K` + /// Create a Controller for a resource `K` /// /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`. /// @@ -483,14 +508,14 @@ where /// and receive reconcile events for. /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`]. #[must_use] - pub fn new(owned_api: Api, wc: Config) -> Self + pub fn new(main_api: Api, wc: Config) -> Self where K::DynamicType: Default, { - Self::new_with(owned_api, wc, Default::default()) + Self::new_with(main_api, wc, Default::default()) } - /// Create a Controller on a type `K` + /// Create a Controller for a resource `K` /// /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`. /// @@ -504,12 +529,12 @@ where /// [`Api`]: kube_client::Api /// [`dynamic`]: kube_client::core::dynamic /// [`Config::default`]: crate::watcher::Config::default - pub fn new_with(owned_api: Api, wc: Config, dyntype: K::DynamicType) -> Self { + pub fn new_with(main_api: Api, wc: Config, dyntype: K::DynamicType) -> Self { let writer = Writer::::new(dyntype.clone()); let reader = writer.as_reader(); let mut trigger_selector = stream::SelectAll::new(); let self_watcher = trigger_self( - reflector(writer, watcher(owned_api, wc)).applied_objects(), + reflector(writer, watcher(main_api, wc)).applied_objects(), dyntype.clone(), ) .boxed(); @@ -530,6 +555,89 @@ where } } + /// Create a Controller for a resource `K` from a stream of `K` objects + /// + /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used. + /// This allows for customized and pre-filtered watch streams to be used as a trigger, + /// as well as sharing input streams between multiple controllers. + /// + /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature. + /// + /// # Example: + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use k8s_openapi::api::apps::v1::Deployment; + /// # use kube::runtime::controller::{Action, Controller}; + /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt}; + /// # use kube::{Api, Client, Error, ResourceExt}; + /// # use std::sync::Arc; + /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } + /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } + /// # async fn doc(client: kube::Client) { + /// let api: Api = Api::default_namespaced(client); + /// let (reader, writer) = reflector::store(); + /// let deploys = reflector(writer, watcher(api, watcher::Config::default())) + /// .applied_objects() + /// .predicate_filter(predicates::generation); + /// + /// Controller::for_stream(deploys, reader) + /// .run(reconcile, error_policy, Arc::new(())) + /// .for_each(|_| std::future::ready(())) + /// .await; + /// # } + /// ``` + /// + /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering. + #[cfg(feature = "unstable-runtime-stream-control")] + pub fn for_stream( + trigger: impl Stream> + Send + 'static, + reader: Store, + ) -> Self + where + K::DynamicType: Default, + { + Self::for_stream_with(trigger, reader, Default::default()) + } + + /// Create a Controller for a resource `K` from a stream of `K` objects + /// + /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used. + /// This allows for customized and pre-filtered watch streams to be used as a trigger, + /// as well as sharing input streams between multiple controllers. + /// + /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature. + /// + /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering. + /// + /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::for_stream`] for static types. + /// + /// [`dynamic`]: kube_client::core::dynamic + #[cfg(feature = "unstable-runtime-stream-control")] + pub fn for_stream_with( + trigger: impl Stream> + Send + 'static, + reader: Store, + dyntype: K::DynamicType, + ) -> Self { + let mut trigger_selector = stream::SelectAll::new(); + let self_watcher = trigger_self(trigger, dyntype.clone()).boxed(); + trigger_selector.push(self_watcher); + Self { + trigger_selector, + trigger_backoff: Box::new(watcher::default_backoff()), + graceful_shutdown_selector: vec![ + // Fallback future, ensuring that we never terminate if no additional futures are added to the selector + future::pending().boxed(), + ], + forceful_shutdown_selector: vec![ + // Fallback future, ensuring that we never terminate if no additional futures are added to the selector + future::pending().boxed(), + ], + dyntype, + reader, + } + } + /// Specify the backoff policy for "trigger" watches /// /// This includes the core watch, as well as auxilary watches introduced by [`Self::owns`] and [`Self::watches`]. @@ -579,6 +687,7 @@ where where Child::DynamicType: Debug + Eq + Hash + Clone, { + // TODO: call owns_stream_with when it's stable let child_watcher = trigger_owners(watcher(api, wc).touched_objects(), self.dyntype.clone(), dyntype); self.trigger_selector.push(child_watcher.boxed()); self @@ -586,8 +695,13 @@ where /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K` /// - /// Same as [`Controller::owns`], but instad of a resource a stream of resources is used. - /// This allows for customized and pre-filtered watch streams to be used as a trigger. + /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used. + /// This allows for customized and pre-filtered watch streams to be used as a trigger, + /// as well as sharing input streams between multiple controllers. + /// + /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature. + /// + /// Watcher streams passed in here should be filtered first through `touched_objects`. /// /// # Example: /// @@ -614,7 +728,7 @@ where /// .await; /// # } /// ``` - #[cfg(feature = "unstable-runtime-owns-stream")] + #[cfg(feature = "unstable-runtime-stream-control")] #[must_use] pub fn owns_stream + Send + 'static>( self, @@ -625,8 +739,14 @@ where /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K` /// + /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used. + /// This allows for customized and pre-filtered watch streams to be used as a trigger, + /// as well as sharing input streams between multiple controllers. + /// + /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature. + /// /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources. - #[cfg(feature = "unstable-runtime-owns-stream")] + #[cfg(feature = "unstable-runtime-stream-control")] #[must_use] pub fn owns_stream_with( mut self, @@ -660,8 +780,7 @@ where /// /// ``` /// # use kube::runtime::{Controller, controller::Action, reflector::ObjectRef, watcher}; - /// # use kube::api::{Api, ListParams}; - /// # use kube::ResourceExt; + /// # use kube::{Api, ResourceExt}; /// # use k8s_openapi::api::core::v1::{ConfigMap, Namespace}; /// # use futures::StreamExt; /// # use std::sync::Arc; @@ -707,29 +826,26 @@ where /// /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/ #[must_use] - pub fn watches< - Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, - I: 'static + IntoIterator>, - >( + pub fn watches( self, api: Api, wc: Config, mapper: impl Fn(Other) -> I + Sync + Send + 'static, ) -> Self where + Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, + Other::DynamicType: Default + Debug + Clone + Eq + Hash, + I: 'static + IntoIterator>, I::IntoIter: Send, { - self.watches_with(api, (), wc, mapper) + self.watches_with(api, Default::default(), wc, mapper) } /// Specify `Watched` object which `K` has a custom relation to and should be watched /// /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources. #[must_use] - pub fn watches_with< - Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, - I: 'static + IntoIterator>, - >( + pub fn watches_with( mut self, api: Api, dyntype: Other::DynamicType, @@ -737,20 +853,94 @@ where mapper: impl Fn(Other) -> I + Sync + Send + 'static, ) -> Self where + Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, + I: 'static + IntoIterator>, I::IntoIter: Send, - Other::DynamicType: Clone, + Other::DynamicType: Debug + Clone + Eq + Hash, { - let other_watcher = trigger_with(watcher(api, wc).touched_objects(), move |obj| { - let watched_obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase(); - mapper(obj) - .into_iter() - .map(move |mapped_obj_ref| ReconcileRequest { - obj_ref: mapped_obj_ref, - reason: ReconcileReason::RelatedObjectUpdated { - obj_ref: Box::new(watched_obj_ref.clone()), - }, - }) - }); + let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype); + self.trigger_selector.push(other_watcher.boxed()); + self + } + + /// Trigger the reconciliation process for a stream of `Other` objects related to a `K` + /// + /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used. + /// This allows for customized and pre-filtered watch streams to be used as a trigger, + /// as well as sharing input streams between multiple controllers. + /// + /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature. + /// + /// Watcher streams passed in here should be filtered first through `touched_objects`. + /// + /// # Example: + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use k8s_openapi::api::core::v1::ConfigMap; + /// # use k8s_openapi::api::apps::v1::DaemonSet; + /// # use kube::runtime::controller::Action; + /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt}; + /// # use kube::{Api, Client, Error, ResourceExt}; + /// # use std::sync::Arc; + /// # type CustomResource = ConfigMap; + /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } + /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } + /// fn mapper(_: DaemonSet) -> Option> { todo!() } + /// # async fn doc(client: kube::Client) { + /// let api: Api = Api::all(client.clone()); + /// let cr: Api = Api::all(client.clone()); + /// let daemons = watcher(api, watcher::Config::default()) + /// .touched_objects() + /// .predicate_filter(predicates::generation); + /// + /// Controller::new(cr, watcher::Config::default()) + /// .watches_stream(daemons, mapper) + /// .run(reconcile, error_policy, Arc::new(())) + /// .for_each(|_| std::future::ready(())) + /// .await; + /// # } + /// ``` + #[cfg(feature = "unstable-runtime-stream-control")] + #[must_use] + pub fn watches_stream( + self, + trigger: impl Stream> + Send + 'static, + mapper: impl Fn(Other) -> I + Sync + Send + 'static, + ) -> Self + where + Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, + Other::DynamicType: Default + Debug + Clone, + I: 'static + IntoIterator>, + I::IntoIter: Send, + { + self.watches_stream_with(trigger, mapper, Default::default()) + } + + /// Trigger the reconciliation process for a stream of `Other` objects related to a `K` + /// + /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used. + /// This allows for customized and pre-filtered watch streams to be used as a trigger, + /// as well as sharing input streams between multiple controllers. + /// + /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature. + /// + /// Same as [`Controller::watches_stream`], but accepts a `DynamicType` so it can be used with dynamic resources. + #[cfg(feature = "unstable-runtime-stream-control")] + #[must_use] + pub fn watches_stream_with( + mut self, + trigger: impl Stream> + Send + 'static, + mapper: impl Fn(Other) -> I + Sync + Send + 'static, + dyntype: Other::DynamicType, + ) -> Self + where + Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, + Other::DynamicType: Debug + Clone, + I: 'static + IntoIterator>, + I::IntoIter: Send, + { + let other_watcher = trigger_others(trigger, mapper, dyntype); self.trigger_selector.push(other_watcher.boxed()); self } diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index 5905365cb..4ca66abe6 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -75,6 +75,12 @@ where } } +/// Predicate functions for [`WatchStreamExt::predicate_filter`](crate::WatchStreamExt::predicate_filter) +/// +/// These functions just return a hash of commonly compared values, +/// to help decide whether to pass a watch event along or not. +/// +/// Functional rewrite of the [controller-runtime/predicate module](https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/predicate/predicate.go). pub mod predicates { use kube_client::{Resource, ResourceExt}; use std::{ @@ -82,8 +88,6 @@ pub mod predicates { hash::{Hash, Hasher}, }; - // See: https://github.com/kubernetes-sigs/controller-runtime/blob/v0.12.0/pkg/predicate/predicate.go - fn hash(t: &T) -> u64 { let mut hasher = DefaultHasher::new(); t.hash(&mut hasher); diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index e2e3e1b50..8246c23ed 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -47,7 +47,9 @@ pub trait WatchStreamExt: Stream { /// /// This will filter out repeat calls where the predicate returns the same result. /// Common use case for this is to avoid repeat events for status updates - /// by filtering on []`predicates::generation`]. + /// by filtering on [`predicates::generation`](crate::predicates::generation). + /// + /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature. /// /// ## Usage /// ```no_run @@ -84,12 +86,14 @@ pub trait WatchStreamExt: Stream { /// The [`StreamSubscribe::subscribe()`] method which allows additional consumers /// of events from a stream without consuming the stream itself. /// - /// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`] + /// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`](crate::utils::stream_subscribe::Error::Lagged) /// error. The subscriber can then decide to abort its task or tolerate the lost events. /// /// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams /// will also end. /// + /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature. + /// /// ## Warning /// /// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index fe62202cd..48029ff3e 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -508,7 +508,7 @@ where /// runtime::{watcher, WatchStreamExt} /// }; /// use k8s_openapi::api::core::v1::Pod; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures::TryStreamExt; /// #[tokio::main] /// async fn main() -> Result<(), watcher::Error> { /// let client = Client::try_default().await.unwrap(); @@ -571,7 +571,7 @@ pub fn watcher( /// runtime::{watcher, metadata_watcher, WatchStreamExt} /// }; /// use k8s_openapi::api::core::v1::Pod; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures::TryStreamExt; /// #[tokio::main] /// async fn main() -> Result<(), watcher::Error> { /// let client = Client::try_default().await.unwrap();