Skip to content

Commit 30a0c39

Browse files
authored
Split ListParams and WatchParams (#1162)
* Add resourceVersion to ListParams Signed-off-by: m.nabokikh <[email protected]> * Apply rustfmt Signed-off-by: m.nabokikh <[email protected]> * More validations for resource version options Signed-off-by: m.nabokikh <[email protected]> * Add hack to allow the watcher to work Signed-off-by: m.nabokikh <[email protected]> * Split ListParams and WatchParams Signed-off-by: m.nabokikh <[email protected]> * Fix doc tests and rustfmt Signed-off-by: m.nabokikh <[email protected]> * Remove continue_tooken and limit from watcher config Signed-off-by: m.nabokikh <[email protected]> * Fix clippy errors, add more tests Signed-off-by: m.nabokikh <[email protected]> * Migrate to 4 varients enum Signed-off-by: m.nabokikh <[email protected]> * Add more tests and derive ListParams default Signed-off-by: m.nabokikh <[email protected]> --------- Signed-off-by: m.nabokikh <[email protected]>
1 parent 2e5e4de commit 30a0c39

26 files changed

+593
-247
lines changed

examples/configmapgen_controller.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ use anyhow::Result;
55
use futures::StreamExt;
66
use k8s_openapi::api::core::v1::ConfigMap;
77
use kube::{
8-
api::{Api, ListParams, ObjectMeta, Patch, PatchParams, Resource},
9-
runtime::controller::{Action, Controller},
8+
api::{Api, ObjectMeta, Patch, PatchParams, Resource},
9+
runtime::{
10+
controller::{Action, Controller},
11+
watcher,
12+
},
1013
Client, CustomResource,
1114
};
1215
use schemars::JsonSchema;
@@ -99,8 +102,8 @@ async fn main() -> Result<()> {
99102
}
100103
});
101104

102-
Controller::new(cmgs, ListParams::default())
103-
.owns(cms, ListParams::default())
105+
Controller::new(cmgs, watcher::Config::default())
106+
.owns(cms, watcher::Config::default())
104107
.reconcile_all_on(reload_rx.map(|_| ()))
105108
.shutdown_on_signal()
106109
.run(reconcile, error_policy, Arc::new(Data { client }))

examples/crd_derive_schema.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use futures::{StreamExt, TryStreamExt};
33
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
44
use kube::{
55
api::{
6-
Api, ApiResource, DeleteParams, DynamicObject, GroupVersionKind, ListParams, Patch, PatchParams,
7-
PostParams, WatchEvent,
6+
Api, ApiResource, DeleteParams, DynamicObject, GroupVersionKind, Patch, PatchParams, PostParams,
7+
WatchEvent, WatchParams,
88
},
99
runtime::wait::{await_condition, conditions},
1010
Client, CustomResource, CustomResourceExt,
@@ -241,10 +241,10 @@ async fn delete_crd(client: Client) -> Result<()> {
241241

242242
// Wait until deleted
243243
let timeout_secs = 15;
244-
let lp = ListParams::default()
244+
let wp = WatchParams::default()
245245
.fields("metadata.name=foos.clux.dev")
246246
.timeout(timeout_secs);
247-
let mut stream = api.watch(&lp, "0").await?.boxed_local();
247+
let mut stream = api.watch(&wp, "0").await?.boxed_local();
248248
while let Some(status) = stream.try_next().await? {
249249
if let WatchEvent::Deleted(_) = status {
250250
return Ok(());

examples/crd_reflector.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomRe
33
use tracing::*;
44

55
use kube::{
6-
api::{Api, ListParams, Patch, PatchParams, ResourceExt},
6+
api::{Api, Patch, PatchParams, ResourceExt},
77
runtime::{reflector, watcher, WatchStreamExt},
88
Client, CustomResource, CustomResourceExt,
99
};
@@ -36,8 +36,8 @@ async fn main() -> anyhow::Result<()> {
3636
let (reader, writer) = reflector::store::<Foo>();
3737

3838
let foos: Api<Foo> = Api::default_namespaced(client);
39-
let lp = ListParams::default().timeout(20); // low timeout in this example
40-
let rf = reflector(writer, watcher(foos, lp));
39+
let wc = watcher::Config::default().timeout(20); // low timeout in this example
40+
let rf = reflector(writer, watcher(foos, wc));
4141

4242
tokio::spawn(async move {
4343
loop {

examples/dynamic_watcher.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use futures::{Stream, StreamExt, TryStreamExt};
22
use kube::{
3-
api::{Api, DynamicObject, GroupVersionKind, ListParams, Resource, ResourceExt},
3+
api::{Api, DynamicObject, GroupVersionKind, Resource, ResourceExt},
44
runtime::{metadata_watcher, watcher, watcher::Event, WatchStreamExt},
55
};
66
use serde::de::DeserializeOwned;
@@ -28,13 +28,13 @@ async fn main() -> anyhow::Result<()> {
2828

2929
// Use the full resource info to create an Api with the ApiResource as its DynamicType
3030
let api = Api::<DynamicObject>::all_with(client, &ar);
31-
let lp = ListParams::default();
31+
let wc = watcher::Config::default();
3232

3333
// Start a metadata or a full resource watch
3434
if watch_metadata {
35-
handle_events(metadata_watcher(api, lp)).await
35+
handle_events(metadata_watcher(api, wc)).await
3636
} else {
37-
handle_events(watcher(api, lp)).await
37+
handle_events(watcher(api, wc)).await
3838
}
3939
}
4040

examples/event_watcher.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use futures::{pin_mut, TryStreamExt};
22
use k8s_openapi::api::core::v1::Event;
33
use kube::{
4-
api::{Api, ListParams},
4+
api::Api,
55
runtime::{watcher, WatchStreamExt},
66
Client,
77
};
@@ -13,9 +13,9 @@ async fn main() -> anyhow::Result<()> {
1313
let client = Client::try_default().await?;
1414

1515
let events: Api<Event> = Api::all(client);
16-
let lp = ListParams::default();
16+
let wc = watcher::Config::default();
1717

18-
let ew = watcher(events, lp).applied_objects();
18+
let ew = watcher(events, wc).applied_objects();
1919

2020
pin_mut!(ew);
2121
while let Some(event) = ew.try_next().await? {

examples/kubectl.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,12 @@ impl App {
123123
Ok(())
124124
}
125125

126-
async fn watch(&self, api: Api<DynamicObject>, mut lp: ListParams) -> Result<()> {
126+
async fn watch(&self, api: Api<DynamicObject>, mut wc: watcher::Config) -> Result<()> {
127127
if let Some(n) = &self.name {
128-
lp = lp.fields(&format!("metadata.name={n}"));
128+
wc = wc.fields(&format!("metadata.name={n}"));
129129
}
130130
// present a dumb table for it for now. kubectl does not do this anymore.
131-
let mut stream = watcher(api, lp).applied_objects().boxed();
131+
let mut stream = watcher(api, wc).applied_objects().boxed();
132132
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = 63);
133133
while let Some(inst) = stream.try_next().await? {
134134
let age = format_creation_since(inst.creation_timestamp());
@@ -201,14 +201,20 @@ async fn main() -> Result<()> {
201201
if let Some(label) = &app.selector {
202202
lp = lp.labels(label);
203203
}
204+
205+
let mut wc = watcher::Config::default();
206+
if let Some(label) = &app.selector {
207+
wc = wc.labels(label);
208+
}
209+
204210
let api = dynamic_api(ar, caps, client, &app.namespace, app.all);
205211

206212
tracing::info!(?app.verb, ?resource, name = ?app.name.clone().unwrap_or_default(), "requested objects");
207213
match app.verb {
208214
Verb::Edit => app.edit(api).await?,
209215
Verb::Get => app.get(api, lp).await?,
210216
Verb::Delete => app.delete(api, lp).await?,
211-
Verb::Watch => app.watch(api, lp).await?,
217+
Verb::Watch => app.watch(api, wc).await?,
212218
Verb::Apply => bail!("verb {:?} cannot act on an explicit resource", app.verb),
213219
}
214220
} else if app.verb == Verb::Apply {

examples/multi_watcher.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use k8s_openapi::api::{
44
core::v1::{ConfigMap, Secret},
55
};
66
use kube::{
7-
api::{Api, ListParams, ResourceExt},
7+
api::{Api, ResourceExt},
88
runtime::{watcher, WatchStreamExt},
99
Client,
1010
};
@@ -18,9 +18,9 @@ async fn main() -> anyhow::Result<()> {
1818
let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
1919
let cms: Api<ConfigMap> = Api::default_namespaced(client.clone());
2020
let secret: Api<Secret> = Api::default_namespaced(client.clone());
21-
let dep_watcher = watcher(deploys, ListParams::default());
22-
let cm_watcher = watcher(cms, ListParams::default());
23-
let sec_watcher = watcher(secret, ListParams::default());
21+
let dep_watcher = watcher(deploys, watcher::Config::default());
22+
let cm_watcher = watcher(cms, watcher::Config::default());
23+
let sec_watcher = watcher(secret, watcher::Config::default());
2424

2525
// select on applied events from all watchers
2626
let mut combo_stream = stream::select_all(vec![

examples/node_reflector.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use futures::{StreamExt, TryStreamExt};
22
use k8s_openapi::api::core::v1::Node;
33
use kube::{
4-
api::{Api, ListParams, ResourceExt},
4+
api::{Api, ResourceExt},
55
runtime::{reflector, watcher, WatchStreamExt},
66
Client,
77
};
@@ -13,12 +13,12 @@ async fn main() -> anyhow::Result<()> {
1313
let client = Client::try_default().await?;
1414

1515
let nodes: Api<Node> = Api::all(client.clone());
16-
let lp = ListParams::default()
16+
let wc = watcher::Config::default()
1717
.labels("kubernetes.io/arch=amd64") // filter instances by label
1818
.timeout(10); // short watch timeout in this example
1919

2020
let (reader, writer) = reflector::store();
21-
let rf = reflector(writer, watcher(nodes, lp));
21+
let rf = reflector(writer, watcher(nodes, wc));
2222

2323
// Periodically read our state in the background
2424
tokio::spawn(async move {

examples/node_watcher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ async fn main() -> anyhow::Result<()> {
1515
let events: Api<Event> = Api::all(client.clone());
1616
let nodes: Api<Node> = Api::all(client.clone());
1717

18-
let lp = ListParams::default().labels("beta.kubernetes.io/arch=amd64");
19-
let obs = watcher(nodes, lp)
18+
let wc = watcher::Config::default().labels("beta.kubernetes.io/arch=amd64");
19+
let obs = watcher(nodes, wc)
2020
.backoff(ExponentialBackoff::default())
2121
.applied_objects();
2222

examples/pod_attach.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use k8s_openapi::api::core::v1::Pod;
66

77
use kube::{
88
api::{
9-
Api, AttachParams, AttachedProcess, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent,
9+
Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, WatchEvent, WatchParams,
1010
},
1111
Client,
1212
};
@@ -35,8 +35,8 @@ async fn main() -> anyhow::Result<()> {
3535
pods.create(&PostParams::default(), &p).await?;
3636

3737
// Wait until the pod is running, otherwise we get 500 error.
38-
let lp = ListParams::default().fields("metadata.name=example").timeout(10);
39-
let mut stream = pods.watch(&lp, "0").await?.boxed();
38+
let wp = WatchParams::default().fields("metadata.name=example").timeout(10);
39+
let mut stream = pods.watch(&wp, "0").await?.boxed();
4040
while let Some(status) = stream.try_next().await? {
4141
match status {
4242
WatchEvent::Added(o) => {

0 commit comments

Comments
 (0)