1- use crate :: { reflector :: ObjectRef , watcher:: Error } ;
1+ use crate :: watcher:: Error ;
22use core:: {
33 pin:: Pin ,
44 task:: { ready, Context , Poll } ,
55} ;
66use futures:: Stream ;
7- use kube_client:: Resource ;
7+ use kube_client:: { api :: ObjectMeta , Resource } ;
88use pin_project:: pin_project;
99use std:: {
1010 collections:: { hash_map:: DefaultHasher , HashMap } ,
1111 hash:: { Hash , Hasher } ,
12+ marker:: PhantomData ,
1213} ;
1314
1415fn hash < T : Hash + ?Sized > ( t : & T ) -> u64 {
@@ -17,6 +18,24 @@ fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
1718 hasher. finish ( )
1819}
1920
21+ /// Private cache key that includes UID in equality for predicate filtering
22+ #[ derive( Debug , Clone , PartialEq , Eq , Hash ) ]
23+ struct PredicateCacheKey {
24+ name : String ,
25+ namespace : Option < String > ,
26+ uid : Option < String > ,
27+ }
28+
29+ impl From < & ObjectMeta > for PredicateCacheKey {
30+ fn from ( meta : & ObjectMeta ) -> Self {
31+ Self {
32+ name : meta. name . clone ( ) . unwrap_or_default ( ) ,
33+ namespace : meta. namespace . clone ( ) ,
34+ uid : meta. uid . clone ( ) ,
35+ }
36+ }
37+ }
38+
2039/// A predicate is a hasher of Kubernetes objects stream filtering
2140pub trait Predicate < K > {
2241 /// A predicate only needs to implement optional hashing when keys exist
@@ -103,7 +122,9 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
103122 #[ pin]
104123 stream : St ,
105124 predicate : P ,
106- cache : HashMap < ObjectRef < K > , u64 > ,
125+ cache : HashMap < PredicateCacheKey , u64 > ,
126+ // K: Resource necessary to get .meta() of an object during polling
127+ _phantom : PhantomData < K > ,
107128}
108129impl < St , K , P > PredicateFilter < St , K , P >
109130where
@@ -116,6 +137,7 @@ where
116137 stream,
117138 predicate,
118139 cache : HashMap :: new ( ) ,
140+ _phantom : PhantomData ,
119141 }
120142 }
121143}
@@ -134,17 +156,9 @@ where
134156 break match ready ! ( me. stream. as_mut( ) . poll_next( cx) ) {
135157 Some ( Ok ( obj) ) => {
136158 if let Some ( val) = me. predicate . hash_property ( & obj) {
137- let key = ObjectRef :: from_obj ( & obj) ;
138- let changed = if let Some ( old) = me. cache . get ( & key) {
139- * old != val
140- } else {
141- true
142- } ;
143- if let Some ( old) = me. cache . get_mut ( & key) {
144- * old = val;
145- } else {
146- me. cache . insert ( key, val) ;
147- }
159+ let key = PredicateCacheKey :: from ( obj. meta ( ) ) ;
160+ let changed = me. cache . get ( & key) != Some ( & val) ;
161+ me. cache . insert ( key, val) ;
148162 if changed {
149163 Some ( Ok ( obj) )
150164 } else {
@@ -251,4 +265,58 @@ pub(crate) mod tests {
251265 assert_eq ! ( second. meta( ) . generation, Some ( 2 ) ) ;
252266 assert ! ( matches!( poll!( rx. next( ) ) , Poll :: Ready ( None ) ) ) ;
253267 }
268+
269+ #[ tokio:: test]
270+ async fn predicate_filtering_should_handle_recreated_resources_with_same_generation ( ) {
271+ use k8s_openapi:: api:: core:: v1:: Pod ;
272+
273+ let mkobj = |g : i32 , uid : & str | {
274+ let p: Pod = serde_json:: from_value ( json ! ( {
275+ "apiVersion" : "v1" ,
276+ "kind" : "Pod" ,
277+ "metadata" : {
278+ "name" : "blog" ,
279+ "namespace" : "default" ,
280+ "generation" : Some ( g) ,
281+ "uid" : uid,
282+ } ,
283+ "spec" : {
284+ "containers" : [ {
285+ "name" : "blog" ,
286+ "image" : "clux/blog:0.1.0"
287+ } ] ,
288+ }
289+ } ) )
290+ . unwrap ( ) ;
291+ p
292+ } ;
293+
294+ // Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete ->
295+ // create (gen=1, uid=2) -> delete -> create (gen=2, uid=3)
296+ let data = stream:: iter ( [
297+ Ok ( mkobj ( 1 , "uid-1" ) ) ,
298+ Ok ( mkobj ( 1 , "uid-1" ) ) ,
299+ Ok ( mkobj ( 1 , "uid-2" ) ) ,
300+ Ok ( mkobj ( 2 , "uid-3" ) ) ,
301+ ] ) ;
302+ let mut rx = pin ! ( PredicateFilter :: new( data, predicates:: generation) ) ;
303+
304+ // mkobj(1, uid-1) passed through
305+ let first = rx. next ( ) . now_or_never ( ) . unwrap ( ) . unwrap ( ) . unwrap ( ) ;
306+ assert_eq ! ( first. meta( ) . generation, Some ( 1 ) ) ;
307+ assert_eq ! ( first. meta( ) . uid. as_deref( ) , Some ( "uid-1" ) ) ;
308+
309+ // (no repeat mkobj(1, uid-1) - same UID and generation)
310+ // mkobj(1, uid-2) next - different UID detected
311+ let second = rx. next ( ) . now_or_never ( ) . unwrap ( ) . unwrap ( ) . unwrap ( ) ;
312+ assert_eq ! ( second. meta( ) . generation, Some ( 1 ) ) ;
313+ assert_eq ! ( second. meta( ) . uid. as_deref( ) , Some ( "uid-2" ) ) ;
314+
315+ // mkobj(2, uid-3) next
316+ let third = rx. next ( ) . now_or_never ( ) . unwrap ( ) . unwrap ( ) . unwrap ( ) ;
317+ assert_eq ! ( third. meta( ) . generation, Some ( 2 ) ) ;
318+ assert_eq ! ( third. meta( ) . uid. as_deref( ) , Some ( "uid-3" ) ) ;
319+
320+ assert ! ( matches!( poll!( rx. next( ) ) , Poll :: Ready ( None ) ) ) ;
321+ }
254322}
0 commit comments