[ENH]: Modified AttachFunction to do 2PC on a new is_ready column #5872
Add two-phase commit support for AttachFunction via new
This PR introduces a two-phase-commit (2PC) mechanism for creating attached functions. A new boolean column
Key Changes
• Schema migration adds
Affected Areas
• Database schema & migrations (Go & Rust)
This summary was automatically generated by @propel-code-bot
err := s.catalog.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// Double-check attached function doesn't exist (race condition protection)
concurrentAttachedFunction, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetByName(req.InputCollectionId, req.Name)
// Double-check attached function doesn't exist (check both ready and not-ready)
[**CriticalError**]
**Race condition in idempotency check**: Between the `GetAnyByName` check (line 93) and the `Insert` (line 191), another concurrent request could create the same attached function. The transaction only protects database operations within its scope, but two transactions can both pass the existence check before either commits.
```go
// Both Transaction A and Transaction B enter here
concurrentAttachedFunction, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetAnyByName(...)
// Both see nil (no existing function)
if concurrentAttachedFunction != nil {
// Neither enters this branch
}
// Both proceed to Insert() - second one will fail with duplicate key error
err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).Insert(attachedFunction)
```
**Fix**: Use `INSERT ... ON CONFLICT DO NOTHING RETURNING id` or add a unique constraint and handle the duplicate key error gracefully to achieve true idempotency.
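A minimal sketch of the second option (rely on the unique constraint and treat a duplicate-key error as "someone else won the race"). The `store` type, `errDuplicate`, and the `attach` helper are hypothetical in-memory stand-ins rather than the coordinator's real types, and the sketch skips the step of validating that the existing row matches the request.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDuplicate stands in for a unique-constraint violation from the database (assumed name).
var errDuplicate = errors.New("duplicate key")

// store is a hypothetical in-memory stand-in for the attached_functions table.
type store struct {
	mu   sync.Mutex
	rows map[string]string // "collectionID/name" -> attached function ID
	next int
}

func newStore() *store { return &store{rows: make(map[string]string)} }

// insert fails with errDuplicate when the key is taken, as a unique index would.
func (s *store) insert(key string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.rows[key]; ok {
		return "", errDuplicate
	}
	s.next++
	id := fmt.Sprintf("af-%d", s.next)
	s.rows[key] = id
	return id, nil
}

// get returns the ID stored under key, if any.
func (s *store) get(key string) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	id, ok := s.rows[key]
	return id, ok
}

// attach inserts the row and, if it lost the race, returns the winner's ID instead.
func attach(s *store, key string) (string, error) {
	id, err := s.insert(key)
	if errors.Is(err, errDuplicate) {
		if existing, ok := s.get(key); ok {
			return existing, nil // idempotent success
		}
	}
	return id, err
}

// attachConcurrently runs n racing attach calls and returns the IDs they observed.
func attachConcurrently(s *store, key string, n int) []string {
	ids := make([]string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ids[i], _ = attach(s, key)
		}(i)
	}
	wg.Wait()
	return ids
}

func main() {
	s := newStore()
	fmt.Println(attachConcurrently(s, "coll-1/my-fn", 4))
}
```

Every caller ends up with the same ID regardless of which goroutine inserted first, which is the property the existence check alone cannot guarantee.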
File: go/pkg/sysdb/coordinator/task.go
Line: 93
// Step 2: Start backfill (stub for now)
self.start_backfill(attached_function_id).await;

// Step 3: Create output collection and set is_ready = true
self.sysdb_client
    .finish_create_attached_function(attached_function_id)
[**CriticalError**]
**Logic error - setting ready before backfill completes**: `finish_create_attached_function()` sets `is_ready=true` immediately after starting backfill (line 1940), but backfill is asynchronous. Users will query the attached function thinking it's ready when backfill is still running, leading to incomplete or stale data in the output collection.
```rust
self.start_backfill(attached_function_id).await; // Starts async backfill
self.sysdb_client
.finish_create_attached_function(attached_function_id) // Sets is_ready=true immediately
.await
```
**Fix**: Only set `is_ready=true` after backfill completes. Either make backfill synchronous or use a callback/polling mechanism:
```rust
let backfill_handle = self.start_backfill(attached_function_id).await?;
backfill_handle.wait().await?; // Wait for completion
self.sysdb_client.finish_create_attached_function(attached_function_id).await?;
```
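The same ordering, sketched in Go to match the coordinator code reviewed elsewhere in this PR. This is a sketch under stated assumptions, not the frontend's implementation: `attachedFunction`, `startBackfill`, and `finishCreate` are hypothetical stand-ins, and the backfill here is a trivial goroutine because the real one is still a stub.

```go
package main

import "fmt"

// attachedFunction is a hypothetical in-memory stand-in for one attached_functions row.
type attachedFunction struct {
	isReady bool
	log     []string // records the order in which the phases ran
}

// startBackfill launches the backfill in the background and returns a channel
// that is closed once the backfill has finished.
func startBackfill(af *attachedFunction) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		af.log = append(af.log, "backfill done")
	}()
	return done
}

// finishCreate is the second phase: it flips is_ready to true.
func finishCreate(af *attachedFunction) {
	af.log = append(af.log, "is_ready=true")
	af.isReady = true
}

// attach runs the three phases and only marks the function ready after the
// backfill has signalled completion.
func attach(af *attachedFunction) {
	af.log = append(af.log, "row inserted") // phase 1: is_ready=false
	done := startBackfill(af)               // phase 2: asynchronous backfill
	<-done                                  // wait for completion before phase 3
	finishCreate(af)                        // phase 3: is_ready=true
}

func main() {
	af := &attachedFunction{}
	attach(af)
	fmt.Println(af.log, af.isReady)
}
```

The receive on `done` is what keeps `is_ready=true` from being observable while the backfill is still running.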
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1944
}

// 2. Check if output collection already exists (idempotency)
if attachedFunction.IsReady {
[**CriticalError**]
**Idempotency issue - duplicate output collection creation**: If `FinishCreateAttachedFunction` is called twice (for example, a retry after a network failure), the `if attachedFunction.IsReady` check short-circuits the second call only when the first call ran to completion. If the first call created the collection but failed before setting `is_ready=true`, the second call will attempt to create the output collection again with the same name, causing a duplicate collection error.
```go
if attachedFunction.IsReady {
return nil // Idempotent if already ready
}
// First call: creates collection, crashes before setting is_ready=true
collectionID := types.NewUniqueID()
_, _, err = s.catalog.CreateCollectionAndSegments(...) // Creates collection
// Crash here
// Second call: attachedFunction.IsReady is still false
collectionID := types.NewUniqueID() // Generates NEW ID
_, _, err = s.catalog.CreateCollectionAndSegments(...) // Tries to create SAME NAME again -> error
```
**Fix**: Check if output collection already exists before attempting creation:
```go
if attachedFunction.OutputCollectionID != nil {
return nil // Already created
}
```
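A slightly fuller sketch of a retry-safe finish step, assuming the output collection can be looked up by name. Unlike the early return above, it reuses an existing collection and still sets `is_ready`, so a crash between the two writes does not leave the function permanently not ready. The `catalog` and `attachedFunction` types and both helpers are hypothetical in-memory stand-ins.

```go
package main

import "fmt"

// catalog is a hypothetical in-memory stand-in for the collection catalog.
type catalog struct {
	collections map[string]string // collection name -> collection ID
	created     int               // number of collections actually created
}

// attachedFunction is a hypothetical stand-in for one attached_functions row.
type attachedFunction struct {
	outputName         string
	outputCollectionID string
	isReady            bool
}

// getOrCreateCollection returns the existing collection ID for name, creating
// the collection only if it does not exist yet.
func (c *catalog) getOrCreateCollection(name string) string {
	if id, ok := c.collections[name]; ok {
		return id
	}
	c.created++
	id := fmt.Sprintf("coll-%d", c.created)
	c.collections[name] = id
	return id
}

// finishCreate is safe to retry: it is a no-op once is_ready is set, and it
// reuses a collection left behind by an earlier, partially completed attempt.
func finishCreate(c *catalog, af *attachedFunction) {
	if af.isReady {
		return
	}
	af.outputCollectionID = c.getOrCreateCollection(af.outputName)
	af.isReady = true
}

func main() {
	c := &catalog{collections: map[string]string{}}
	af := &attachedFunction{outputName: "my-fn-output"}

	// Simulate a first attempt that created the collection and then crashed.
	c.getOrCreateCollection(af.outputName)

	// The retry reuses the existing collection and completes the second phase.
	finishCreate(c, af)
	fmt.Println(af.outputCollectionID, af.isReady, c.created)
}
```

In the real coordinator the lookup, the creation, and the `is_ready` update would presumably need to share one transaction for this to hold under concurrency.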
File: go/pkg/sysdb/coordinator/task.go
Line: 546
OldestWrittenNonce: nil,
IsReady: false, // We will later set this to true in FinishAttachFunction
}
[**CriticalError**]
**Race condition**: `GetAnyByName` checks for existing attached functions, but between this check and the `Insert` call, another concurrent request could create the same attached function. This leaves a window in which two requests both attempt to insert an attached function with the same name.
The unique constraint on `(input_collection_id, name, is_deleted)` will catch this at database level, but the function should handle the constraint violation gracefully:
```go
err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).Insert(attachedFunction)
if err != nil {
// Check if this is a duplicate name constraint violation
if strings.Contains(err.Error(), "duplicate") || strings.Contains(err.Error(), "unique constraint") {
// Another concurrent request created it, fetch and validate
existing, getErr := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetAnyByName(req.InputCollectionId, req.Name)
if getErr == nil && existing != nil {
if err := validateAttachedFunctionMatchesRequest(existing, req); err == nil {
attachedFunctionID = existing.ID
return nil // Idempotent success
}
}
}
log.Error("AttachFunction: failed to insert attached function", zap.Error(err))
return err
}
```
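Matching on the error text is fragile, so here is a hedged alternative that classifies the error by its SQLSTATE code instead. It assumes the metastore is PostgreSQL, where `23505` is the code for `unique_violation`; the `dbError` type is a hypothetical stand-in for whatever typed error the driver in use actually returns.

```go
package main

import (
	"errors"
	"fmt"
)

// dbError is a hypothetical driver error that carries a SQLSTATE code.
type dbError struct {
	Code    string
	Message string
}

func (e *dbError) Error() string { return e.Code + ": " + e.Message }

// uniqueViolation is the SQLSTATE code PostgreSQL reports for unique_violation.
const uniqueViolation = "23505"

// isUniqueViolation reports whether err, or any error it wraps, is a
// unique-constraint violation.
func isUniqueViolation(err error) bool {
	var dbErr *dbError
	return errors.As(err, &dbErr) && dbErr.Code == uniqueViolation
}

func main() {
	wrapped := fmt.Errorf("insert attached function: %w",
		&dbError{Code: uniqueViolation, Message: "duplicate key value"})
	fmt.Println(isUniqueViolation(wrapped))

	// A substring match on "duplicate" would misclassify this unrelated error.
	fmt.Println(isUniqueViolation(errors.New("connection reset: duplicate packet")))
}
```

One caveat for the suggestion above: in PostgreSQL a failed statement aborts the enclosing transaction, so re-reading through the same `txCtx` after the failed insert would need a savepoint or a fresh transaction.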
File: go/pkg/sysdb/coordinator/task.go
Line: 190
return nil
}

func (s *attachedFunctionDb) Update(attachedFunction *dbmodel.AttachedFunction) error {
[**BestPractice**]
**State inconsistency**: The `Update()` method updates all non-zero fields of `AttachedFunction`. If the caller accidentally sets `IsReady=false` after it was already `true`, this will revert a completed attached function back to incomplete state, causing data inconsistency.
Add validation:
```go
func (s *attachedFunctionDb) Update(attachedFunction *dbmodel.AttachedFunction) error {
// Prevent reverting is_ready from true to false
if !attachedFunction.IsReady {
existing, err := s.GetAnyByID(attachedFunction.ID)
if err != nil {
return err
}
if existing != nil && existing.IsReady {
return errors.New("cannot revert is_ready from true to false")
}
}
result := s.db.Model(&dbmodel.AttachedFunction{}).
Where("id = ?", attachedFunction.ID).
Where("is_deleted = ?", false).
Updates(attachedFunction)
// ... rest of implementation
}
```
File: go/pkg/sysdb/metastore/db/dao/task.go
Line: 45
@@ -0,0 +1,2 @@
-- Add is_ready column to attached_functions table to track initialization status
ALTER TABLE "public"."attached_functions" ADD COLUMN "is_ready" boolean NOT NULL DEFAULT false;
[**CriticalError**]
There's a potential issue with the default value for the `is_ready` column. The PR description states the default should be `true` for backward compatibility, but the migration sets it to `false`.
If the default is `false`, all existing `attached_functions` will be marked as not ready after this migration runs. New versions of the service that filter on `is_ready = true` will not be able to see or use them, effectively disabling them.
To ensure backward compatibility for existing records during a rolling deployment, this should likely be `DEFAULT true`. New code will explicitly set `is_ready = false` upon insertion, so this change would only affect existing rows.
```suggestion
ALTER TABLE "public"."attached_functions" ADD COLUMN "is_ready" boolean NOT NULL DEFAULT true;
```
File: go/pkg/sysdb/metastore/db/migrations/20251116154842.sql
Line: 2
suite.Error(err)
suite.Nil(response)
suite.ErrorIs(err, common.ErrHeapServiceNotEnabled)
// Assertions - AttachFunction now succeeds without heap service, creating attached function with is_ready=false
Clean this up.
rescrv
left a comment
I would add lower-level unit or integration tests for the DAO.
Description of changes
Summarize the changes made by this PR.
This change adds an `is_ready: bool` column to the `attached_functions` table to facilitate a 2-phase commit on creation.
`AttachFunction` now operates in three phases after this change:
1. Creates a row in the `attached_functions` table with `is_ready = false`.
2. Calls backfill (a stub for now).
3. Creates the output collection for this function and sets `is_ready` to `true`.
Getters for the attached_function model in Go have been adjusted to filter by `is_ready=true`. There are two added getters, `GetAnyById` and `GetAnyByName`, to get rows that can also have `is_ready` set to `false`.
Test plan
How are these changes tested?
- `test_task_api.py` has been re-enabled
- `pytest` for python, `yarn test` for js, `cargo test` for rust
Migration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?