[ENH]: Make all functions incremental #5893
Reviewer Checklist
Please leverage this checklist to ensure your code review is thorough before approving:
Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
Introduce incremental attached-function execution (statistics & record-counter)
This PR refactors the statistics and record-counter attached functions so they operate incrementally instead of writing a full refresh on every run. Existing output is loaded via
Key Changes
• Redesigned trait
Affected Areas
•
This summary was automatically generated by @propel-code-bot
let key = match metadata.get("key") {
    Some(MetadataValue::Str(k)) => k.clone(),
    _ => continue,
};

let value_type = match metadata.get("type") {
    Some(MetadataValue::Str(t)) => t.as_str(),
    _ => continue,
};

let value_str = match metadata.get("value") {
    Some(MetadataValue::Str(v)) => v.as_str(),
    _ => continue,
};

let count = match metadata.get("count") {
    Some(MetadataValue::Int(c)) => *c,
    _ => continue,
};

// Reconstruct the StatisticsValue from type and value
let stats_value = match value_type {
    "bool" => match value_str {
        "true" => StatisticsValue::Bool(true),
        "false" => StatisticsValue::Bool(false),
        _ => continue,
    },
    "int" => match value_str.parse::<i64>() {
        Ok(i) => StatisticsValue::Int(i),
        _ => continue,
    },
    "float" => match value_str.parse::<f64>() {
        Ok(f) => StatisticsValue::Float(f),
        _ => continue,
    },
    "str" => StatisticsValue::Str(value_str.to_string()),
    "sparse" => match value_str.parse::<u32>() {
        Ok(index) => StatisticsValue::SparseVector(index),
        _ => continue,
    },
    _ => continue,
};
[BestPractice]
In `load_existing_statistics`, parsing errors for records read from the output segment are handled by silently `continue`ing. This could hide data corruption in the output segment and lead to incorrect statistics being calculated. Consider adding logging (e.g., `tracing::warn!`) when a record is skipped due to a parsing failure. This would improve observability into the health of the system.
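To illustrate the suggestion, here is a minimal, self-contained sketch of a loader that reports why a record is skipped instead of dropping it silently. It is not the PR's code: `StatisticsValue` is a simplified stand-in, `parse_stat` and `load_stats` are hypothetical names, records are reduced to `(type, value)` string pairs, and `eprintln!` stands in for `tracing::warn!` so that the example needs only the standard library.

```rust
/// Simplified stand-in for the PR's `StatisticsValue` (assumption, not the real type).
#[derive(Debug, PartialEq)]
enum StatisticsValue {
    Bool(bool),
    Int(i64),
    Float(f64),
    Str(String),
    SparseVector(u32),
}

/// Hypothetical helper: parse one stored (type, value) pair and return a
/// human-readable reason when the record cannot be used.
fn parse_stat(value_type: &str, value_str: &str) -> Result<StatisticsValue, String> {
    match value_type {
        "bool" => value_str
            .parse::<bool>()
            .map(StatisticsValue::Bool)
            .map_err(|e| format!("bad bool {value_str:?}: {e}")),
        "int" => value_str
            .parse::<i64>()
            .map(StatisticsValue::Int)
            .map_err(|e| format!("bad int {value_str:?}: {e}")),
        "float" => value_str
            .parse::<f64>()
            .map(StatisticsValue::Float)
            .map_err(|e| format!("bad float {value_str:?}: {e}")),
        "str" => Ok(StatisticsValue::Str(value_str.to_string())),
        "sparse" => value_str
            .parse::<u32>()
            .map(StatisticsValue::SparseVector)
            .map_err(|e| format!("bad sparse index {value_str:?}: {e}")),
        other => Err(format!("unknown value type {other:?}")),
    }
}

/// Hypothetical loader: keeps the valid records and counts the skipped ones.
fn load_stats(records: &[(&str, &str)]) -> (Vec<StatisticsValue>, usize) {
    let mut loaded = Vec::new();
    let mut skipped = 0;
    for &(value_type, value_str) in records {
        match parse_stat(value_type, value_str) {
            Ok(value) => loaded.push(value),
            Err(reason) => {
                // Production code would more likely call `tracing::warn!` here.
                eprintln!("skipping statistics record: {reason}");
                skipped += 1;
            }
        }
    }
    (loaded, skipped)
}
```

Returning the skipped count alongside the values is one possible design; it lets the caller emit a single summary metric rather than relying on per-record log lines alone.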
File: rust/worker/src/execution/functions/statistics.rs
Line: 261

async fn get_existing_count(output_reader: Option<&RecordSegmentReader<'_>>) -> i64 {
    let Some(reader) = output_reader else {
        return 0;
    };

    // Try to get the existing record with the function output ID
    let offset_id = match reader
        .get_offset_id_for_user_id(COUNT_FUNCTION_OUTPUT_ID)
        .await
    {
        Ok(Some(offset_id)) => offset_id,
        _ => return 0,
    };

    // Get the data record for this offset id
    let data_record = match reader.get_data_for_offset_id(offset_id).await {
        Ok(Some(data_record)) => data_record,
        _ => return 0,
    };

    // Extract total_count from metadata
    if let Some(metadata) = &data_record.metadata {
        if let Some(chroma_types::MetadataValue::Int(count)) = metadata.get(COUNT_METADATA_KEY)
        {
            return *count;
        }
    }

    0
}
[BestPractice]
The `get_existing_count` function currently swallows errors from the `RecordSegmentReader` and defaults to returning `0`. For instance, if `reader.get_offset_id_for_user_id` fails due to a transient I/O issue, the function will return `0` instead of propagating the error. This could cause the total count to be incorrectly reset.
It would be more robust to change the function signature to return a `Result<i64, Box<dyn ChromaError>>` and propagate any errors encountered during reading. The caller in `execute` can then handle the error appropriately using `?`.
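The difference between "no record yet" and "read failed" can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: `CountReader` is a hypothetical, synchronous stand-in for `RecordSegmentReader`, the error type is a plain `String` rather than `Box<dyn ChromaError>`, and `Stored`, `Empty`, and `Failing` exist only to exercise the three cases.

```rust
/// Hypothetical, synchronous stand-in for the reader used in the PR.
trait CountReader {
    /// Returns the stored count, or `Ok(None)` if no count record exists yet.
    fn read_count(&self) -> Result<Option<i64>, String>;
}

/// Sketch of an error-propagating variant of `get_existing_count`.
fn get_existing_count(reader: Option<&dyn CountReader>) -> Result<i64, String> {
    let Some(reader) = reader else {
        // No output segment yet (first run): starting from zero is correct.
        return Ok(0);
    };
    // A missing record is a legitimate zero; a failed read is propagated with `?`.
    Ok(reader.read_count()?.unwrap_or(0))
}

/// Test doubles (illustrative only).
struct Stored(i64);
struct Empty;
struct Failing;

impl CountReader for Stored {
    fn read_count(&self) -> Result<Option<i64>, String> {
        Ok(Some(self.0))
    }
}

impl CountReader for Empty {
    fn read_count(&self) -> Result<Option<i64>, String> {
        Ok(None)
    }
}

impl CountReader for Failing {
    fn read_count(&self) -> Result<Option<i64>, String> {
        Err("transient I/O error".to_string())
    }
}
```

With this shape, a transient failure surfaces to the caller instead of silently resetting the running total to zero.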
File: rust/worker/src/execution/operators/execute_task.rs
Line: 79
rescrv left a comment
I wonder what happens if there's a delete of something that doesn't exist. Is it taken care of by hydrateLogRecords or something else?
rust/segment/src/types.rs
Outdated
  struct MaterializedLogRecord {
      // False if the record exists only in the log, otherwise true.
-     offset_id_exists_in_segment: bool,
+     pub offset_id_exists_in_segment: bool,
Maybe make a getter method, unless you want this to change at any time.
oops, residue from when i was testing something
  pub trait StatisticsFunction: std::fmt::Debug + Send {
-     fn observe(&mut self, hydrated_record: &HydratedMaterializedLogRecord<'_, '_>);
+     fn observe_insert(&mut self, hydrated_record: &HydratedMaterializedLogRecord<'_, '_>);
+     fn observe_delete(&mut self, hydrated_record: &HydratedMaterializedLogRecord<'_, '_>);
It feels odd to pass in a variant. The old form recognized that HMLR would be variant. This has potential mismatch.
Discussed offline, leaving a TODO
        ),
    ),
    (
        "summary::s:total_count".to_string(),
What's the ::s for?
It's from the ID naming scheme that you made in this file: `{key}::{type_id}:{value}`
Oh. I didn't make that connection having not looked at that code recently. A comment would be nice, but is OK to do in follow-up.
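The naming scheme mentioned in this thread could be captured in a small, documented helper along these lines. This is only a sketch: `stat_record_id` is a hypothetical name, and reading `s` as the type id for a string value is an assumption based on the `summary::s:total_count` example, not something the thread confirms.

```rust
/// Builds an output record id following the `{key}::{type_id}:{value}` scheme.
///
/// Example: key `summary`, type id `s` (assumed here to mean "string"),
/// and value `total_count` give `summary::s:total_count`.
fn stat_record_id(key: &str, type_id: &str, value: &str) -> String {
    format!("{key}::{type_id}:{value}")
}
```

Centralizing the format string in one function would also give the follow-up comment a natural home.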
Materialize_logs before this takes care of that.

Description of changes
Summarize the changes made by this PR.
This diff makes all functions (statistics, record_counter) incremental. On every run, they read the current data from the output collection and apply the incoming log data to produce updates to it. This also adds total_count as a statistic record.
Before this change, the AttachedFunctionOrchestrator did not create a RecordSegment reader; this change fixes that bug.
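As a rough illustration of the incremental approach described above, the sketch below starts from previously written per-value counts and applies only the new log operations, rather than recomputing everything. The names `LogOp` and `apply_incremental` are hypothetical, and the state is reduced to a `HashMap` of counts; the real functions work on segment readers and hydrated log records.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified log operation: a value was inserted or deleted.
enum LogOp<'a> {
    Insert(&'a str),
    Delete(&'a str),
}

/// Applies new log operations on top of the counts loaded from the existing output.
fn apply_incremental(
    existing: HashMap<String, i64>,
    ops: &[LogOp<'_>],
) -> HashMap<String, i64> {
    let mut counts = existing;
    for op in ops {
        match op {
            LogOp::Insert(value) => {
                *counts.entry((*value).to_string()).or_insert(0) += 1;
            }
            LogOp::Delete(value) => {
                // Compute the new count first so the map is not borrowed while it is modified.
                let remaining = counts.get(*value).map(|count| count - 1);
                match remaining {
                    Some(n) if n > 0 => {
                        counts.insert((*value).to_string(), n);
                    }
                    Some(_) => {
                        // Count dropped to zero: remove the entry entirely.
                        counts.remove(*value);
                    }
                    // Delete of a value that was never counted: ignored in this sketch.
                    None => {}
                }
            }
        }
    }
    counts
}
```

Only the entries touched by the incoming log change, which is what makes the update proportional to the size of the log rather than the size of the collection.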
Test plan
How are these changes tested?
`pytest` for python, `yarn test` for js, `cargo test` for rust
Migration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?