[ENH]: Execute task with no backfill or incremental #5867
HammadB
left a comment
Not hitting approve as I don't have bandwidth to review the size of diff and others have read it
- // AttachFunction creates a new attached function in the database
+ // AttachFunction creates an output collection and attached function in a single transaction
  func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.AttachFunctionRequest) (*coordinatorpb.AttachFunctionResponse, error) {
Do we handle soft deletes for
- Attaching
- Compaction flush?
Yes. Compaction flushing uses the same logic as before, which fails the flush if is_deleted is true. Similarly, a flush on a function that doesn't exist or is soft deleted will abort the transaction.
Attaching a function does not interact with soft-deleted functions, and soft-deleted functions are renamed with a "deleted" prefix, just as collections are.
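The behavior described in this reply can be sketched roughly as follows. This is a simplified in-memory model written in Rust, not the coordinator's actual Go implementation: the `Store`, `AttachedFunction`, and `FlushError` types, the integer ids, and the exact `deleted_` prefix format are all assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical error type for the sketch.
#[derive(Debug, PartialEq)]
enum FlushError {
    NotFound,
    SoftDeleted,
}

/// Hypothetical, simplified attached-function row.
#[derive(Debug)]
struct AttachedFunction {
    name: String,
    is_deleted: bool,
    completion_offset: u64,
}

/// Hypothetical in-memory stand-in for the system database.
#[derive(Default)]
struct Store {
    functions: HashMap<u32, AttachedFunction>,
}

impl Store {
    fn attach(&mut self, id: u32, name: &str) {
        self.functions.insert(
            id,
            AttachedFunction {
                name: name.to_string(),
                is_deleted: false,
                completion_offset: 0,
            },
        );
    }

    /// Soft delete: mark the row and rename it with a "deleted" prefix
    /// (the exact prefix format here is an assumption).
    fn soft_delete(&mut self, id: u32) {
        if let Some(f) = self.functions.get_mut(&id) {
            if !f.is_deleted {
                f.is_deleted = true;
                f.name = format!("deleted_{}", f.name);
            }
        }
    }

    /// A flush fails (so the caller can abort its transaction) when the
    /// function is missing or soft deleted; otherwise it advances the offset.
    fn flush(&mut self, id: u32, new_offset: u64) -> Result<(), FlushError> {
        let f = self.functions.get_mut(&id).ok_or(FlushError::NotFound)?;
        if f.is_deleted {
            return Err(FlushError::SoftDeleted);
        }
        f.completion_offset = new_offset;
        Ok(())
    }
}
```

In this model a failed flush leaves `completion_offset` untouched, which mirrors the "abort the transaction" wording above.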
for key, filePath := range flushSegmentCompaction.FilePaths {
	filePaths[key] = filePath.Paths
}
segmentCompactionInfo = append(segmentCompactionInfo, &model.FlushSegmentCompaction{
Do we properly handle empty compactions and the like here?
Yes, it's handled the same way as in normal compactions. That case is tested by compact.rs::test_compaction_with_empty_logs_from_inserts_and_deletes
  };

- Ok(Chunk::new(Arc::new([log_record])))
+ Ok(Chunk::new(std::sync::Arc::from(vec![output_record])))
Also please just import Arc
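A small sketch of what the requested change might look like, with `Arc` imported once at the top of the file instead of being path-qualified at the call site. The generic `single_record_chunk` helper is hypothetical and only stands in for the `Chunk::new(...)` call in the diff.

```rust
use std::sync::Arc;

/// Hypothetical helper: wraps a single record in a shared slice,
/// the same shape the diff passes to `Chunk::new`.
fn single_record_chunk<T>(record: T) -> Arc<[T]> {
    // `Arc<[T]>` implements `From<Vec<T>>`, so no path qualification is needed.
    Arc::from(vec![record])
}
```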
rust/worker/src/execution/orchestration/attached_function_orchestrator.rs
    collection_register_infos: Vec<CollectionRegisterInfo>,
    function_context: Option<FunctionContext>,
) -> Self {
[BestPractice]
Context for Agents
**Missing validation for collection_register_infos bounds**: The function validates length is not 0 or >2, but then immediately calls `.first()` without handling the empty case that could theoretically reach line 207 if the validation is bypassed. While the validation should prevent this, defensive programming suggests:
```rust
let output_collection_register_info = self.collection_register_infos.first()
.ok_or_else(|| RegisterOrchestratorError::InvariantViolation(
"No collection register info found"
))?;
```
This is already handled correctly. However, the validation message at line 138 says "Invalid number of collection register infos" which is vague. Be more specific: "Expected 1 or 2 collection register infos, got {count}"
File: rust/worker/src/execution/orchestration/register_orchestrator.rs
Line: 138
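The more specific message suggested above could look something like this. It is a minimal sketch under stated assumptions: `RegisterError` and the single-field `CollectionRegisterInfo` are simplified, hypothetical stand-ins for the real types, and `validate_register_infos` is an illustrative name rather than a function from the PR.

```rust
/// Hypothetical, simplified stand-in for the orchestrator's error type.
#[derive(Debug, PartialEq)]
enum RegisterError {
    InvariantViolation(String),
}

/// Hypothetical, simplified stand-in for the real struct.
#[derive(Debug)]
struct CollectionRegisterInfo {
    collection_id: u32,
}

/// Validates the count and returns the first entry in one step, so the
/// error message reports exactly what was expected and what was received.
fn validate_register_infos(
    infos: &[CollectionRegisterInfo],
) -> Result<&CollectionRegisterInfo, RegisterError> {
    match infos.len() {
        1 | 2 => Ok(&infos[0]),
        count => Err(RegisterError::InvariantViolation(format!(
            "Expected 1 or 2 collection register infos, got {}",
            count
        ))),
    }
}
```

Folding the length check and the first-element access into one `match` also removes the need for a separate `.first()` fallback.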
Sicheng-Pan
left a comment
Lgtm
	return nil, status.Errorf(codes.Internal, "attached function has invalid completion_offset: %d", attachedFunction.CompletionOffset)
}

if !attachedFunction.IsReady {
[BestPractice]
Context for Agents
The check `if !attachedFunction.IsReady` inside `attachedFunctionToProto` appears to be redundant. The callers of this function (`GetAttachedFunctionByName`, `ListAttachedFunctions`, `GetAttachedFunctionByUuid`) use data access methods (`GetByName`, `GetByCollectionID`, `GetByID`) that are already specified to return only "ready" attached functions.
This redundant check returns a generic `codes.Internal` error, which could be confusing for clients. If an unready function is passed here, it signifies an internal logic error. Consider removing this check to simplify the code and rely on the data access layer to enforce the "ready" contract. If the check is kept for defensiveness, a panic might be more appropriate to signal the internal invariant violation.
File: go/pkg/sysdb/coordinator/task.go
Line: 225
if let Some(function_context) = &self.function_context {
    vec![(
        wrap(
            FinishAttachedFunctionOperator::new(),
[BestPractice]
Context for Agents
There's a minor naming inconsistency. The PR description refers to a `FinishAttachedTask` operator, but the implementation is named `FinishAttachedFunctionOperator`. For consistency with the documentation and intent, consider renaming `FinishAttachedFunctionOperator` to `FinishAttachedTaskOperator`.
File: rust/worker/src/execution/orchestration/register_orchestrator.rs
Line: 184

Description of changes
Summarize the changes made by this PR.
This change introduces an AttachedFunctionOrchestrator that runs the following chain of operators:
GetAttachedFunction -> GetCollectionAndSegments (for the output collection) -> ExecuteAttachedFunction -> MaterializeLogs
It edits the RegisterOrchestrator to spawn a FinishAttachedTask operator that flushes compaction for the input and output collections and flushes the updated function data.
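The operator chain above can be sketched as a sequence of plain function calls. This is a synchronous toy model, not the orchestrator's real implementation: every type and function below is hypothetical, the lookup and segment data are faked, and the "attached function" is an assumed record counter chosen only to give the third step something to do.

```rust
#[derive(Debug, Clone, PartialEq)]
struct AttachedFunction {
    name: String,
    output_collection: String,
}

#[derive(Debug, Clone, PartialEq)]
struct CollectionAndSegments {
    collection: String,
}

#[derive(Debug, Clone, PartialEq)]
struct LogRecord {
    offset: u64,
    payload: String,
}

#[derive(Debug, Clone, PartialEq)]
struct MaterializedLog {
    collection: String,
    record: LogRecord,
}

#[derive(Debug, PartialEq)]
enum PipelineError {
    FunctionNotFound,
}

// Step 1: look up the attached function (faked: any non-empty name exists).
fn get_attached_function(name: &str) -> Result<AttachedFunction, PipelineError> {
    if name.is_empty() {
        return Err(PipelineError::FunctionNotFound);
    }
    Ok(AttachedFunction {
        name: name.to_string(),
        output_collection: format!("{}_output", name),
    })
}

// Step 2: resolve the output collection the function writes to.
fn get_collection_and_segments(function: &AttachedFunction) -> CollectionAndSegments {
    CollectionAndSegments {
        collection: function.output_collection.clone(),
    }
}

// Step 3: run the function over the input logs (assumed here: a counter
// that emits a single output record).
fn execute_attached_function(_function: &AttachedFunction, input: &[LogRecord]) -> Vec<LogRecord> {
    let last_offset = input.iter().map(|r| r.offset).max().unwrap_or(0);
    vec![LogRecord {
        offset: last_offset,
        payload: format!("count={}", input.len()),
    }]
}

// Step 4: tag each output record with the collection it will be applied to.
fn materialize_logs(target: &CollectionAndSegments, records: Vec<LogRecord>) -> Vec<MaterializedLog> {
    records
        .into_iter()
        .map(|record| MaterializedLog {
            collection: target.collection.clone(),
            record,
        })
        .collect()
}

// The full chain, in the order given in the description.
fn run_attached_function_pipeline(
    name: &str,
    input: &[LogRecord],
) -> Result<Vec<MaterializedLog>, PipelineError> {
    let function = get_attached_function(name)?;
    let output = get_collection_and_segments(&function);
    let records = execute_attached_function(&function, input);
    Ok(materialize_logs(&output, records))
}
```

The point of the sketch is only the ordering: the output collection is resolved before execution, so the materialized records already carry their destination.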
The compact method in compact.rs has been edited to launch an AttachedFunctionOrchestrator in parallel on the results of the initial LogFetchOrchestrator. This orchestrator returns a chunk of MaterializedLog records that get applied via another instance of ApplyLogsOrchestrator.
The above runs in parallel to the normal compaction workflow, which simply runs an ApplyLogsOrchestrator on the results of the initial LogFetchOrchestrator.
The above two threads each return a CollectionRegisterInfo. The function-related thread also returns a FunctionContext. The two threads are joined and each of these structures is passed on to the RegisterOrchestrator for completion.
CompactionContext.collection_info has been replaced by CompactionContext.input_collection_info and CompactionContext.output_collection_info to reflect the fact that "compactions" can pull data from one collection and compact to another. ApplyLogsOrchestrator always applies the given logs to the collection specified by CompactionContext.output_collection_info. Hence, we take care to set this field to the appropriate collection before calling run_apply_logs in each thread.
Test plan
How are these changes tested?
pytest for python, yarn test for js, cargo test for rust
Migration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?