-
-
Notifications
You must be signed in to change notification settings - Fork 341
Add shared stream interfaces #1449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
mateiidavid
merged 40 commits into
kube-rs:main
from
mateiidavid:matei/fork-add-fut-impl
Apr 18, 2024
Merged
Changes from 33 commits
Commits
Show all changes
40 commits
Select commit
Hold shift + click to select a range
15ba09d
Add a simple controller example
mateiidavid fc2fc42
Merge branch 'main' of github.com:kube-rs/kube into matei/arc-watcher
mateiidavid 1657ddc
Add shared stream controller example
mateiidavid 98255dc
Try to get something working
mateiidavid 3685b9e
Rm my notes
mateiidavid 683e77d
Results or objectefs
mateiidavid af7a309
Working shared stream
mateiidavid 8d4d694
Different way of doing it
mateiidavid 8534770
Switch to async_broadcast
mateiidavid 9bbe8e1
Remove old, unused code
mateiidavid 3f874ce
Remove unused examples
mateiidavid 1e1e347
Gotta state machine this stuff
mateiidavid 15f6e1d
Take 1 with try_recv
mateiidavid 49eaf12
try_recv take 2
mateiidavid e7aad76
Working on names next
mateiidavid b6ff97f
Ok surprising this worked
mateiidavid 7a570fd
Write tests and rename file to reflect dispatch
mateiidavid 0256cb0
WIP
mateiidavid 74f09f7
WIP 2
mateiidavid 2d5a3b0
Start working on store side
mateiidavid 0cb816b
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid 9bf111c
Tests are green
mateiidavid 04a53d1
rm redundant trait bounds
mateiidavid 6b5bd31
Update example with new interfaces
mateiidavid def0011
Add comments and a small todo
mateiidavid d69213a
Remove dispatch mod from utils
mateiidavid 21dbbae
@clux's feedback
mateiidavid c7fc333
@clux's feedback
mateiidavid 1b81f4c
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid c6d1027
Fix tests & clippy warns
mateiidavid a30f2e6
Run fmt
mateiidavid fef5d83
Update examples/shared_stream_controllers.rs
mateiidavid 44c441e
@clux's feedback on examples
mateiidavid 8347103
Fix name in ns
mateiidavid 9f7edd1
Add comments and feature flags
mateiidavid e2399f1
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid a14d6b4
Fix CI checks
mateiidavid de2eda1
Run rustfmt
mateiidavid 276b75e
@clux's feedback
mateiidavid eca6be1
Run fmt
mateiidavid File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
use std::{sync::Arc, time::Duration}; | ||
|
||
use futures::StreamExt; | ||
use k8s_openapi::api::core::v1::{Pod, PodCondition}; | ||
use kube::{ | ||
api::{Patch, PatchParams}, | ||
runtime::{controller::Action, reflector, watcher, Config, Controller, WatchStreamExt}, | ||
Api, Client, ResourceExt, | ||
}; | ||
use tokio::sync::mpsc; | ||
use tracing::{debug, error, info, warn}; | ||
|
||
use thiserror::Error; | ||
|
||
// Helper module that namespaces two constants describing a Kubernetes status condition | ||
pub mod condition { | ||
pub static UNDOCUMENTED_TYPE: &str = "UndocumentedPort"; | ||
pub static STATUS_TRUE: &str = "True"; | ||
} | ||
|
||
const SUBSCRIBE_BUFFER_SIZE: usize = 256; | ||
|
||
#[derive(Debug, Error)] | ||
enum Error { | ||
#[error("Failed to patch pod: {0}")] | ||
WriteFailed(#[source] kube::Error), | ||
|
||
#[error("Missing po field: {0}")] | ||
MissingField(&'static str), | ||
} | ||
|
||
#[derive(Clone)] | ||
struct Data { | ||
client: Client, | ||
} | ||
|
||
/// A simple reconciliation function that will copy a pod's labels into the annotations. | ||
async fn reconcile_metadata(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> { | ||
if pod.name_any() == "kube-system" { | ||
return Ok(Action::await_change()); | ||
} | ||
|
||
let labels = pod.labels(); | ||
if labels.is_empty() { | ||
return Ok(Action::await_change()); | ||
} | ||
|
||
let mut pod = (*pod).clone(); | ||
pod.metadata.managed_fields = None; | ||
// combine labels and annotations into a new map | ||
let labels = pod.labels().clone().into_iter(); | ||
pod.annotations_mut().extend(labels); | ||
|
||
let pod_api = Api::<Pod>::namespaced( | ||
ctx.client.clone(), | ||
pod.metadata | ||
.namespace | ||
.as_ref() | ||
.ok_or_else(|| Error::MissingField(".metadata.name"))?, | ||
); | ||
|
||
pod_api | ||
.patch( | ||
&pod.name_any(), | ||
&PatchParams::apply("controller-1"), | ||
&Patch::Apply(&pod), | ||
) | ||
.await | ||
.map_err(Error::WriteFailed)?; | ||
|
||
Ok(Action::requeue(Duration::from_secs(300))) | ||
} | ||
|
||
/// Another reconiliation function that will add an 'UndocumentedPort' condition to pods that do | ||
/// do not have any ports declared across all containers. | ||
async fn reconcile_status(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> { | ||
for container in pod.spec.clone().unwrap_or_default().containers.iter() { | ||
if container.ports.clone().unwrap_or_default().len() != 0 { | ||
debug!(name = %pod.name_any(), "Skipped updating pod with documented ports"); | ||
return Ok(Action::await_change()); | ||
} | ||
} | ||
|
||
let pod_api = Api::<Pod>::namespaced( | ||
ctx.client.clone(), | ||
pod.metadata | ||
.namespace | ||
.as_ref() | ||
.ok_or_else(|| Error::MissingField(".metadata.name"))?, | ||
); | ||
|
||
let undocumented_condition = PodCondition { | ||
type_: condition::UNDOCUMENTED_TYPE.into(), | ||
status: condition::STATUS_TRUE.into(), | ||
..Default::default() | ||
}; | ||
let value = serde_json::json!({ | ||
"status": { | ||
"name": pod.name_any(), | ||
"kind": "Pod", | ||
"conditions": vec![undocumented_condition] | ||
} | ||
}); | ||
pod_api | ||
.patch_status( | ||
&pod.name_any(), | ||
&PatchParams::apply("controller-2"), | ||
&Patch::Strategic(value), | ||
) | ||
.await | ||
.map_err(Error::WriteFailed)?; | ||
|
||
Ok(Action::requeue(Duration::from_secs(300))) | ||
} | ||
|
||
fn error_policy(obj: Arc<Pod>, error: &Error, _ctx: Arc<Data>) -> Action { | ||
error!(%error, name = %obj.name_any(), "Failed reconciliation"); | ||
Action::requeue(Duration::from_secs(10)) | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> anyhow::Result<()> { | ||
tracing_subscriber::fmt::init(); | ||
|
||
let client = Client::try_default().await?; | ||
let pods = Api::<Pod>::namespaced(client.clone(), "default"); | ||
let config = Config::default().concurrency(2); | ||
let ctx = Arc::new(Data { client }); | ||
|
||
// Create a shared store with a predefined buffer that will be shared between subscribers. | ||
let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE); | ||
// Before threading an object watch through the store, create a subscriber. | ||
// Any number of subscribers can be created from one writer. | ||
let subscriber = writer | ||
.subscribe() | ||
.expect("subscribers can only be created from shared stores"); | ||
|
||
// Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to | ||
// be able to consume updates, the reflector must be shared. | ||
let pod_watch = watcher(pods.clone(), Default::default()) | ||
.default_backoff() | ||
.reflect_shared(writer) | ||
.for_each(|res| async move { | ||
match res { | ||
Ok(event) => debug!("Received event on root stream {event:?}"), | ||
Err(error) => error!(%error, "Unexpected error when watching resource"), | ||
} | ||
}); | ||
|
||
// Create the first controller using the reconcile_metadata function. Controllers accept | ||
// subscribers through a dedicated interface. | ||
let metadata_controller = Controller::for_shared_stream(subscriber.clone(), reader) | ||
.with_config(config.clone()) | ||
.shutdown_on_signal() | ||
.run(reconcile_metadata, error_policy, ctx.clone()) | ||
.for_each(|res| async move { | ||
match res { | ||
Ok(v) => info!("Reconciled metadata {v:?}"), | ||
Err(error) => warn!(%error, "Failed to reconcile metadata"), | ||
} | ||
}); | ||
|
||
// Subscribers can be used to get a read handle on the store, if the initial handle has been | ||
// moved or dropped. | ||
let reader = subscriber.reader(); | ||
// Create the second controller using the reconcile_status function. | ||
let status_controller = Controller::for_shared_stream(subscriber, reader) | ||
.with_config(config) | ||
.shutdown_on_signal() | ||
.run(reconcile_status, error_policy, ctx) | ||
.for_each(|res| async move { | ||
match res { | ||
Ok(v) => info!("Reconciled status {v:?}"), | ||
Err(error) => warn!(%error, "Failed to reconcile status"), | ||
} | ||
}); | ||
|
||
// Drive streams to readiness. The initial watch (that is reflected) needs to be driven to | ||
// consume events from the API Server and forward them to subscribers. | ||
// | ||
// Both controllers will operate on shared objects. | ||
tokio::select! { | ||
_ = futures::future::join(metadata_controller, status_controller) => {}, | ||
_ = pod_watch => {} | ||
} | ||
|
||
Ok(()) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.