Skip to content

Commit aed07be

Browse files
authored
Minor docs improvements/fixes, deny fix + macros dep fix (#1166)
* Docs for reflector + better docs for category Also fixes a misleading import in the event recorder that made it seem like we were using core::v1::Event. Signed-off-by: clux <[email protected]> * better event recorder documentation + hermit abi deny Signed-off-by: clux <[email protected]> * skip syn for now Signed-off-by: clux <[email protected]> * fix build Signed-off-by: clux <[email protected]> * sometimes need to wait an extra second for all logs to come through Signed-off-by: clux <[email protected]> * add tokio/macros to ws depedencies - fixes #1168 Signed-off-by: clux <[email protected]> * line buffering changed from kubernetes 1.26 Signed-off-by: clux <[email protected]> * ok dont try to test any buffering Signed-off-by: clux <[email protected]> * fix new docs with watcher::Config Signed-off-by: clux <[email protected]> * fix doc issues from previous pr Signed-off-by: clux <[email protected]> --------- Signed-off-by: clux <[email protected]>
1 parent 30a0c39 commit aed07be

File tree

9 files changed

+128
-34
lines changed

9 files changed

+128
-34
lines changed

deny.toml

+7
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ version = "0.2"
8888
name = "windows-sys"
8989
version = "0.42"
9090

91+
[[bans.skip]]
92+
name = "hermit-abi"
93+
94+
[[bans.skip]]
95+
# Needs a complicated upgrade
96+
name = "syn"
97+
9198
[[bans.skip]]
9299
# waiting for pem to bump base64
93100
# https://github.com/jcreekmore/pem-rs/blob/master/Cargo.toml#L16

kube-client/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ edition = "2021"
1919
default = ["client", "openssl-tls"]
2020
rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls"]
2121
openssl-tls = ["openssl", "hyper-openssl"]
22-
ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws"]
22+
ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws", "tokio/macros"]
2323
oauth = ["client", "tame-oauth"]
2424
gzip = ["client", "tower-http/decompression-gzip"]
2525
client = ["config", "__non_core", "hyper", "http-body", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath_lib", "bytes", "futures", "tokio", "tokio-util", "either"]

kube-client/src/lib.rs

+7-8
Original file line numberDiff line numberDiff line change
@@ -445,20 +445,19 @@ mod test {
445445
..LogParams::default()
446446
};
447447
let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.boxed();
448-
let log_line = logs_stream.try_next().await?.unwrap();
449-
assert_eq!(log_line, "kube 1\n");
450448

451449
// wait for container to finish
452-
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
450+
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
453451

454452
let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
455453
assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
456454

457-
// remaining logs should have been buffered internally
458-
assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 2\n");
459-
assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 3\n");
460-
assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 4\n");
461-
assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 5\n");
455+
// individual logs may or may not buffer
456+
let mut output = String::new();
457+
while let Some(line) = logs_stream.try_next().await? {
458+
output.push_str(&String::from_utf8_lossy(&line));
459+
}
460+
assert_eq!(output, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
462461

463462
// evict the pod
464463
let ep = EvictParams::default();

kube-core/src/params.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ use std::fmt;
44
use crate::request::Error;
55
use serde::Serialize;
66

7-
/// Specifies how the resourceVersion parameter is applied. resourceVersionMatch may only be set if resourceVersion is also set.
8-
/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details.
7+
/// Controls how the resourceVersion parameter is applied
8+
///
9+
/// This embeds the resource version when using the `NotOlderThan` or `Exact` variants.
10+
/// See <https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list> for details.
911
#[derive(Clone, Debug, Default, PartialEq)]
1012
pub enum VersionMatch {
1113
/// Matches data with the latest version available in the kube-apiserver database (etcd) (quorum read required).
@@ -61,7 +63,7 @@ pub struct ListParams {
6163
pub continue_token: Option<String>,
6264

6365
/// Determines how resourceVersion is applied to list calls.
64-
/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
66+
/// See <https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions> for
6567
/// details.
6668
pub version_match: VersionMatch,
6769
}

kube-derive/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ mod custom_resource;
139139
/// ## `#[kube(shortname = "sn")]`
140140
/// Add a single shortname to the generated crd.
141141
///
142+
/// ## `#[kube(category = "apps")]`
143+
/// Add a single category to `crd.spec.names.categories`.
144+
///
142145
/// ## Example with all properties
143146
///
144147
/// ```rust

kube-runtime/src/controller/mod.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -500,10 +500,10 @@ where
500500
///
501501
/// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
502502
///
503-
/// [`Config`]: kube_runtime::watcher::Config
503+
/// [`Config`]: crate::watcher::Config
504504
/// [`Api`]: kube_client::Api
505505
/// [`dynamic`]: kube_client::core::dynamic
506-
/// [`Config::default`]: kube_runtime::watcher::Config::default
506+
/// [`Config::default`]: crate::watcher::Config::default
507507
pub fn new_with(owned_api: Api<K>, wc: Config, dyntype: K::DynamicType) -> Self {
508508
let writer = Writer::<K>::new(dyntype.clone());
509509
let reader = writer.as_reader();
@@ -552,9 +552,9 @@ where
552552
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
553553
/// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
554554
///
555-
/// The [`ListParams`] refer to the possible subset of `Child` objects that you want the [`Api`]
556-
/// to watch - in the Api's configured scope - and receive reconcile events for.
557-
/// To watch the full set of `Child` objects in the given `Api` scope, you can use [`ListParams::default`].
555+
/// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
556+
/// to watch - in the Api's configured scope - and receive reconcile events for.
557+
/// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
558558
///
559559
/// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
560560
#[must_use]
@@ -593,9 +593,9 @@ where
593593
///
594594
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
595595
///
596-
/// The [`ListParams`] refer to the possible subset of `Watched` objects that you want the [`Api`]
596+
/// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
597597
/// to watch - in the Api's configured scope - and run through the custom mapper.
598-
/// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`ListParams::default`].
598+
/// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`watcher::Config::default`].
599599
///
600600
/// # Example
601601
///

kube-runtime/src/events.rs

+20-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Publishes events for objects for kubernetes >= 1.19
22
use k8s_openapi::{
3-
api::{core::v1::ObjectReference, events::v1::Event as CoreEvent},
3+
api::{core::v1::ObjectReference, events::v1::Event as K8sEvent},
44
apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
55
chrono::Utc,
66
};
@@ -21,6 +21,7 @@ pub struct Event {
2121
/// The short reason explaining why the `action` was taken.
2222
///
2323
/// This must be at most 128 characters, and is often PascalCased. Shows up in `kubectl describe` as `Reason`.
24+
/// Usually denoted
2425
pub reason: String,
2526

2627
/// A optional description of the status of the `action`.
@@ -31,6 +32,8 @@ pub struct Event {
3132
/// The action that was taken (either successfully or unsuccessfully) against main object
3233
///
3334
/// This must be at most 128 characters. It does not currently show up in `kubectl describe`.
35+
/// A common convention is a short identifier of the action that caused the outcome described in `reason`.
36+
/// Usually denoted in `PascalCase`.
3437
pub action: String,
3538

3639
/// Optional secondary object related to the main object
@@ -126,10 +129,7 @@ impl From<&str> for Reporter {
126129
/// specified when building the recorder using [`Recorder::new`].
127130
///
128131
/// ```
129-
/// use kube::{
130-
/// core::Resource,
131-
/// runtime::events::{Reporter, Recorder, Event, EventType}
132-
/// };
132+
/// use kube::runtime::events::{Reporter, Recorder, Event, EventType};
133133
/// use k8s_openapi::api::core::v1::ObjectReference;
134134
///
135135
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
@@ -161,9 +161,19 @@ impl From<&str> for Reporter {
161161
///
162162
/// Events attached to an object will be shown in the `Events` section of the output of
163163
/// of `kubectl describe` for that object.
164+
///
165+
/// ## RBAC
166+
///
167+
/// Note that usage of the event recorder minimally requires the following RBAC rules:
168+
///
169+
/// ```yaml
170+
/// - apiGroups: ["events.k8s.io"]
171+
/// resources: ["events"]
172+
/// verbs: ["create"]
173+
/// ```
164174
#[derive(Clone)]
165175
pub struct Recorder {
166-
events: Api<CoreEvent>,
176+
events: Api<K8sEvent>,
167177
reporter: Reporter,
168178
reference: ObjectReference,
169179
}
@@ -202,7 +212,7 @@ impl Recorder {
202212
// for more detail on the fields
203213
// and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125
204214
self.events
205-
.create(&PostParams::default(), &CoreEvent {
215+
.create(&PostParams::default(), &K8sEvent {
206216
action: Some(ev.action),
207217
reason: Some(ev.reason),
208218
deprecated_count: None,
@@ -241,7 +251,7 @@ mod test {
241251
#![allow(unused_imports)]
242252

243253
use k8s_openapi::api::{
244-
core::v1::{Event as CoreEvent, Service},
254+
core::v1::{Event as K8sEvent, Service},
245255
rbac::v1::ClusterRole,
246256
};
247257
use kube_client::{Api, Client, Resource};
@@ -265,7 +275,7 @@ mod test {
265275
secondary: None,
266276
})
267277
.await?;
268-
let events: Api<CoreEvent> = Api::namespaced(client, "default");
278+
let events: Api<K8sEvent> = Api::namespaced(client, "default");
269279

270280
let event_list = events.list(&Default::default()).await?;
271281
let found_event = event_list
@@ -294,7 +304,7 @@ mod test {
294304
secondary: None,
295305
})
296306
.await?;
297-
let events: Api<CoreEvent> = Api::namespaced(client, "kube-system");
307+
let events: Api<K8sEvent> = Api::namespaced(client, "kube-system");
298308

299309
let event_list = events.list(&Default::default()).await?;
300310
let found_event = event_list

kube-runtime/src/reflector/mod.rs

+77-4
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,85 @@ use kube_client::Resource;
1010
use std::hash::Hash;
1111
pub use store::{store, Store};
1212

13-
/// Caches objects from `watcher::Event`s to a local `Store`
13+
/// Cache objects from a [`watcher()`] stream into a local [`Store`]
1414
///
15-
/// Keep in mind that the `Store` is just a cache, and may be out of date.
15+
/// Observes the raw [`Stream`] of [`watcher::Event`] objects, and modifies the cache.
16+
/// It passes the raw [`watcher()`] stream through unmodified.
1617
///
17-
/// Note: It is a bad idea to feed a single `reflector` from multiple `watcher`s, since
18-
/// the whole `Store` will be cleared whenever any of them emits a `Restarted` event.
18+
/// ## Usage
19+
/// Create a [`Store`] through e.g. [`store::store()`]. The `writer` part is not-clonable,
20+
/// and must be moved into the reflector. The `reader` part is the [`Store`] interface
21+
/// that you can send to other parts of your program as state.
22+
///
23+
/// The cache contains the last-seen state of objects,
24+
/// which may lag slightly behind the actual state.
25+
///
26+
/// ## Example
27+
///
28+
/// Infinite watch of [`Node`](k8s_openapi::api::core::v1::Node) resources with a certain label.
29+
///
30+
/// The `reader` part being passed around to a webserver is omitted.
31+
/// For examples see [version-rs](https://github.com/kube-rs/version-rs) for integration with [axum](https://github.com/tokio-rs/axum),
32+
/// or [controller-rs](https://github.com/kube-rs/controller-rs) for the similar controller integration with [actix-web](https://actix.rs/).
33+
///
34+
/// ```no_run
35+
/// use k8s_openapi::api::core::v1::Node;
36+
/// use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config};
37+
/// use futures::{StreamExt, future::ready};
38+
/// # use kube::api::Api;
39+
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
40+
/// # let client: kube::Client = todo!();
41+
///
42+
/// let nodes: Api<Node> = Api::all(client);
43+
/// let node_filter = Config::default().labels("kubernetes.io/arch=amd64");
44+
/// let (reader, writer) = reflector::store();
45+
///
46+
/// // Create the infinite reflector stream
47+
/// let rf = reflector(writer, watcher(nodes, node_filter));
48+
///
49+
/// // !!! pass reader to your webserver/manager as state !!!
50+
///
51+
/// // Poll the stream (needed to keep the store up-to-date)
52+
/// let infinite_watch = rf.applied_objects().for_each(|o| { ready(()) });
53+
/// infinite_watch.await;
54+
/// # Ok(())
55+
/// # }
56+
/// ```
57+
///
58+
///
59+
/// ## Memory Usage
60+
///
61+
/// A reflector often constitutes one of the biggest components of a controller's memory use.
62+
/// Given ~two thousand pods in a cluster, a reflector around that quickly consumes 1GB of memory.
63+
///
64+
/// While, sometimes acceptible, there are techniques you can leverage to reduce the memory usage
65+
/// depending on your use case.
66+
///
67+
/// 1. Reflect a [`PartialObjectMeta<K>`](kube_client::core::PartialObjectMeta) stream rather than a stream of `K`
68+
///
69+
/// You can send in a [`metadata_watcher()`](crate::watcher::metadata_watcher()) for a type rather than a [`watcher()`],
70+
/// and this will can drop your memory usage by more than a factor of two,
71+
/// depending on the size of `K`. 60% reduction seen for `Pod`. Usage is otherwise identical.
72+
///
73+
/// 2. Use `modify` the raw [`watcher::Event`] object stream to clear unneeded properties
74+
///
75+
/// For instance, managed fields typically constitutes around half the size of `ObjectMeta` and can often be dropped:
76+
///
77+
/// ```no_run
78+
/// # use futures::TryStreamExt;
79+
/// # use kube::{ResourceExt, Api, runtime::watcher};
80+
/// # let api: Api<k8s_openapi::api::core::v1::Node> = todo!();
81+
/// let stream = watcher(api, Default::default()).map_ok(|ev| {
82+
/// ev.modify(|pod| {
83+
/// pod.managed_fields_mut().clear();
84+
/// pod.annotations_mut().clear();
85+
/// pod.status = None;
86+
/// })
87+
/// });
88+
/// ```
89+
/// The `stream` can then be passed to `reflector` causing smaller objects to be written to its store.
90+
/// Note that you **cannot drop everything**; you minimally need the spec properties your app relies on.
91+
/// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`.
1992
pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
2093
where
2194
K: Resource + Clone,

kube-runtime/src/watcher.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ pub struct Config {
174174
pub timeout: Option<u32>,
175175

176176
/// Determines how resourceVersion is applied to list calls.
177-
/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
177+
/// See <https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions> for
178178
/// details.
179179
pub version_match: VersionMatch,
180180

0 commit comments

Comments
 (0)