Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 16 additions & 24 deletions chromadb/test/distributed/test_task_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
from chromadb.api.client import Client as ClientCreator
from chromadb.config import System
from chromadb.errors import ChromaError, NotFoundError
from chromadb.test.utils.wait_for_version_increase import (
get_collection_version,
wait_for_version_increase,
)


def test_function_attach_and_detach(basic_http_client: System) -> None:
def test_count_function_attach_and_detach(basic_http_client: System) -> None:
"""Test creating and removing a function with the record_counter operator"""
client = ClientCreator.from_system(basic_http_client)
client.reset()
Expand All @@ -22,21 +26,6 @@ def test_function_attach_and_detach(basic_http_client: System) -> None:
metadata={"description": "Sample documents for task processing"},
)

# Add initial documents
collection.add(
ids=["doc1", "doc2", "doc3"],
documents=[
"The quick brown fox jumps over the lazy dog",
"Machine learning is a subset of artificial intelligence",
"Python is a popular programming language",
],
metadatas=[{"source": "proverb"}, {"source": "tech"}, {"source": "tech"}],
)

# Verify collection has documents
assert collection.count() == 3
# TODO(tanujnay112): Verify the output collection has the correct count

# Create a task that counts records in the collection
attached_fn = collection.attach_function(
name="count_my_docs",
Expand All @@ -47,19 +36,22 @@ def test_function_attach_and_detach(basic_http_client: System) -> None:

# Verify task creation succeeded
assert attached_fn is not None
initial_version = get_collection_version(client, collection.name)

# Add more documents
# Add documents
collection.add(
ids=["doc4", "doc5"],
documents=[
"Chroma is a vector database",
"Tasks automate data processing",
],
ids=["doc_{}".format(i) for i in range(0, 300)],
documents=["test document"] * 300,
)

# Verify documents were added
assert collection.count() == 5
# TODO(tanujnay112): Verify the output collection has the correct count
assert collection.count() == 300

wait_for_version_increase(client, collection.name, initial_version)

result = client.get_collection("my_documents_counts").get("function_output")
assert result["metadatas"] is not None
assert result["metadatas"][0]["total_count"] == 300

# Remove the task
success = attached_fn.detach(
Expand Down
10 changes: 10 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
"github.com/chroma-core/chroma/go/pkg/types"
"github.com/google/uuid"
"github.com/pingcap/log"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -286,6 +287,15 @@ func (s *Coordinator) FlushCollectionCompaction(ctx context.Context, flushCollec
return s.catalog.FlushCollectionCompaction(ctx, flushCollectionCompaction)
}

// FlushCollectionCompactionsAndAttachedFunction forwards the given collection
// compactions, attached function ID, and completion offset to the catalog
// method of the same name and returns that method's result and error unchanged.
func (s *Coordinator) FlushCollectionCompactionsAndAttachedFunction(
ctx context.Context,
collectionCompactions []*model.FlushCollectionCompaction,
attachedFunctionID uuid.UUID,
completionOffset int64,
) (*model.ExtendedFlushCollectionInfo, error) {
return s.catalog.FlushCollectionCompactionsAndAttachedFunction(ctx, collectionCompactions, attachedFunctionID, completionOffset)
}

// ListCollectionsToGc delegates to the catalog's ListCollectionsToGc, passing
// every argument through unchanged and returning the catalog's result and error.
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string, minVersionsIfAlive *uint64) ([]*model.CollectionToGc, error) {
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit, tenantID, minVersionsIfAlive)
}
Expand Down
19 changes: 4 additions & 15 deletions go/pkg/sysdb/coordinator/create_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_SuccessfulCreation() {
[]string{inputCollectionID}, (*string)(nil), tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
Return([]*dbmodel.CollectionAndMetadata{{Collection: &dbmodel.Collection{ID: inputCollectionID}}}, nil).Once()

// Check output collection doesn't exist
suite.mockMetaDomain.On("CollectionDb", mock.Anything).Return(suite.mockCollectionDb).Once()
suite.mockCollectionDb.On("GetCollections",
[]string(nil), &outputCollectionName, tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
Return([]*dbmodel.CollectionAndMetadata{}, nil).Once()

// Insert attached function with lowest_live_nonce = NULL
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("Insert", mock.MatchedBy(func(attachedFunction *dbmodel.AttachedFunction) bool {
Expand Down Expand Up @@ -225,7 +219,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_SuccessfulCreation() {

suite.NoError(err)
suite.NotNil(response)
suite.NotEmpty(response.Id)
suite.NotEmpty(response.AttachedFunction.Id)

// Verify all mocks were called as expected
suite.mockMetaDomain.AssertExpectations(suite.T())
Expand Down Expand Up @@ -317,7 +311,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Alrea
// Assertions
suite.NoError(err)
suite.NotNil(response)
suite.Equal(existingAttachedFunctionID.String(), response.Id)
suite.Equal(existingAttachedFunctionID.String(), response.AttachedFunction.Id)

// Verify no writes occurred (no Insert, no heap Push)
// Note: Transaction IS called for idempotency check, but no writes happen inside it
Expand Down Expand Up @@ -390,11 +384,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {
[]string{inputCollectionID}, (*string)(nil), tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
Return([]*dbmodel.CollectionAndMetadata{{Collection: &dbmodel.Collection{ID: inputCollectionID}}}, nil).Once()

suite.mockMetaDomain.On("CollectionDb", mock.Anything).Return(suite.mockCollectionDb).Once()
suite.mockCollectionDb.On("GetCollections",
[]string(nil), &outputCollectionName, tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
Return([]*dbmodel.CollectionAndMetadata{}, nil).Once()

suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("Insert", mock.Anything).Return(nil).Once()

Expand All @@ -408,7 +397,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {
response1, err1 := suite.coordinator.AttachFunction(ctx, request)
suite.NoError(err1)
suite.NotNil(response1)
suite.NotEmpty(response1.Id)
suite.NotEmpty(response1.AttachedFunction.Id)

// ========== GetAttachedFunctionByName: Should Return ErrAttachedFunctionNotReady ==========

Expand Down Expand Up @@ -453,7 +442,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {
response2, err2 := suite.coordinator.AttachFunction(ctx, request)
suite.NoError(err2)
suite.NotNil(response2)
suite.Equal(incompleteAttachedFunctionID.String(), response2.Id)
suite.Equal(incompleteAttachedFunctionID.String(), response2.AttachedFunction.Id)

// Verify transaction was called in both attempts (idempotency check happens in transaction)
suite.mockTxImpl.AssertNumberOfCalls(suite.T(), "Transaction", 2) // First attempt + recovery attempt
Expand Down
10 changes: 5 additions & 5 deletions go/pkg/sysdb/coordinator/heap_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (suite *HeapClientIntegrationTestSuite) TestAttachFunctionPushesScheduleToH
})
suite.NoError(err, "Should attached function successfully")
suite.NotNil(response)
suite.NotEmpty(response.Id, "Attached function ID should be returned")
suite.NotEmpty(response.AttachedFunction.Id, "Attached function ID should be returned")

// Get updated heap summary
updatedSummary, err := suite.heapClient.Summary(ctx, &coordinatorpb.HeapSummaryRequest{})
Expand Down Expand Up @@ -376,12 +376,12 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
})
suite.NoError(err, "Task should still exist after cleanup")
suite.NotNil(getResp)
suite.Equal(taskResp.Id, getResp.AttachedFunction.Id)
suite.Equal(taskResp.AttachedFunction.Id, getResp.AttachedFunction.Id)
suite.T().Logf("Task still exists after cleanup: %s", getResp.AttachedFunction.Id)

// STEP 4: Delete the task
_, err = suite.sysdbClient.DetachFunction(ctx, &coordinatorpb.DetachFunctionRequest{
AttachedFunctionId: taskResp.Id,
AttachedFunctionId: taskResp.AttachedFunction.Id,
DeleteOutput: true,
})
suite.NoError(err, "Should delete task")
Expand All @@ -398,8 +398,8 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
})
suite.NoError(err, "Should be able to recreate task after deletion")
suite.NotNil(taskResp2)
suite.NotEqual(taskResp.Id, taskResp2.Id, "New task should have different ID")
suite.T().Logf("Successfully recreated task: %s", taskResp2.Id)
suite.NotEqual(taskResp.AttachedFunction.Id, taskResp2.AttachedFunction.Id, "New task should have different ID")
suite.T().Logf("Successfully recreated task: %s", taskResp2.AttachedFunction.Id)
}

func TestHeapClientIntegrationSuite(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions go/pkg/sysdb/coordinator/list_attached_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
MinRecordsForInvocation: 5,
CreatedAt: now,
UpdatedAt: now,
IsReady: true,
},
{
ID: uuid.New(),
Expand All @@ -80,6 +81,7 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
MinRecordsForInvocation: 15,
CreatedAt: now,
UpdatedAt: now,
IsReady: true,
},
}

Expand Down Expand Up @@ -157,6 +159,7 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_FunctionD
MinRecordsForInvocation: 1,
CreatedAt: now,
UpdatedAt: now,
IsReady: true,
}

suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
Expand Down Expand Up @@ -191,6 +194,7 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_InvalidPa
MinRecordsForInvocation: 1,
CreatedAt: now,
UpdatedAt: now,
IsReady: true,
}

suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
Expand Down
4 changes: 4 additions & 0 deletions go/pkg/sysdb/coordinator/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type FlushCollectionInfo struct {
AttachedFunctionCompletionOffset *int64
}

// ExtendedFlushCollectionInfo wraps a slice of per-collection
// FlushCollectionInfo values so that the results for several collections can
// be returned together.
type ExtendedFlushCollectionInfo struct {
// Collections holds one FlushCollectionInfo pointer per collection.
Collections []*FlushCollectionInfo
}

func FilterCollection(collection *Collection, collectionID types.UniqueID, collectionName *string) bool {
if collectionID != types.NilUniqueID() && collectionID != collection.ID {
return false
Expand Down
67 changes: 67 additions & 0 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,73 @@ func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectio
return flushCollectionInfo, nil
}

// FlushCollectionCompactionsAndAttachedFunction atomically updates multiple collection compaction data
// and attached function completion offset in a single transaction.
func (tc *Catalog) FlushCollectionCompactionsAndAttachedFunction(
ctx context.Context,
collectionCompactions []*model.FlushCollectionCompaction,
attachedFunctionID uuid.UUID,
completionOffset int64,
) (*model.ExtendedFlushCollectionInfo, error) {
if !tc.versionFileEnabled {
// Attached-function-based compactions are only supported with versioned collections
log.Error("FlushCollectionCompactionsAndAttachedFunction is only supported for versioned collections")
return nil, errors.New("attached-function-based compaction requires versioned collections")
}

if len(collectionCompactions) == 0 {
return nil, errors.New("at least one collection compaction is required")
}

flushInfos := make([]*model.FlushCollectionInfo, 0, len(collectionCompactions))

err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
var err error
// Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
tx := dbcore.GetDB(txCtx)

// Handle all collection compactions
for _, collectionCompaction := range collectionCompactions {
log.Info("FlushCollectionCompactionsAndAttachedFunction", zap.String("collection_id", collectionCompaction.ID.String()))
flushInfo, err := tc.FlushCollectionCompactionForVersionedCollection(txCtx, collectionCompaction, tx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note for myself: discuss the transaction setup for this

if err != nil {
return err
}
flushInfos = append(flushInfos, flushInfo)
}

err = tc.metaDomain.AttachedFunctionDb(txCtx).Update(&dbmodel.AttachedFunction{
ID: attachedFunctionID,
Comment on lines +1769 to +1775
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

Idempotency Issue: Direct UPDATE Without Existence Check

The Update call modifies completion_offset without verifying:

  1. The attached function exists
  2. The new offset is greater than or equal to the current offset (preventing backward progress)
err = tc.metaDomain.AttachedFunctionDb(txCtx).Update(&dbmodel.AttachedFunction{
    ID:               attachedFunctionID,
    CompletionOffset: completionOffset,
})

If this transaction is retried or replayed (e.g., due to network issues), it could:

  • Succeed silently even if the attached function was deleted
  • Overwrite a higher completion offset with a lower one if messages arrive out of order

Recommendation:

// First fetch current state
currentFunction, err := tc.metaDomain.AttachedFunctionDb(txCtx).GetByID(attachedFunctionID)
if err != nil {
    return err // Handle not found
}

// Verify offset is progressing forward
if completionOffset < currentFunction.CompletionOffset {
    return errors.New("completion offset cannot move backward")
}

// Then update
err = tc.metaDomain.AttachedFunctionDb(txCtx).Update(&dbmodel.AttachedFunction{
    ID:               attachedFunctionID,
    CompletionOffset: completionOffset,
})
Context for Agents
**Idempotency Issue: Direct UPDATE Without Existence Check**

The `Update` call modifies `completion_offset` without verifying:
1. The attached function exists
2. The new offset is greater than or equal to the current offset (preventing backward progress)

```go
err = tc.metaDomain.AttachedFunctionDb(txCtx).Update(&dbmodel.AttachedFunction{
    ID:               attachedFunctionID,
    CompletionOffset: completionOffset,
})
```

If this transaction is retried or replayed (e.g., due to network issues), it could:
- Succeed silently even if the attached function was deleted
- Overwrite a higher completion offset with a lower one if messages arrive out of order

**Recommendation**:
```go
// First fetch current state
currentFunction, err := tc.metaDomain.AttachedFunctionDb(txCtx).GetByID(attachedFunctionID)
if err != nil {
    return err // Handle not found
}

// Verify offset is progressing forward
if completionOffset < currentFunction.CompletionOffset {
    return errors.New("completion offset cannot move backward")
}

// Then update
err = tc.metaDomain.AttachedFunctionDb(txCtx).Update(&dbmodel.AttachedFunction{
    ID:               attachedFunctionID,
    CompletionOffset: completionOffset,
})
```

File: go/pkg/sysdb/coordinator/table_catalog.go
Line: 1775

CompletionOffset: completionOffset,
})
if err != nil {
return err
}

return nil
})

if err != nil {
return nil, err
}

// Populate attached function fields with authoritative values from database
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Documentation]

The comment mentions populating with "authoritative values from database", but the code uses the completionOffset value passed into the function. While this is correct if the transaction succeeds, the comment could be slightly more precise.

Consider rephrasing to clarify that it's populating the response with the successfully committed value.

Suggested change
// Populate attached function fields with authoritative values from database
// Populate attached function fields in the response with the value that was successfully committed.

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
The comment mentions populating with "authoritative values from database", but the code uses the `completionOffset` value passed into the function. While this is correct if the transaction succeeds, the comment could be slightly more precise.

Consider rephrasing to clarify that it's populating the response with the successfully committed value.

```suggestion
	// Populate attached function fields in the response with the value that was successfully committed.
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: go/pkg/sysdb/coordinator/table_catalog.go
Line: 1789

for _, flushInfo := range flushInfos {
flushInfo.AttachedFunctionCompletionOffset = &completionOffset
}

// Log with first collection ID (typically the output collection)
log.Info("FlushCollectionCompactionsAndAttachedFunction",
zap.String("first_collection_id", collectionCompactions[0].ID.String()),
zap.Int("collection_count", len(collectionCompactions)),
zap.String("attached_function_id", attachedFunctionID.String()),
zap.Int64("completion_offset", completionOffset))

return &model.ExtendedFlushCollectionInfo{
Collections: flushInfos,
}, nil
}

func (tc *Catalog) validateVersionFile(versionFile *coordinatorpb.CollectionVersionFile, collectionID string, version int64) error {
if versionFile.GetCollectionInfoImmutable().GetCollectionId() != collectionID {
log.Error("collection id mismatch", zap.String("collection_id", collectionID), zap.String("version_file_collection_id", versionFile.GetCollectionInfoImmutable().GetCollectionId()))
Expand Down
28 changes: 12 additions & 16 deletions go/pkg/sysdb/coordinator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *Coordinator) validateAttachedFunctionMatchesRequest(ctx context.Context
return nil
}

// AttachFunction creates a new attached function in the database
// AttachFunction creates an output collection and attached function in a single transaction
func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.AttachFunctionRequest) (*coordinatorpb.AttachFunctionResponse, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we handle soft deletes for

  1. Attaching
  2. Compaction flush?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, compaction flushing uses the same logic as before which fails the flush if is_deleted is true. Similarly, a flush on a function that doesn't exist or is soft deleted will abort the transaction.

Attaching a function does not interact with soft-deleted functions, and soft-deleted functions are renamed with a "deleted" prefix, just as collections are.

log := log.With(zap.String("method", "AttachFunction"))

Expand Down Expand Up @@ -143,18 +143,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
return common.ErrCollectionNotFound
}

// Check if output collection already exists
outputCollectionName := req.OutputCollectionName
existingOutputCollections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections(nil, &outputCollectionName, req.TenantId, req.Database, nil, nil, false)
if err != nil {
log.Error("AttachFunction: failed to check output collection", zap.Error(err))
return err
}
if len(existingOutputCollections) > 0 {
log.Error("AttachFunction: output collection already exists")
return common.ErrCollectionUniqueConstraintViolation
}

// Serialize params
var paramsJSON string
if req.Params != nil {
Expand All @@ -168,6 +156,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
paramsJSON = "{}"
}

// Create attached function
now := time.Now()
attachedFunction := &dbmodel.AttachedFunction{
ID: attachedFunctionID,
Expand All @@ -176,6 +165,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
DatabaseID: databases[0].ID,
InputCollectionID: req.InputCollectionId,
OutputCollectionName: req.OutputCollectionName,
OutputCollectionID: nil,
FunctionID: function.ID,
FunctionParams: paramsJSON,
CompletionOffset: 0,
Expand All @@ -196,6 +186,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att

log.Debug("AttachFunction: attached function created with is_ready=false",
zap.String("attached_function_id", attachedFunctionID.String()),
zap.String("output_collection_name", req.OutputCollectionName),
zap.String("name", req.Name))
return nil
})
Expand All @@ -205,7 +196,9 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
}

return &coordinatorpb.AttachFunctionResponse{
Id: attachedFunctionID.String(),
AttachedFunction: &coordinatorpb.AttachedFunction{
Id: attachedFunctionID.String(),
},
}, nil
}

Comment on lines 198 to 204
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

The AttachFunctionResponse now returns a coordinatorpb.AttachedFunction object, but only the Id field is populated. This can be misleading for clients of this API, who might expect a fully populated object. If only the ID is needed, returning just the ID string would be more direct. If the full object is intended to be returned, it should be populated with the data available at creation time (name, function name, etc.).

Consider either adjusting the response message to only contain the ID or populating more fields in the returned AttachedFunction object for consistency with other endpoints like GetAttachedFunctionByName.

Context for Agents
The `AttachFunctionResponse` now returns a `coordinatorpb.AttachedFunction` object, but only the `Id` field is populated. This can be misleading for clients of this API, who might expect a fully populated object. If only the ID is needed, returning just the ID string would be more direct. If the full object is intended to be returned, it should be populated with the data available at creation time (name, function name, etc.).

Consider either adjusting the response message to only contain the ID or populating more fields in the returned `AttachedFunction` object for consistency with other endpoints like `GetAttachedFunctionByName`.

File: go/pkg/sysdb/coordinator/task.go
Line: 204

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah why is only id populated?

Expand All @@ -229,6 +222,10 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
return nil, status.Errorf(codes.Internal, "attached function has invalid completion_offset: %d", attachedFunction.CompletionOffset)
}

if !attachedFunction.IsReady {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

The check if !attachedFunction.IsReady inside attachedFunctionToProto appears to be redundant. The callers of this function (GetAttachedFunctionByName, ListAttachedFunctions, GetAttachedFunctionByUuid) use data access methods (GetByName, GetByCollectionID, GetByID) that are already specified to return only "ready" attached functions.

This redundant check returns a generic codes.Internal error, which could be confusing for clients. If an unready function is passed here, it signifies an internal logic error. Consider removing this check to simplify the code and rely on the data access layer to enforce the "ready" contract. If the check is kept for defensiveness, a panic might be more appropriate to signal the internal invariant violation.

Context for Agents
The check `if !attachedFunction.IsReady` inside `attachedFunctionToProto` appears to be redundant. The callers of this function (`GetAttachedFunctionByName`, `ListAttachedFunctions`, `GetAttachedFunctionByUuid`) use data access methods (`GetByName`, `GetByCollectionID`, `GetByID`) that are already specified to return only "ready" attached functions.

This redundant check returns a generic `codes.Internal` error, which could be confusing for clients. If an unready function is passed here, it signifies an internal logic error. Consider removing this check to simplify the code and rely on the data access layer to enforce the "ready" contract. If the check is kept for defensiveness, a panic might be more appropriate to signal the internal invariant violation.

File: go/pkg/sysdb/coordinator/task.go
Line: 225

return nil, status.Errorf(codes.Internal, "serialized attached function is not ready")
}

attachedFunctionProto := &coordinatorpb.AttachedFunction{
Id: attachedFunction.ID.String(),
Name: attachedFunction.Name,
Expand All @@ -243,7 +240,6 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
DatabaseId: attachedFunction.DatabaseID,
CreatedAt: uint64(attachedFunction.CreatedAt.UnixMicro()),
UpdatedAt: uint64(attachedFunction.UpdatedAt.UnixMicro()),
IsReady: attachedFunction.IsReady,
}
if attachedFunction.OutputCollectionID != nil {
attachedFunctionProto.OutputCollectionId = attachedFunction.OutputCollectionID
Expand Down Expand Up @@ -581,7 +577,7 @@ func (s *Coordinator) FinishCreateAttachedFunction(ctx context.Context, req *coo

_, _, err = s.catalog.CreateCollectionAndSegments(txCtx, collection, segments, 0)
if err != nil {
log.Error("FinishCreateAttachedFunction: failed to create collection", zap.Error(err))
log.Error("FinishCreateAttachedFunction: failed to create output collection", zap.Error(err))
return err
}

Expand Down
Loading
Loading