//! The core idea is the following: for each key-value pair associated with a record, aggregate
//! (key, value) -> count. This gives a count of how frequently each (key, value) pair appears.
//!
//! The statistics executor is incremental: it loads existing counts from the `output_reader`
//! and updates them with new records.
78
89use std:: collections:: { HashMap , HashSet } ;
910use std:: hash:: { Hash , Hasher } ;
@@ -27,8 +28,10 @@ pub trait StatisticsFunctionFactory: std::fmt::Debug + Send + Sync {
2728
2829/// Accumulate statistics. Must be an associative and commutative over a sequence of `observe` calls.
2930pub trait StatisticsFunction : std:: fmt:: Debug + Send {
30- fn observe ( & mut self , hydrated_record : & HydratedMaterializedLogRecord < ' _ , ' _ > ) ;
31+ fn observe_insert ( & mut self , hydrated_record : & HydratedMaterializedLogRecord < ' _ , ' _ > ) ;
32+ fn observe_delete ( & mut self , hydrated_record : & HydratedMaterializedLogRecord < ' _ , ' _ > ) ;
3133 fn output ( & self ) -> UpdateMetadataValue ;
34+ fn as_any_mut ( & mut self ) -> & mut dyn std:: any:: Any ;
3235}
3336
3437#[ derive( Debug , Default ) ]
@@ -45,14 +48,29 @@ pub struct CounterFunction {
4548 acc : i64 ,
4649}
4750
51+ impl CounterFunction {
52+ /// Create a CounterFunction with an initial value.
53+ pub fn with_initial_value ( value : i64 ) -> Self {
54+ Self { acc : value }
55+ }
56+ }
57+
4858impl StatisticsFunction for CounterFunction {
49- fn observe ( & mut self , _: & HydratedMaterializedLogRecord < ' _ , ' _ > ) {
59+ fn observe_insert ( & mut self , _: & HydratedMaterializedLogRecord < ' _ , ' _ > ) {
5060 self . acc = self . acc . saturating_add ( 1 ) ;
5161 }
5262
63+ fn observe_delete ( & mut self , _: & HydratedMaterializedLogRecord < ' _ , ' _ > ) {
64+ self . acc = self . acc . saturating_sub ( 1 ) ;
65+ }
66+
5367 fn output ( & self ) -> UpdateMetadataValue {
5468 UpdateMetadataValue :: Int ( self . acc )
5569 }
70+
71+ fn as_any_mut ( & mut self ) -> & mut dyn std:: any:: Any {
72+ self
73+ }
5674}
5775
5876/// Canonical representation of metadata values tracked by the statistics executor.
@@ -171,31 +189,153 @@ impl Hash for StatisticsValue {
171189#[ derive( Debug ) ]
172190pub struct StatisticsFunctionExecutor ( pub Box < dyn StatisticsFunctionFactory > ) ;
173191
192+ impl StatisticsFunctionExecutor {
193+ /// Load existing statistics from the output reader.
194+ /// Returns a HashMap with the same structure as the counts HashMap.
195+ async fn load_existing_statistics (
196+ & self ,
197+ output_reader : Option < & RecordSegmentReader < ' _ > > ,
198+ ) -> Result <
199+ HashMap < String , HashMap < StatisticsValue , Box < dyn StatisticsFunction > > > ,
200+ Box < dyn ChromaError > ,
201+ > {
202+ let mut counts: HashMap < String , HashMap < StatisticsValue , Box < dyn StatisticsFunction > > > =
203+ HashMap :: default ( ) ;
204+
205+ let Some ( reader) = output_reader else {
206+ return Ok ( counts) ;
207+ } ;
208+
209+ let max_offset_id = reader. get_max_offset_id ( ) ;
210+ let mut stream = reader. get_data_stream ( 0 ..=max_offset_id) . await ;
211+
212+ while let Some ( record_result) = stream. next ( ) . await {
213+ let ( _, record) = record_result?;
214+
215+ // Parse the record to extract key, value, type, and count
216+ let Some ( metadata) = & record. metadata else {
217+ continue ;
218+ } ;
219+
220+ let key = match metadata. get ( "key" ) {
221+ Some ( MetadataValue :: Str ( k) ) => k. clone ( ) ,
222+ _ => continue ,
223+ } ;
224+
225+ let value_type = match metadata. get ( "type" ) {
226+ Some ( MetadataValue :: Str ( t) ) => t. as_str ( ) ,
227+ _ => continue ,
228+ } ;
229+
230+ let value_str = match metadata. get ( "value" ) {
231+ Some ( MetadataValue :: Str ( v) ) => v. as_str ( ) ,
232+ _ => continue ,
233+ } ;
234+
235+ let count = match metadata. get ( "count" ) {
236+ Some ( MetadataValue :: Int ( c) ) => * c,
237+ _ => continue ,
238+ } ;
239+
240+ // Reconstruct the StatisticsValue from type and value
241+ let stats_value = match value_type {
242+ "bool" => match value_str {
243+ "true" => StatisticsValue :: Bool ( true ) ,
244+ "false" => StatisticsValue :: Bool ( false ) ,
245+ _ => continue ,
246+ } ,
247+ "int" => match value_str. parse :: < i64 > ( ) {
248+ Ok ( i) => StatisticsValue :: Int ( i) ,
249+ _ => continue ,
250+ } ,
251+ "float" => match value_str. parse :: < f64 > ( ) {
252+ Ok ( f) => StatisticsValue :: Float ( f) ,
253+ _ => continue ,
254+ } ,
255+ "str" => StatisticsValue :: Str ( value_str. to_string ( ) ) ,
256+ "sparse" => match value_str. parse :: < u32 > ( ) {
257+ Ok ( index) => StatisticsValue :: SparseVector ( index) ,
258+ _ => continue ,
259+ } ,
260+ _ => continue ,
261+ } ;
262+
263+ // Create a statistics function initialized with the existing count
264+ let stats_function =
265+ Box :: new ( CounterFunction :: with_initial_value ( count) ) as Box < dyn StatisticsFunction > ;
266+
267+ counts
268+ . entry ( key)
269+ . or_default ( )
270+ . insert ( stats_value, stats_function) ;
271+ }
272+
273+ Ok ( counts)
274+ }
275+ }
276+
174277#[ async_trait]
175278impl AttachedFunctionExecutor for StatisticsFunctionExecutor {
176279 async fn execute (
177280 & self ,
178281 input_records : Chunk < HydratedMaterializedLogRecord < ' _ , ' _ > > ,
179282 output_reader : Option < & RecordSegmentReader < ' _ > > ,
180283 ) -> Result < Chunk < LogRecord > , Box < dyn ChromaError > > {
181- let mut counts: HashMap < String , HashMap < StatisticsValue , Box < dyn StatisticsFunction > > > =
182- HashMap :: default ( ) ;
284+ // Load existing statistics from output_reader if available
285+ let mut counts = self . load_existing_statistics ( output_reader) . await ?;
286+
287+ // Process new input records and update counts
183288 for ( hydrated_record, _index) in input_records. iter ( ) {
184- // This is only applicable for non-incremental statistics.
185- // TODO(tanujnay112): Change this when we make incremental statistics work.
186- if hydrated_record. get_operation ( ) == MaterializedLogOperation :: DeleteExisting {
187- continue ;
289+ let metadata_delta = hydrated_record. compute_metadata_delta ( ) ;
290+
291+ // Decrement counts for deleted metadata
292+ for ( key, old_value) in metadata_delta. metadata_to_delete {
293+ for stats_value in StatisticsValue :: from_metadata_value ( old_value) {
294+ let inner_map = counts. entry ( key. to_string ( ) ) . or_default ( ) ;
295+ inner_map
296+ . entry ( stats_value)
297+ . or_insert_with ( || self . 0 . create ( ) )
298+ . observe_delete ( hydrated_record) ;
299+ }
300+ }
301+
302+ // Decrement counts for old values in updates
303+ for ( key, ( old_value, new_value) ) in & metadata_delta. metadata_to_update {
304+ for stats_value in StatisticsValue :: from_metadata_value ( old_value) {
305+ let inner_map = counts. entry ( key. to_string ( ) ) . or_default ( ) ;
306+ inner_map
307+ . entry ( stats_value)
308+ . or_insert_with ( || self . 0 . create ( ) )
309+ . observe_delete ( hydrated_record) ;
310+ }
311+
312+ for stats_value in StatisticsValue :: from_metadata_value ( new_value) {
313+ let inner_map = counts. entry ( key. to_string ( ) ) . or_default ( ) ;
314+ inner_map
315+ . entry ( stats_value)
316+ . or_insert_with ( || self . 0 . create ( ) )
317+ . observe_insert ( hydrated_record) ;
318+ }
188319 }
189320
190- // Use merged_metadata to get the metadata from the hydrated record
191- let metadata = hydrated_record. merged_metadata ( ) ;
192- for ( key, value) in metadata. iter ( ) {
193- let inner_map = counts. entry ( key. clone ( ) ) . or_default ( ) ;
321+ // Increment counts for new values in both updates and inserts
322+ for ( key, value) in metadata_delta
323+ . metadata_to_update
324+ . iter ( )
325+ . map ( |( k, ( _old, new) ) | ( * k, * new) )
326+ . chain (
327+ metadata_delta
328+ . metadata_to_insert
329+ . iter ( )
330+ . map ( |( k, v) | ( * k, * v) ) ,
331+ )
332+ {
194333 for stats_value in StatisticsValue :: from_metadata_value ( value) {
334+ let inner_map = counts. entry ( key. to_string ( ) ) . or_default ( ) ;
195335 inner_map
196336 . entry ( stats_value)
197337 . or_insert_with ( || self . 0 . create ( ) )
198- . observe ( hydrated_record) ;
338+ . observe_insert ( hydrated_record) ;
199339 }
200340 }
201341 }
0 commit comments