Skip to content

Commit 3fec52a

Browse files
authored
[ENH]: Execute task with no backfill or incremental (#5867)
## Description of changes _Summarize the changes made by this PR._ This change introduces an AttachedFunctionOrchestrator that runs the following chain of operators: GetAttachedFunction -> GetCollectionAndSegments (for the output collection) -> ExecuteAttachedFunction -> MaterializeLogs. It also edits the RegisterOrchestrator to spawn a FinishAttachedTask operator that flushes compaction for the input and output collections and flushes the updated function data. The compact method in `compact.rs` has been edited to launch an `AttachedFunctionOrchestrator` in parallel on the results of the initial `LogFetchOrchestrator`. This orchestrator returns a chunk of MaterializedLog records that get applied via another instance of `ApplyLogsOrchestrator`. The above runs in parallel to the normal compaction workflow, which simply runs an `ApplyLogsOrchestrator` on the results of the initial `LogFetchOrchestrator`. Each of the two threads returns a `CollectionRegisterInfo`. The function-related thread also returns a `FunctionContext`. The two threads are joined and these structures are passed on to the RegisterOrchestrator for completion. `CompactionContext.collection_info` has been replaced by `CompactionContext.input_collection_info` and `CompactionContext.output_collection_info` to reflect the fact that "compactions" can pull data from one collection and compact it into another. ApplyLogsOrchestrator always applies the given logs to the collection specified by `CompactionContext.output_collection_info`. Hence, we take care to set this field to the appropriate collection before calling `run_apply_logs` in each thread. - Improvements & Bug fixes - ... - New functionality - ... 
## Test plan _How are these changes tested?_ - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the_ [_docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 6edd60c commit 3fec52a

32 files changed

+2272
-347
lines changed

chromadb/test/distributed/test_task_api.py

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,13 @@
99
from chromadb.api.client import Client as ClientCreator
1010
from chromadb.config import System
1111
from chromadb.errors import ChromaError, NotFoundError
12+
from chromadb.test.utils.wait_for_version_increase import (
13+
get_collection_version,
14+
wait_for_version_increase,
15+
)
1216

1317

14-
def test_function_attach_and_detach(basic_http_client: System) -> None:
18+
def test_count_function_attach_and_detach(basic_http_client: System) -> None:
1519
"""Test creating and removing a function with the record_counter operator"""
1620
client = ClientCreator.from_system(basic_http_client)
1721
client.reset()
@@ -22,21 +26,6 @@ def test_function_attach_and_detach(basic_http_client: System) -> None:
2226
metadata={"description": "Sample documents for task processing"},
2327
)
2428

25-
# Add initial documents
26-
collection.add(
27-
ids=["doc1", "doc2", "doc3"],
28-
documents=[
29-
"The quick brown fox jumps over the lazy dog",
30-
"Machine learning is a subset of artificial intelligence",
31-
"Python is a popular programming language",
32-
],
33-
metadatas=[{"source": "proverb"}, {"source": "tech"}, {"source": "tech"}],
34-
)
35-
36-
# Verify collection has documents
37-
assert collection.count() == 3
38-
# TODO(tanujnay112): Verify the output collection has the correct count
39-
4029
# Create a task that counts records in the collection
4130
attached_fn = collection.attach_function(
4231
name="count_my_docs",
@@ -47,19 +36,22 @@ def test_function_attach_and_detach(basic_http_client: System) -> None:
4736

4837
# Verify task creation succeeded
4938
assert attached_fn is not None
39+
initial_version = get_collection_version(client, collection.name)
5040

51-
# Add more documents
41+
# Add documents
5242
collection.add(
53-
ids=["doc4", "doc5"],
54-
documents=[
55-
"Chroma is a vector database",
56-
"Tasks automate data processing",
57-
],
43+
ids=["doc_{}".format(i) for i in range(0, 300)],
44+
documents=["test document"] * 300,
5845
)
5946

6047
# Verify documents were added
61-
assert collection.count() == 5
62-
# TODO(tanujnay112): Verify the output collection has the correct count
48+
assert collection.count() == 300
49+
50+
wait_for_version_increase(client, collection.name, initial_version)
51+
52+
result = client.get_collection("my_documents_counts").get("function_output")
53+
assert result["metadatas"] is not None
54+
assert result["metadatas"][0]["total_count"] == 300
6355

6456
# Remove the task
6557
success = attached_fn.detach(

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
1414
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
1515
"github.com/chroma-core/chroma/go/pkg/types"
16+
"github.com/google/uuid"
1617
"github.com/pingcap/log"
1718
"go.uber.org/zap"
1819
)
@@ -286,6 +287,15 @@ func (s *Coordinator) FlushCollectionCompaction(ctx context.Context, flushCollec
286287
return s.catalog.FlushCollectionCompaction(ctx, flushCollectionCompaction)
287288
}
288289

290+
func (s *Coordinator) FlushCollectionCompactionsAndAttachedFunction(
291+
ctx context.Context,
292+
collectionCompactions []*model.FlushCollectionCompaction,
293+
attachedFunctionID uuid.UUID,
294+
completionOffset int64,
295+
) (*model.ExtendedFlushCollectionInfo, error) {
296+
return s.catalog.FlushCollectionCompactionsAndAttachedFunction(ctx, collectionCompactions, attachedFunctionID, completionOffset)
297+
}
298+
289299
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string, minVersionsIfAlive *uint64) ([]*model.CollectionToGc, error) {
290300
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit, tenantID, minVersionsIfAlive)
291301
}

go/pkg/sysdb/coordinator/create_task_test.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_SuccessfulCreation() {
191191
[]string{inputCollectionID}, (*string)(nil), tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
192192
Return([]*dbmodel.CollectionAndMetadata{{Collection: &dbmodel.Collection{ID: inputCollectionID}}}, nil).Once()
193193

194-
// Check output collection doesn't exist
195-
suite.mockMetaDomain.On("CollectionDb", mock.Anything).Return(suite.mockCollectionDb).Once()
196-
suite.mockCollectionDb.On("GetCollections",
197-
[]string(nil), &outputCollectionName, tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
198-
Return([]*dbmodel.CollectionAndMetadata{}, nil).Once()
199-
200194
// Insert attached function with lowest_live_nonce = NULL
201195
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
202196
suite.mockAttachedFunctionDb.On("Insert", mock.MatchedBy(func(attachedFunction *dbmodel.AttachedFunction) bool {
@@ -225,7 +219,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_SuccessfulCreation() {
225219

226220
suite.NoError(err)
227221
suite.NotNil(response)
228-
suite.NotEmpty(response.Id)
222+
suite.NotEmpty(response.AttachedFunction.Id)
229223

230224
// Verify all mocks were called as expected
231225
suite.mockMetaDomain.AssertExpectations(suite.T())
@@ -317,7 +311,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Alrea
317311
// Assertions
318312
suite.NoError(err)
319313
suite.NotNil(response)
320-
suite.Equal(existingAttachedFunctionID.String(), response.Id)
314+
suite.Equal(existingAttachedFunctionID.String(), response.AttachedFunction.Id)
321315

322316
// Verify no writes occurred (no Insert, no heap Push)
323317
// Note: Transaction IS called for idempotency check, but no writes happen inside it
@@ -390,11 +384,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {
390384
[]string{inputCollectionID}, (*string)(nil), tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
391385
Return([]*dbmodel.CollectionAndMetadata{{Collection: &dbmodel.Collection{ID: inputCollectionID}}}, nil).Once()
392386

393-
suite.mockMetaDomain.On("CollectionDb", mock.Anything).Return(suite.mockCollectionDb).Once()
394-
suite.mockCollectionDb.On("GetCollections",
395-
[]string(nil), &outputCollectionName, tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
396-
Return([]*dbmodel.CollectionAndMetadata{}, nil).Once()
397-
398387
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
399388
suite.mockAttachedFunctionDb.On("Insert", mock.Anything).Return(nil).Once()
400389

@@ -408,7 +397,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {
408397
response1, err1 := suite.coordinator.AttachFunction(ctx, request)
409398
suite.NoError(err1)
410399
suite.NotNil(response1)
411-
suite.NotEmpty(response1.Id)
400+
suite.NotEmpty(response1.AttachedFunction.Id)
412401

413402
// ========== GetAttachedFunctionByName: Should Return ErrAttachedFunctionNotReady ==========
414403

@@ -453,7 +442,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {
453442
response2, err2 := suite.coordinator.AttachFunction(ctx, request)
454443
suite.NoError(err2)
455444
suite.NotNil(response2)
456-
suite.Equal(incompleteAttachedFunctionID.String(), response2.Id)
445+
suite.Equal(incompleteAttachedFunctionID.String(), response2.AttachedFunction.Id)
457446

458447
// Verify transaction was called in both attempts (idempotency check happens in transaction)
459448
suite.mockTxImpl.AssertNumberOfCalls(suite.T(), "Transaction", 2) // First attempt + recovery attempt

go/pkg/sysdb/coordinator/heap_client_integration_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (suite *HeapClientIntegrationTestSuite) TestAttachFunctionPushesScheduleToH
176176
})
177177
suite.NoError(err, "Should attached function successfully")
178178
suite.NotNil(response)
179-
suite.NotEmpty(response.Id, "Attached function ID should be returned")
179+
suite.NotEmpty(response.AttachedFunction.Id, "Attached function ID should be returned")
180180

181181
// Get updated heap summary
182182
updatedSummary, err := suite.heapClient.Summary(ctx, &coordinatorpb.HeapSummaryRequest{})
@@ -376,12 +376,12 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
376376
})
377377
suite.NoError(err, "Task should still exist after cleanup")
378378
suite.NotNil(getResp)
379-
suite.Equal(taskResp.Id, getResp.AttachedFunction.Id)
379+
suite.Equal(taskResp.AttachedFunction.Id, getResp.AttachedFunction.Id)
380380
suite.T().Logf("Task still exists after cleanup: %s", getResp.AttachedFunction.Id)
381381

382382
// STEP 4: Delete the task
383383
_, err = suite.sysdbClient.DetachFunction(ctx, &coordinatorpb.DetachFunctionRequest{
384-
AttachedFunctionId: taskResp.Id,
384+
AttachedFunctionId: taskResp.AttachedFunction.Id,
385385
DeleteOutput: true,
386386
})
387387
suite.NoError(err, "Should delete task")
@@ -398,8 +398,8 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
398398
})
399399
suite.NoError(err, "Should be able to recreate task after deletion")
400400
suite.NotNil(taskResp2)
401-
suite.NotEqual(taskResp.Id, taskResp2.Id, "New task should have different ID")
402-
suite.T().Logf("Successfully recreated task: %s", taskResp2.Id)
401+
suite.NotEqual(taskResp.AttachedFunction.Id, taskResp2.AttachedFunction.Id, "New task should have different ID")
402+
suite.T().Logf("Successfully recreated task: %s", taskResp2.AttachedFunction.Id)
403403
}
404404

405405
func TestHeapClientIntegrationSuite(t *testing.T) {

go/pkg/sysdb/coordinator/list_attached_functions_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
6666
MinRecordsForInvocation: 5,
6767
CreatedAt: now,
6868
UpdatedAt: now,
69+
IsReady: true,
6970
},
7071
{
7172
ID: uuid.New(),
@@ -80,6 +81,7 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
8081
MinRecordsForInvocation: 15,
8182
CreatedAt: now,
8283
UpdatedAt: now,
84+
IsReady: true,
8385
},
8486
}
8587

@@ -157,6 +159,7 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_FunctionD
157159
MinRecordsForInvocation: 1,
158160
CreatedAt: now,
159161
UpdatedAt: now,
162+
IsReady: true,
160163
}
161164

162165
suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
@@ -191,6 +194,7 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_InvalidPa
191194
MinRecordsForInvocation: 1,
192195
CreatedAt: now,
193196
UpdatedAt: now,
197+
IsReady: true,
194198
}
195199

196200
suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ type FlushCollectionInfo struct {
102102
AttachedFunctionCompletionOffset *int64
103103
}
104104

105+
type ExtendedFlushCollectionInfo struct {
106+
Collections []*FlushCollectionInfo
107+
}
108+
105109
func FilterCollection(collection *Collection, collectionID types.UniqueID, collectionName *string) bool {
106110
if collectionID != types.NilUniqueID() && collectionID != collection.ID {
107111
return false

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1736,6 +1736,73 @@ func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectio
17361736
return flushCollectionInfo, nil
17371737
}
17381738

1739+
// FlushCollectionCompactionsAndAttachedFunction atomically updates multiple collection compaction data
1740+
// and attached function completion offset in a single transaction.
1741+
func (tc *Catalog) FlushCollectionCompactionsAndAttachedFunction(
1742+
ctx context.Context,
1743+
collectionCompactions []*model.FlushCollectionCompaction,
1744+
attachedFunctionID uuid.UUID,
1745+
completionOffset int64,
1746+
) (*model.ExtendedFlushCollectionInfo, error) {
1747+
if !tc.versionFileEnabled {
1748+
// Attached-function-based compactions are only supported with versioned collections
1749+
log.Error("FlushCollectionCompactionsAndAttachedFunction is only supported for versioned collections")
1750+
return nil, errors.New("attached-function-based compaction requires versioned collections")
1751+
}
1752+
1753+
if len(collectionCompactions) == 0 {
1754+
return nil, errors.New("at least one collection compaction is required")
1755+
}
1756+
1757+
flushInfos := make([]*model.FlushCollectionInfo, 0, len(collectionCompactions))
1758+
1759+
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
1760+
var err error
1761+
// Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
1762+
tx := dbcore.GetDB(txCtx)
1763+
1764+
// Handle all collection compactions
1765+
for _, collectionCompaction := range collectionCompactions {
1766+
log.Info("FlushCollectionCompactionsAndAttachedFunction", zap.String("collection_id", collectionCompaction.ID.String()))
1767+
flushInfo, err := tc.FlushCollectionCompactionForVersionedCollection(txCtx, collectionCompaction, tx)
1768+
if err != nil {
1769+
return err
1770+
}
1771+
flushInfos = append(flushInfos, flushInfo)
1772+
}
1773+
1774+
err = tc.metaDomain.AttachedFunctionDb(txCtx).Update(&dbmodel.AttachedFunction{
1775+
ID: attachedFunctionID,
1776+
CompletionOffset: completionOffset,
1777+
})
1778+
if err != nil {
1779+
return err
1780+
}
1781+
1782+
return nil
1783+
})
1784+
1785+
if err != nil {
1786+
return nil, err
1787+
}
1788+
1789+
// Populate attached function fields with authoritative values from database
1790+
for _, flushInfo := range flushInfos {
1791+
flushInfo.AttachedFunctionCompletionOffset = &completionOffset
1792+
}
1793+
1794+
// Log with first collection ID (typically the output collection)
1795+
log.Info("FlushCollectionCompactionsAndAttachedFunction",
1796+
zap.String("first_collection_id", collectionCompactions[0].ID.String()),
1797+
zap.Int("collection_count", len(collectionCompactions)),
1798+
zap.String("attached_function_id", attachedFunctionID.String()),
1799+
zap.Int64("completion_offset", completionOffset))
1800+
1801+
return &model.ExtendedFlushCollectionInfo{
1802+
Collections: flushInfos,
1803+
}, nil
1804+
}
1805+
17391806
func (tc *Catalog) validateVersionFile(versionFile *coordinatorpb.CollectionVersionFile, collectionID string, version int64) error {
17401807
if versionFile.GetCollectionInfoImmutable().GetCollectionId() != collectionID {
17411808
log.Error("collection id mismatch", zap.String("collection_id", collectionID), zap.String("version_file_collection_id", versionFile.GetCollectionInfoImmutable().GetCollectionId()))

go/pkg/sysdb/coordinator/task.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (s *Coordinator) validateAttachedFunctionMatchesRequest(ctx context.Context
7676
return nil
7777
}
7878

79-
// AttachFunction creates a new attached function in the database
79+
// AttachFunction creates an output collection and attached function in a single transaction
8080
func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.AttachFunctionRequest) (*coordinatorpb.AttachFunctionResponse, error) {
8181
log := log.With(zap.String("method", "AttachFunction"))
8282

@@ -143,18 +143,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
143143
return common.ErrCollectionNotFound
144144
}
145145

146-
// Check if output collection already exists
147-
outputCollectionName := req.OutputCollectionName
148-
existingOutputCollections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections(nil, &outputCollectionName, req.TenantId, req.Database, nil, nil, false)
149-
if err != nil {
150-
log.Error("AttachFunction: failed to check output collection", zap.Error(err))
151-
return err
152-
}
153-
if len(existingOutputCollections) > 0 {
154-
log.Error("AttachFunction: output collection already exists")
155-
return common.ErrCollectionUniqueConstraintViolation
156-
}
157-
158146
// Serialize params
159147
var paramsJSON string
160148
if req.Params != nil {
@@ -168,6 +156,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
168156
paramsJSON = "{}"
169157
}
170158

159+
// Create attached function
171160
now := time.Now()
172161
attachedFunction := &dbmodel.AttachedFunction{
173162
ID: attachedFunctionID,
@@ -176,6 +165,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
176165
DatabaseID: databases[0].ID,
177166
InputCollectionID: req.InputCollectionId,
178167
OutputCollectionName: req.OutputCollectionName,
168+
OutputCollectionID: nil,
179169
FunctionID: function.ID,
180170
FunctionParams: paramsJSON,
181171
CompletionOffset: 0,
@@ -196,6 +186,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
196186

197187
log.Debug("AttachFunction: attached function created with is_ready=false",
198188
zap.String("attached_function_id", attachedFunctionID.String()),
189+
zap.String("output_collection_name", req.OutputCollectionName),
199190
zap.String("name", req.Name))
200191
return nil
201192
})
@@ -205,7 +196,9 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
205196
}
206197

207198
return &coordinatorpb.AttachFunctionResponse{
208-
Id: attachedFunctionID.String(),
199+
AttachedFunction: &coordinatorpb.AttachedFunction{
200+
Id: attachedFunctionID.String(),
201+
},
209202
}, nil
210203
}
211204

@@ -229,6 +222,10 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
229222
return nil, status.Errorf(codes.Internal, "attached function has invalid completion_offset: %d", attachedFunction.CompletionOffset)
230223
}
231224

225+
if !attachedFunction.IsReady {
226+
return nil, status.Errorf(codes.Internal, "serialized attached function is not ready")
227+
}
228+
232229
attachedFunctionProto := &coordinatorpb.AttachedFunction{
233230
Id: attachedFunction.ID.String(),
234231
Name: attachedFunction.Name,
@@ -243,7 +240,6 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
243240
DatabaseId: attachedFunction.DatabaseID,
244241
CreatedAt: uint64(attachedFunction.CreatedAt.UnixMicro()),
245242
UpdatedAt: uint64(attachedFunction.UpdatedAt.UnixMicro()),
246-
IsReady: attachedFunction.IsReady,
247243
}
248244
if attachedFunction.OutputCollectionID != nil {
249245
attachedFunctionProto.OutputCollectionId = attachedFunction.OutputCollectionID
@@ -581,7 +577,7 @@ func (s *Coordinator) FinishCreateAttachedFunction(ctx context.Context, req *coo
581577

582578
_, _, err = s.catalog.CreateCollectionAndSegments(txCtx, collection, segments, 0)
583579
if err != nil {
584-
log.Error("FinishCreateAttachedFunction: failed to create collection", zap.Error(err))
580+
log.Error("FinishCreateAttachedFunction: failed to create output collection", zap.Error(err))
585581
return err
586582
}
587583

0 commit comments

Comments
 (0)