Skip to content

Commit 9bd6743

Browse files
committed
[ENH]: Execute task with no backfill or incremental
1 parent cf18054 commit 9bd6743

31 files changed

+3375
-1148
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
1414
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
1515
"github.com/chroma-core/chroma/go/pkg/types"
16+
"github.com/google/uuid"
1617
"github.com/pingcap/log"
1718
"go.uber.org/zap"
1819
)
@@ -286,6 +287,24 @@ func (s *Coordinator) FlushCollectionCompaction(ctx context.Context, flushCollec
286287
return s.catalog.FlushCollectionCompaction(ctx, flushCollectionCompaction)
287288
}
288289

290+
func (s *Coordinator) FlushCollectionCompactionAndAttachedFunction(
291+
ctx context.Context,
292+
flushCollectionCompaction *model.FlushCollectionCompaction,
293+
attachedFunctionID uuid.UUID,
294+
completionOffset int64,
295+
) (*model.FlushCollectionInfo, error) {
296+
return s.catalog.FlushCollectionCompactionAndAttachedFunction(ctx, flushCollectionCompaction, attachedFunctionID, completionOffset)
297+
}
298+
299+
func (s *Coordinator) FlushCollectionCompactionAndAttachedFunctionExtended(
300+
ctx context.Context,
301+
collectionCompactions []*model.FlushCollectionCompaction,
302+
attachedFunctionID uuid.UUID,
303+
completionOffset int64,
304+
) (*model.ExtendedFlushCollectionInfo, error) {
305+
return s.catalog.FlushCollectionCompactionAndAttachedFunctionExtended(ctx, collectionCompactions, attachedFunctionID, completionOffset)
306+
}
307+
289308
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string, minVersionsIfAlive *uint64) ([]*model.CollectionToGc, error) {
290309
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit, tenantID, minVersionsIfAlive)
291310
}

go/pkg/sysdb/coordinator/create_task_test.go

Lines changed: 178 additions & 63 deletions
Large diffs are not rendered by default.

go/pkg/sysdb/coordinator/heap_client_integration_test.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ func (suite *HeapClientIntegrationTestSuite) TestAttachFunctionPushesScheduleToH
176176
})
177177
suite.NoError(err, "Should attached function successfully")
178178
suite.NotNil(response)
179-
suite.NotEmpty(response.Id, "Attached function ID should be returned")
179+
suite.NotNil(response.AttachedFunction)
180+
suite.NotEmpty(response.AttachedFunction.Id, "Attached function ID should be returned")
180181

181182
// Get updated heap summary
182183
updatedSummary, err := suite.heapClient.Summary(ctx, &coordinatorpb.HeapSummaryRequest{})
@@ -263,9 +264,15 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskRecovery_HybridAppro
263264
return
264265
}
265266
suite.NotNil(taskResp)
266-
// TODO: Uncomment after proto regeneration
267-
// originalTaskID := taskResp.Id
268-
suite.T().Skip("Test requires proto regeneration for AttachFunctionResponse.Id field")
267+
suite.NotNil(taskResp.AttachedFunction)
268+
originalTaskID := taskResp.AttachedFunction.Id
269+
suite.T().Logf("Created fully initialized task: %s", originalTaskID)
270+
271+
// STEP 2: Directly UPDATE database to make task partial (simulate Phase 3 failure)
272+
// Set lowest_live_nonce = NULL to simulate the task being stuck
273+
_, err = db.Exec(`UPDATE public.tasks SET lowest_live_nonce = NULL WHERE task_id = $1`, originalTaskID)
274+
suite.NoError(err, "Should be able to corrupt task in database")
275+
suite.T().Logf("Made task partial by setting lowest_live_nonce = NULL")
269276

270277
// STEP 3: Try to create task with same name but DIFFERENT parameters → should fail
271278
_, err = suite.sysdbClient.AttachFunction(ctx, &coordinatorpb.AttachFunctionRequest{
@@ -358,8 +365,8 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
358365
return
359366
}
360367
suite.NotNil(taskResp)
361-
// TODO: Uncomment after proto regeneration
362-
suite.T().Skip("Test requires proto regeneration for AttachFunctionResponse.Id field")
368+
suite.NotNil(taskResp.AttachedFunction)
369+
suite.T().Logf("Created task: %s", taskResp.AttachedFunction.Id)
363370

364371
// STEP 2: Call CleanupExpiredPartialAttachedFunctions (with short timeout to test it doesn't affect complete tasks)
365372
cleanupResp, err := suite.sysdbClient.CleanupExpiredPartialAttachedFunctions(ctx, &coordinatorpb.CleanupExpiredPartialAttachedFunctionsRequest{
@@ -377,12 +384,12 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
377384
})
378385
suite.NoError(err, "Task should still exist after cleanup")
379386
suite.NotNil(getResp)
380-
suite.Equal(taskResp.Id, getResp.AttachedFunction.Id)
387+
suite.Equal(taskResp.AttachedFunction.Id, getResp.AttachedFunction.Id)
381388
suite.T().Logf("Task still exists after cleanup: %s", getResp.AttachedFunction.Id)
382389

383390
// STEP 4: Delete the task
384391
_, err = suite.sysdbClient.DetachFunction(ctx, &coordinatorpb.DetachFunctionRequest{
385-
AttachedFunctionId: taskResp.Id,
392+
AttachedFunctionId: taskResp.AttachedFunction.Id,
386393
DeleteOutput: true,
387394
})
388395
suite.NoError(err, "Should delete task")
@@ -399,8 +406,9 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
399406
})
400407
suite.NoError(err, "Should be able to recreate task after deletion")
401408
suite.NotNil(taskResp2)
402-
suite.NotEqual(taskResp.Id, taskResp2.Id, "New task should have different ID")
403-
suite.T().Logf("Successfully recreated task: %s", taskResp2.Id)
409+
suite.NotNil(taskResp2.AttachedFunction)
410+
suite.NotEqual(taskResp.AttachedFunction.Id, taskResp2.AttachedFunction.Id, "New task should have different ID")
411+
suite.T().Logf("Successfully recreated task: %s", taskResp2.AttachedFunction.Id)
404412
}
405413

406414
func TestHeapClientIntegrationSuite(t *testing.T) {

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ type FlushCollectionInfo struct {
102102
AttachedFunctionCompletionOffset *int64
103103
}
104104

105+
type ExtendedFlushCollectionInfo struct {
106+
Collections []*FlushCollectionInfo
107+
}
108+
105109
func FilterCollection(collection *Collection, collectionID types.UniqueID, collectionName *string) bool {
106110
if collectionID != types.NilUniqueID() && collectionID != collection.ID {
107111
return false

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1736,6 +1736,119 @@ func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectio
17361736
return flushCollectionInfo, nil
17371737
}
17381738

1739+
// FlushCollectionCompactionAndAttachedFunction atomically updates collection compaction data and attached function completion offset.
1740+
// TODO(tanujnay112): Deprecate this
1741+
func (tc *Catalog) FlushCollectionCompactionAndAttachedFunction(
1742+
ctx context.Context,
1743+
flushCollectionCompaction *model.FlushCollectionCompaction,
1744+
attachedFunctionID uuid.UUID,
1745+
completionOffset int64,
1746+
) (*model.FlushCollectionInfo, error) {
1747+
if !tc.versionFileEnabled {
1748+
// Attached-function-based compactions are only supported with versioned collections
1749+
log.Error("FlushCollectionCompactionAndAttachedFunction is only supported for versioned collections")
1750+
return nil, errors.New("attached-function-based compaction requires versioned collections")
1751+
}
1752+
1753+
var flushCollectionInfo *model.FlushCollectionInfo
1754+
1755+
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
1756+
var err error
1757+
// Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
1758+
tx := dbcore.GetDB(txCtx)
1759+
flushCollectionInfo, err = tc.FlushCollectionCompactionForVersionedCollection(txCtx, flushCollectionCompaction, tx)
1760+
if err != nil {
1761+
return err
1762+
}
1763+
1764+
// Update ONLY completion_offset
1765+
err = tc.metaDomain.AttachedFunctionDb(txCtx).UpdateCompletionOffset(attachedFunctionID, completionOffset)
1766+
if err != nil {
1767+
return err
1768+
}
1769+
1770+
return nil
1771+
})
1772+
1773+
if err != nil {
1774+
return nil, err
1775+
}
1776+
1777+
// Populate attached function fields with authoritative values from database
1778+
flushCollectionInfo.AttachedFunctionCompletionOffset = &completionOffset
1779+
1780+
log.Info("FlushCollectionCompactionAndAttachedFunction",
1781+
zap.String("collection_id", flushCollectionCompaction.ID.String()),
1782+
zap.String("attached_function_id", attachedFunctionID.String()),
1783+
zap.Int64("completion_offset", completionOffset))
1784+
1785+
return flushCollectionInfo, nil
1786+
}
1787+
1788+
// FlushCollectionCompactionAndAttachedFunctionExtended atomically updates multiple collection compaction data
1789+
// and attached function completion offset in a single transaction.
1790+
func (tc *Catalog) FlushCollectionCompactionAndAttachedFunctionExtended(
1791+
ctx context.Context,
1792+
collectionCompactions []*model.FlushCollectionCompaction,
1793+
attachedFunctionID uuid.UUID,
1794+
completionOffset int64,
1795+
) (*model.ExtendedFlushCollectionInfo, error) {
1796+
if !tc.versionFileEnabled {
1797+
// Attached-function-based compactions are only supported with versioned collections
1798+
log.Error("FlushCollectionCompactionAndAttachedFunctionExtended is only supported for versioned collections")
1799+
return nil, errors.New("attached-function-based compaction requires versioned collections")
1800+
}
1801+
1802+
if len(collectionCompactions) == 0 {
1803+
return nil, errors.New("at least one collection compaction is required")
1804+
}
1805+
1806+
flushInfos := make([]*model.FlushCollectionInfo, 0, len(collectionCompactions))
1807+
1808+
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
1809+
var err error
1810+
// Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
1811+
tx := dbcore.GetDB(txCtx)
1812+
1813+
// Handle all collection compactions
1814+
for _, collectionCompaction := range collectionCompactions {
1815+
log.Info("FlushCollectionCompactionAndAttachedFunctionExtended", zap.String("collection_id", collectionCompaction.ID.String()))
1816+
flushInfo, err := tc.FlushCollectionCompactionForVersionedCollection(txCtx, collectionCompaction, tx)
1817+
if err != nil {
1818+
return err
1819+
}
1820+
flushInfos = append(flushInfos, flushInfo)
1821+
}
1822+
1823+
err = tc.metaDomain.AttachedFunctionDb(txCtx).UpdateCompletionOffset(attachedFunctionID, completionOffset)
1824+
if err != nil {
1825+
return err
1826+
}
1827+
1828+
return nil
1829+
})
1830+
1831+
if err != nil {
1832+
return nil, err
1833+
}
1834+
1835+
// Populate attached function fields with authoritative values from database
1836+
for _, flushInfo := range flushInfos {
1837+
flushInfo.AttachedFunctionCompletionOffset = &completionOffset
1838+
}
1839+
1840+
// Log with first collection ID (typically the output collection)
1841+
log.Info("FlushCollectionCompactionAndAttachedFunctionExtended",
1842+
zap.String("first_collection_id", collectionCompactions[0].ID.String()),
1843+
zap.Int("collection_count", len(collectionCompactions)),
1844+
zap.String("attached_function_id", attachedFunctionID.String()),
1845+
zap.Int64("completion_offset", completionOffset))
1846+
1847+
return &model.ExtendedFlushCollectionInfo{
1848+
Collections: flushInfos,
1849+
}, nil
1850+
}
1851+
17391852
func (tc *Catalog) validateVersionFile(versionFile *coordinatorpb.CollectionVersionFile, collectionID string, version int64) error {
17401853
if versionFile.GetCollectionInfoImmutable().GetCollectionId() != collectionID {
17411854
log.Error("collection id mismatch", zap.String("collection_id", collectionID), zap.String("version_file_collection_id", versionFile.GetCollectionInfoImmutable().GetCollectionId()))

go/pkg/sysdb/coordinator/task.go

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (s *Coordinator) validateAttachedFunctionMatchesRequest(ctx context.Context
7676
return nil
7777
}
7878

79-
// AttachFunction creates a new attached function in the database
79+
// AttachFunction creates an output collection and attached function in a single transaction
8080
func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.AttachFunctionRequest) (*coordinatorpb.AttachFunctionResponse, error) {
8181
log := log.With(zap.String("method", "AttachFunction"))
8282

@@ -108,8 +108,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
108108

109109
// Validation passed, reuse the concurrent attached function ID (idempotent)
110110
attachedFunctionID = concurrentAttachedFunction.ID
111-
// Already created, skip Phase 2
112-
skipPhase2 = true
113111
return nil
114112
}
115113

@@ -145,16 +143,59 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
145143
return common.ErrCollectionNotFound
146144
}
147145

148-
// Check if output collection already exists
149-
outputCollectionName := req.OutputCollectionName
150-
existingOutputCollections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections(nil, &outputCollectionName, req.TenantId, req.Database, nil, nil, false)
146+
// Create output collection with segments
147+
outputCollectionID := types.NewUniqueID()
148+
149+
// Set a default dimension to ensure segment writers can be initialized
150+
dimension := int32(1) // Default dimension for attached function output collections
151+
152+
createCollection := &model.CreateCollection{
153+
ID: outputCollectionID,
154+
Name: req.OutputCollectionName,
155+
ConfigurationJsonStr: `{"hnsw": {"space": "cosine", "M": 16, "ef_construction": 64}}`,
156+
DatabaseName: req.Database,
157+
TenantID: req.TenantId,
158+
GetOrCreate: false, // We want to fail if it already exists
159+
Dimension: &dimension,
160+
Metadata: nil,
161+
}
162+
163+
// Create segments for the collection (distributed setup)
164+
segments := []*model.Segment{
165+
{
166+
ID: types.NewUniqueID(),
167+
Type: "urn:chroma:segment/vector/hnsw-distributed",
168+
Scope: "VECTOR",
169+
CollectionID: outputCollectionID,
170+
},
171+
{
172+
ID: types.NewUniqueID(),
173+
Type: "urn:chroma:segment/metadata/blockfile",
174+
Scope: "METADATA",
175+
CollectionID: outputCollectionID,
176+
},
177+
{
178+
ID: types.NewUniqueID(),
179+
Type: "urn:chroma:segment/record/blockfile",
180+
Scope: "RECORD",
181+
CollectionID: outputCollectionID,
182+
},
183+
}
184+
185+
// Create output collection and segments directly to avoid nested transaction
186+
outputCollection, _, err := s.catalog.createCollectionImpl(txCtx, createCollection, "", 0)
151187
if err != nil {
152-
log.Error("AttachFunction: failed to check output collection", zap.Error(err))
188+
log.Error("AttachFunction: failed to create output collection", zap.Error(err))
153189
return err
154190
}
155-
if len(existingOutputCollections) > 0 {
156-
log.Error("AttachFunction: output collection already exists")
157-
return common.ErrCollectionUniqueConstraintViolation
191+
192+
// Create segments for the collection
193+
for _, segment := range segments {
194+
_, err := s.catalog.createSegmentImpl(txCtx, segment, 0)
195+
if err != nil {
196+
log.Error("AttachFunction: failed to create segment", zap.Error(err))
197+
return err
198+
}
158199
}
159200

160201
// Serialize params
@@ -170,6 +211,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
170211
paramsJSON = "{}"
171212
}
172213

214+
// Create attached function
173215
now := time.Now()
174216
attachedFunction := &dbmodel.AttachedFunction{
175217
ID: attachedFunctionID,
@@ -178,6 +220,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
178220
DatabaseID: databases[0].ID,
179221
InputCollectionID: req.InputCollectionId,
180222
OutputCollectionName: req.OutputCollectionName,
223+
OutputCollectionID: &[]string{outputCollection.ID.String()}[0],
181224
FunctionID: function.ID,
182225
FunctionParams: paramsJSON,
183226
CompletionOffset: 0,
@@ -198,6 +241,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
198241

199242
log.Debug("AttachFunction: attached function created with is_ready=false",
200243
zap.String("attached_function_id", attachedFunctionID.String()),
244+
zap.String("output_collection_name", req.OutputCollectionName),
201245
zap.String("name", req.Name))
202246
return nil
203247
})
@@ -207,7 +251,9 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
207251
}
208252

209253
return &coordinatorpb.AttachFunctionResponse{
210-
Id: attachedFunctionID.String(),
254+
AttachedFunction: &coordinatorpb.AttachedFunction{
255+
Id: attachedFunctionID.String(),
256+
},
211257
}, nil
212258
}
213259

0 commit comments

Comments
 (0)