
Commit c89a419

mateiidavid and clux authored
Add shared stream interfaces (#1449)
Currently, there is no way for a controller to use a shared stream. If two or more distinct controllers require a watch on the same resource type, separate watches have to be created. To make better use of resources, only one watch should exist, with multiple streams shared from that single watch.

This change introduces shared streams through a couple of new primitives and changes to public interfaces. Stores can now be created through a new constructor that takes a buffer size. When a buffer size is specified, stores have an underlying channel that supports creating multiple readers from a single watch passed into a reflector. Furthermore, we introduce a new interface, `for_shared_stream`, that allows threading shared streams through controllers. This feature is currently gated behind a feature flag in the runtime crate.

N.B. a `StreamSubscribe` interface already exists for the same reasons described above; it is not compatible with controllers, does not exert backpressure, and will be removed in a future iteration.

---------

Signed-off-by: Matei David <[email protected]>
Co-authored-by: Eirik A <[email protected]>
1 parent c1df726 commit c89a419
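At a glance, the new pieces fit together like this: create a shared store with a buffer via `reflector::store_shared`, take one or more subscribers from the writer before it is consumed, reflect the watch through the shared writer with `reflect_shared`, and hand each subscriber to a controller via `Controller::for_shared_stream`. A minimal sketch, condensed from the example and doc comments added in this commit; the stub `reconcile`/`error_policy` functions, the `shared_stream_sketch` name, and the buffer size are illustrative, and the shared-stream APIs are gated behind the runtime's `unstable-runtime-subscribe` feature:

use std::sync::Arc;

use futures::StreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::runtime::{controller::Action, reflector, watcher, Controller, WatchStreamExt};
use kube::{Api, Client, Error};

async fn reconcile(_obj: Arc<Pod>, _ctx: Arc<()>) -> Result<Action, Error> {
    Ok(Action::await_change())
}

fn error_policy(_obj: Arc<Pod>, _error: &Error, _ctx: Arc<()>) -> Action {
    Action::await_change()
}

async fn shared_stream_sketch(client: Client) {
    let pods: Api<Pod> = Api::default_namespaced(client);

    // Shared store: the buffer bounds how far subscribers may lag behind the root watch.
    let (reader, writer) = reflector::store_shared(1024);
    // Subscribers must be taken before the writer is moved into the reflector.
    let subscriber = writer.subscribe().expect("store must be created as shared");

    // Root watch, reflected through the shared writer so subscribers receive events.
    let root = watcher(pods, watcher::Config::default())
        .default_backoff()
        .reflect_shared(writer)
        .applied_objects()
        .for_each(|_| async {});

    // A controller fed by the shared stream instead of owning its own watch.
    let controller = Controller::for_shared_stream(subscriber, reader)
        .run(reconcile, error_policy, Arc::new(()))
        .for_each(|_| async {});

    // The root stream must be driven alongside the controller, or the subscribers starve.
    futures::future::join(root, controller).await;
}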

File tree

9 files changed: +887 -7 lines changed


Cargo.toml

+2
@@ -34,6 +34,8 @@ missing_docs = "deny"
 ahash = "0.8"
 anyhow = "1.0.71"
 assert-json-diff = "2.0.2"
+async-broadcast = "0.7.0"
+async-stream = "0.3.5"
 async-trait = "0.1.64"
 backoff = "0.4.0"
 base64 = "0.22.0"

examples/Cargo.toml

+4
@@ -61,6 +61,10 @@ crossterm = "0.27.0"
 name = "configmapgen_controller"
 path = "configmapgen_controller.rs"

+[[example]]
+name = "shared_stream_controllers"
+path = "shared_stream_controllers.rs"
+
 [[example]]
 name = "crd_api"
 path = "crd_api.rs"

examples/shared_stream_controllers.rs

+187
@@ -0,0 +1,187 @@ (new file)

use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::{Pod, PodCondition};
use kube::{
    api::{Patch, PatchParams},
    runtime::{
        controller::Action,
        reflector::{self},
        watcher, Config, Controller, WatchStreamExt,
    },
    Api, Client, ResourceExt,
};
use tracing::{debug, error, info, warn};

use thiserror::Error;

// Helper module that namespaces two constants describing a Kubernetes status condition
pub mod condition {
    pub static UNDOCUMENTED_TYPE: &str = "UndocumentedPort";
    pub static STATUS_TRUE: &str = "True";
}

const SUBSCRIBE_BUFFER_SIZE: usize = 256;

#[derive(Debug, Error)]
enum Error {
    #[error("Failed to patch pod: {0}")]
    WriteFailed(#[source] kube::Error),

    #[error("Missing pod field: {0}")]
    MissingField(&'static str),
}

#[derive(Clone)]
struct Data {
    client: Client,
}

/// A simple reconciliation function that will copy a pod's labels into its annotations.
async fn reconcile_metadata(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
    let namespace = &pod.namespace().unwrap_or_default();
    if namespace == "kube-system" {
        return Ok(Action::await_change());
    }

    let mut pod = (*pod).clone();
    pod.metadata.managed_fields = None;
    // combine labels and annotations into a new map
    let labels = pod.labels().clone().into_iter();
    pod.annotations_mut().extend(labels);

    let pod_api = Api::<Pod>::namespaced(
        ctx.client.clone(),
        pod.metadata
            .namespace
            .as_ref()
            .ok_or_else(|| Error::MissingField(".metadata.namespace"))?,
    );

    pod_api
        .patch(
            &pod.name_any(),
            &PatchParams::apply("controller-1"),
            &Patch::Apply(&pod),
        )
        .await
        .map_err(Error::WriteFailed)?;

    Ok(Action::requeue(Duration::from_secs(300)))
}

/// Another reconciliation function that will add an 'UndocumentedPort' condition to pods
/// that do not have any ports declared across all containers.
async fn reconcile_status(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
    for container in pod.spec.clone().unwrap_or_default().containers.iter() {
        if !container.ports.clone().unwrap_or_default().is_empty() {
            debug!(name = %pod.name_any(), "Skipped updating pod with documented ports");
            return Ok(Action::await_change());
        }
    }

    let pod_api = Api::<Pod>::namespaced(
        ctx.client.clone(),
        pod.metadata
            .namespace
            .as_ref()
            .ok_or_else(|| Error::MissingField(".metadata.namespace"))?,
    );

    let undocumented_condition = PodCondition {
        type_: condition::UNDOCUMENTED_TYPE.into(),
        status: condition::STATUS_TRUE.into(),
        ..Default::default()
    };
    let value = serde_json::json!({
        "status": {
            "name": pod.name_any(),
            "kind": "Pod",
            "conditions": vec![undocumented_condition]
        }
    });
    pod_api
        .patch_status(
            &pod.name_any(),
            &PatchParams::apply("controller-2"),
            &Patch::Strategic(value),
        )
        .await
        .map_err(Error::WriteFailed)?;

    Ok(Action::requeue(Duration::from_secs(300)))
}

fn error_policy(obj: Arc<Pod>, error: &Error, _ctx: Arc<Data>) -> Action {
    error!(%error, name = %obj.name_any(), "Failed reconciliation");
    Action::requeue(Duration::from_secs(10))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();

    let client = Client::try_default().await?;
    let pods = Api::<Pod>::namespaced(client.clone(), "default");
    let config = Config::default().concurrency(2);
    let ctx = Arc::new(Data { client });

    // Create a shared store with a predefined buffer that will be shared between subscribers.
    let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE);
    // Before threading an object watch through the store, create a subscriber.
    // Any number of subscribers can be created from one writer.
    let subscriber = writer
        .subscribe()
        .expect("subscribers can only be created from shared stores");

    // Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to
    // be able to consume updates, the reflector must be shared.
    let pod_watch = watcher(pods.clone(), Default::default())
        .default_backoff()
        .reflect_shared(writer)
        .for_each(|res| async move {
            match res {
                Ok(event) => debug!("Received event on root stream {event:?}"),
                Err(error) => error!(%error, "Unexpected error when watching resource"),
            }
        });

    // Create the first controller using the reconcile_metadata function. Controllers accept
    // subscribers through a dedicated interface.
    let metadata_controller = Controller::for_shared_stream(subscriber.clone(), reader)
        .with_config(config.clone())
        .shutdown_on_signal()
        .run(reconcile_metadata, error_policy, ctx.clone())
        .for_each(|res| async move {
            match res {
                Ok(v) => info!("Reconciled metadata {v:?}"),
                Err(error) => warn!(%error, "Failed to reconcile metadata"),
            }
        });

    // Subscribers can be used to get a read handle on the store, if the initial handle has been
    // moved or dropped.
    let reader = subscriber.reader();
    // Create the second controller using the reconcile_status function.
    let status_controller = Controller::for_shared_stream(subscriber, reader)
        .with_config(config)
        .shutdown_on_signal()
        .run(reconcile_status, error_policy, ctx)
        .for_each(|res| async move {
            match res {
                Ok(v) => info!("Reconciled status {v:?}"),
                Err(error) => warn!(%error, "Failed to reconcile status"),
            }
        });

    // Drive streams to readiness. The initial watch (that is reflected) needs to be driven to
    // consume events from the API Server and forward them to subscribers.
    //
    // Both controllers will operate on shared objects.
    tokio::select! {
        _ = futures::future::join(metadata_controller, status_controller) => {},
        _ = pod_watch => {}
    }

    Ok(())
}
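One design point the example relies on: subscribers are handles created from the shared writer, so every subscriber must be taken before `reflect_shared(writer)` consumes the writer; after that, additional read handles come from `subscriber.reader()` rather than from the store constructor. A hedged sketch reusing names from the example above (the extra cloned subscriber is purely illustrative):

// Fan a single pod watch out to several consumers. All subscribers are created up
// front; `reflect_shared` then takes ownership of the writer.
let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE);
let sub_for_controllers = writer.subscribe().expect("store was created as shared");
let sub_for_metrics = sub_for_controllers.clone(); // subscribers are cheap to clone

let pod_watch = watcher(pods.clone(), Default::default())
    .default_backoff()
    .reflect_shared(writer) // the writer is moved here; no further subscribe() is possible
    .for_each(|_| async {});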

kube-runtime/Cargo.toml

+2
@@ -48,6 +48,8 @@ backoff.workspace = true
 async-trait.workspace = true
 hashbrown.workspace = true
 k8s-openapi.workspace = true
+async-broadcast.workspace = true
+async-stream.workspace = true

 [dev-dependencies]
 kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" }

kube-runtime/src/controller/mod.rs

+133
@@ -123,6 +123,28 @@ where
     })
 }

+/// Enqueues the object itself for reconciliation when the object is behind a
+/// shared pointer
+#[cfg(feature = "unstable-runtime-subscribe")]
+fn trigger_self_shared<K, S>(
+    stream: S,
+    dyntype: K::DynamicType,
+) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
+where
+    // Input stream has item as some Arc'd Resource (via
+    // `Controller::for_shared_stream`)
+    S: TryStream<Ok = Arc<K>>,
+    K: Resource,
+    K::DynamicType: Clone,
+{
+    trigger_with(stream, move |obj| {
+        Some(ReconcileRequest {
+            obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
+            reason: ReconcileReason::ObjectUpdated,
+        })
+    })
+}
+
 /// Enqueues any mapper returned `K` types for reconciliation
 fn trigger_others<S, K, I>(
     stream: S,
@@ -703,6 +725,117 @@ where
         }
     }

+    /// This is the same as [`Controller::for_stream`]. Instead of taking an
+    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
+    /// streams can be created out-of-band by subscribing on a store `Writer`.
+    /// Through this interface, multiple controllers can use the same root
+    /// (shared) input stream of resources to keep memory overheads smaller.
+    ///
+    /// **N.B.**: This constructor requires an
+    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
+    /// feature.
+    ///
+    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
+    /// need to share the stream.
+    ///
+    /// ## Warning
+    ///
+    /// You **must** ensure the root stream (i.e. the stream created through a `reflector()`)
+    /// is driven to readiness independently of this controller to ensure the
+    /// watcher never deadlocks.
+    ///
+    /// # Example
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use k8s_openapi::api::apps::v1::Deployment;
+    /// # use kube::runtime::controller::{Action, Controller};
+    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
+    /// # use kube::{Api, Client, Error, ResourceExt};
+    /// # use std::sync::Arc;
+    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
+    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
+    /// # async fn doc(client: kube::Client) {
+    /// let api: Api<Deployment> = Api::default_namespaced(client);
+    /// let (reader, writer) = reflector::store_shared(128);
+    /// let subscriber = writer
+    ///     .subscribe()
+    ///     .expect("subscribers can only be created from shared stores");
+    /// let deploys = watcher(api, watcher::Config::default())
+    ///     .default_backoff()
+    ///     .reflect_shared(writer)
+    ///     .applied_objects()
+    ///     .for_each(|ev| async move {
+    ///         match ev {
+    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
+    ///             Err(error) => tracing::error!(%error, "received error")
+    ///         }
+    ///     });
+    ///
+    /// let controller = Controller::for_shared_stream(subscriber, reader)
+    ///     .run(reconcile, error_policy, Arc::new(()))
+    ///     .for_each(|ev| async move {
+    ///         tracing::info!("reconciled {ev:?}")
+    ///     });
+    ///
+    /// // Drive streams using a select statement
+    /// tokio::select! {
+    ///     _ = deploys => {},
+    ///     _ = controller => {},
+    /// }
+    /// # }
+    /// ```
+    #[cfg(feature = "unstable-runtime-subscribe")]
+    pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
+    where
+        K::DynamicType: Default,
+    {
+        Self::for_shared_stream_with(trigger, reader, Default::default())
+    }
+
+    /// This is the same as [`Controller::for_stream`]. Instead of taking an
+    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
+    /// streams can be created out-of-band by subscribing on a store `Writer`.
+    /// Through this interface, multiple controllers can use the same root
+    /// (shared) input stream of resources to keep memory overheads smaller.
+    ///
+    /// **N.B.**: This constructor requires an
+    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
+    /// feature.
+    ///
+    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
+    /// need to share the stream.
+    ///
+    /// This variant constructor is used for [`dynamic`] types found through
+    /// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
+    /// known at compile time).
+    ///
+    /// [`dynamic`]: kube_client::core::dynamic
+    #[cfg(feature = "unstable-runtime-subscribe")]
+    pub fn for_shared_stream_with(
+        trigger: impl Stream<Item = Arc<K>> + Send + 'static,
+        reader: Store<K>,
+        dyntype: K::DynamicType,
+    ) -> Self {
+        let mut trigger_selector = stream::SelectAll::new();
+        let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
+        trigger_selector.push(self_watcher);
+        Self {
+            trigger_selector,
+            trigger_backoff: Box::<DefaultBackoff>::default(),
+            graceful_shutdown_selector: vec![
+                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
+                future::pending().boxed(),
+            ],
+            forceful_shutdown_selector: vec![
+                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
+                future::pending().boxed(),
+            ],
+            dyntype,
+            reader,
+            config: Default::default(),
+        }
+    }
+
     /// Specify the configuration for the controller's behavior.
     #[must_use]
     pub fn with_config(mut self, config: Config) -> Self {