Skip to content

Commit cf18054

Browse files
committed
[ENH]: Modified AttachFunction to do 2PC on a new is_ready column
1 parent ecd3278 commit cf18054

File tree

12 files changed

+545
-258
lines changed

12 files changed

+545
-258
lines changed

chromadb/test/distributed/test_task_api.py

Lines changed: 207 additions & 205 deletions
Large diffs are not rendered by default.

go/pkg/sysdb/coordinator/task.go

Lines changed: 133 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"google.golang.org/grpc/status"
1919
"google.golang.org/protobuf/proto"
2020
"google.golang.org/protobuf/types/known/structpb"
21-
"google.golang.org/protobuf/types/known/timestamppb"
2221
)
2322

2423
// validateAttachedFunctionMatchesRequest validates that an existing attached function's parameters match the request parameters.
@@ -88,10 +87,8 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
8887
}
8988

9089
var attachedFunctionID uuid.UUID = uuid.New()
91-
var nextRun time.Time
92-
var skipPhase2 bool // Flag to skip Phase 2 if task is already fully created
9390

94-
// ===== Phase 1: Create attached function (if needed) =====
91+
// ===== Step 1: Create attached function with is_ready = false =====
9592
err := s.catalog.txImpl.Transaction(ctx, func(txCtx context.Context) error {
9693
// Double-check attached function doesn't exist (race condition protection)
9794
concurrentAttachedFunction, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetByName(req.InputCollectionId, req.Name)
@@ -109,7 +106,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
109106
return err
110107
}
111108

112-
// Validation passed, reuse the concurrent attached function's data
109+
// Validation passed, reuse the concurrent attached function ID (idempotent)
113110
attachedFunctionID = concurrentAttachedFunction.ID
114111
// Already created, skip Phase 2
115112
skipPhase2 = true
@@ -137,7 +134,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
137134
log.Error("AttachFunction: function not found", zap.String("function_name", req.FunctionName))
138135
return common.ErrFunctionNotFound
139136
}
140-
141137
// Check if input collection exists
142138
collections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections([]string{req.InputCollectionId}, nil, req.TenantId, req.Database, nil, nil, false)
143139
if err != nil {
@@ -191,6 +187,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
191187
CreatedAt: now,
192188
UpdatedAt: now,
193189
OldestWrittenNonce: nil,
190+
IsReady: false, // **KEY: Set to false for 3-phase create**
194191
}
195192

196193
err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).Insert(attachedFunction)
@@ -199,7 +196,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
199196
return err
200197
}
201198

202-
log.Debug("AttachFunction: Phase 1: attached function created",
199+
log.Debug("AttachFunction: attached function created with is_ready=false",
203200
zap.String("attached_function_id", attachedFunctionID.String()),
204201
zap.String("name", req.Name))
205202
return nil
@@ -209,45 +206,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
209206
return nil, err
210207
}
211208

212-
// If function is already created, return immediately (idempotency)
213-
if skipPhase2 {
214-
log.Info("AttachFunction: function already created, skipping Phase 2",
215-
zap.String("attached_function_id", attachedFunctionID.String()))
216-
return &coordinatorpb.AttachFunctionResponse{
217-
Id: attachedFunctionID.String(),
218-
}, nil
219-
}
220-
221-
// ===== Phase 2: Push initial schedule =====
222-
log.Debug("AttachFunction: Phase 2: pushing initial schedule",
223-
zap.String("attached_function_id", attachedFunctionID.String()))
224-
// Push initial schedule to heap service if enabled
225-
if s.heapClient == nil {
226-
return nil, common.ErrHeapServiceNotEnabled
227-
}
228-
229-
// Create schedule for the attached function
230-
schedule := &coordinatorpb.Schedule{
231-
Triggerable: &coordinatorpb.Triggerable{
232-
PartitioningUuid: req.InputCollectionId,
233-
SchedulingUuid: attachedFunctionID.String(),
234-
},
235-
NextScheduled: timestamppb.New(nextRun),
236-
}
237-
238-
err = s.heapClient.Push(ctx, req.InputCollectionId, []*coordinatorpb.Schedule{schedule})
239-
if err != nil {
240-
log.Error("AttachFunction: Phase 2: failed to push schedule to heap service",
241-
zap.Error(err),
242-
zap.String("attached_function_id", attachedFunctionID.String()),
243-
zap.String("collection_id", req.InputCollectionId))
244-
return nil, err
245-
}
246-
247-
log.Debug("AttachFunction: Phase 2: pushed schedule to heap service",
248-
zap.String("attached_function_id", attachedFunctionID.String()),
249-
zap.String("collection_id", req.InputCollectionId))
250-
251209
return &coordinatorpb.AttachFunctionResponse{
252210
Id: attachedFunctionID.String(),
253211
}, nil
@@ -287,6 +245,7 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
287245
DatabaseId: attachedFunction.DatabaseID,
288246
CreatedAt: uint64(attachedFunction.CreatedAt.UnixMicro()),
289247
UpdatedAt: uint64(attachedFunction.UpdatedAt.UnixMicro()),
248+
IsReady: attachedFunction.IsReady,
290249
}
291250
if attachedFunction.OutputCollectionID != nil {
292251
attachedFunctionProto.OutputCollectionId = attachedFunction.OutputCollectionID
@@ -563,7 +522,6 @@ func (s *Coordinator) DetachFunction(ctx context.Context, req *coordinatorpb.Det
563522
// First get the attached function to check if we need to delete the output collection
564523
attachedFunction, err := s.catalog.metaDomain.AttachedFunctionDb(ctx).GetByID(attachedFunctionID)
565524
if err != nil {
566-
// If attached function is not ready (lowest_live_nonce == NULL), treat it as not found
567525
if errors.Is(err, common.ErrAttachedFunctionNotReady) {
568526
log.Error("DetachFunction: attached function not ready (not initialized)")
569527
return nil, status.Error(codes.NotFound, "attached function not found")
@@ -654,6 +612,134 @@ func (s *Coordinator) GetFunctions(ctx context.Context, req *coordinatorpb.GetFu
654612
}, nil
655613
}
656614

615+
// TODO(tanujnay112): Remove this
616+
func (s *Coordinator) FinishAttachedFunction(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionRequest) (*coordinatorpb.FinishAttachedFunctionResponse, error) {
617+
attachedFunctionID, err := uuid.Parse(req.Id)
618+
if err != nil {
619+
log.Error("FinishAttachedFunction: invalid attached_function_id", zap.Error(err))
620+
return nil, err
621+
}
622+
623+
err = s.catalog.metaDomain.AttachedFunctionDb(ctx).Finish(attachedFunctionID)
624+
if err != nil {
625+
log.Error("FinishAttachedFunction: failed to finish attached function", zap.Error(err))
626+
return nil, err
627+
}
628+
629+
return &coordinatorpb.FinishAttachedFunctionResponse{}, nil
630+
}
631+
632+
// FinishCreateAttachedFunction creates the output collection and sets is_ready to true in a single transaction
633+
func (s *Coordinator) FinishCreateAttachedFunction(ctx context.Context, req *coordinatorpb.FinishCreateAttachedFunctionRequest) (*coordinatorpb.FinishCreateAttachedFunctionResponse, error) {
634+
attachedFunctionID, err := uuid.Parse(req.Id)
635+
if err != nil {
636+
log.Error("FinishCreateAttachedFunction: invalid attached_function_id", zap.Error(err))
637+
return nil, status.Errorf(codes.InvalidArgument, "invalid attached_function_id: %v", err)
638+
}
639+
640+
// Execute all operations in a transaction for atomicity
641+
err = s.catalog.txImpl.Transaction(ctx, func(txCtx context.Context) error {
642+
// 1. Get the attached function to retrieve metadata
643+
attachedFunction, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetByID(attachedFunctionID)
644+
if err != nil {
645+
log.Error("FinishCreateAttachedFunction: failed to get attached function", zap.Error(err))
646+
return err
647+
}
648+
if attachedFunction == nil {
649+
log.Error("FinishCreateAttachedFunction: attached function not found")
650+
return status.Errorf(codes.NotFound, "attached function not found")
651+
}
652+
653+
// 2. Check if output collection already exists (idempotency)
654+
if attachedFunction.OutputCollectionID != nil && *attachedFunction.OutputCollectionID != "" {
655+
log.Info("FinishCreateAttachedFunction: output collection already exists, skipping creation",
656+
zap.String("existing_collection_id", *attachedFunction.OutputCollectionID))
657+
// Still set is_ready in case it wasn't set before
658+
return s.catalog.metaDomain.AttachedFunctionDb(txCtx).SetReady(attachedFunctionID)
659+
}
660+
661+
// 3. Look up database by ID to get its name
662+
database, err := s.catalog.metaDomain.DatabaseDb(txCtx).GetByID(attachedFunction.DatabaseID)
663+
if err != nil {
664+
log.Error("FinishCreateAttachedFunction: failed to get database", zap.Error(err))
665+
return err
666+
}
667+
if database == nil {
668+
log.Error("FinishCreateAttachedFunction: database not found", zap.String("database_id", attachedFunction.DatabaseID), zap.String("tenant_id", attachedFunction.TenantID))
669+
return common.ErrDatabaseNotFound
670+
}
671+
672+
// 4. Generate new collection UUID
673+
collectionID := types.NewUniqueID()
674+
675+
// 5. Create the output collection with segments
676+
dimension := int32(1) // Default dimension for attached function output collections
677+
collection := &model.CreateCollection{
678+
ID: collectionID,
679+
Name: attachedFunction.OutputCollectionName,
680+
ConfigurationJsonStr: "{}", // Empty JSON object for default config
681+
TenantID: attachedFunction.TenantID,
682+
DatabaseName: database.Name,
683+
Dimension: &dimension,
684+
Metadata: nil,
685+
}
686+
687+
// Create segments for the collection (distributed setup)
688+
segments := []*model.Segment{
689+
{
690+
ID: types.NewUniqueID(),
691+
Type: "urn:chroma:segment/vector/hnsw-distributed",
692+
Scope: "VECTOR",
693+
CollectionID: collectionID,
694+
},
695+
{
696+
ID: types.NewUniqueID(),
697+
Type: "urn:chroma:segment/metadata/blockfile",
698+
Scope: "METADATA",
699+
CollectionID: collectionID,
700+
},
701+
{
702+
ID: types.NewUniqueID(),
703+
Type: "urn:chroma:segment/record/blockfile",
704+
Scope: "RECORD",
705+
CollectionID: collectionID,
706+
},
707+
}
708+
709+
_, _, err = s.catalog.CreateCollectionAndSegments(txCtx, collection, segments, 0)
710+
if err != nil {
711+
log.Error("FinishCreateAttachedFunction: failed to create collection", zap.Error(err))
712+
return err
713+
}
714+
715+
// 6. Update attached function with output_collection_id
716+
collectionIDStr := collectionID.String()
717+
err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).UpdateOutputCollectionID(attachedFunctionID, &collectionIDStr)
718+
if err != nil {
719+
log.Error("FinishCreateAttachedFunction: failed to update output collection ID", zap.Error(err))
720+
return err
721+
}
722+
723+
// 7. Set is_ready to true
724+
err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).SetReady(attachedFunctionID)
725+
if err != nil {
726+
log.Error("FinishCreateAttachedFunction: failed to set ready", zap.Error(err))
727+
return err
728+
}
729+
730+
log.Info("FinishCreateAttachedFunction: successfully created output collection and set is_ready=true",
731+
zap.String("attached_function_id", attachedFunctionID.String()),
732+
zap.String("output_collection_id", collectionID.String()))
733+
return nil
734+
})
735+
736+
if err != nil {
737+
return nil, err
738+
}
739+
740+
return &coordinatorpb.FinishCreateAttachedFunctionResponse{}, nil
741+
}
742+
657743
// CleanupExpiredPartialAttachedFunctions finds and soft deletes attached functions that were partially created
658744
// (output_collection_id IS NULL) and are older than the specified max age.
659745
// This is used to clean up attached functions that got stuck during creation.

go/pkg/sysdb/metastore/db/dao/task.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func (s *attachedFunctionDb) GetByName(inputCollectionID string, name string) (*
4848
Where("input_collection_id = ?", inputCollectionID).
4949
Where("name = ?", name).
5050
Where("is_deleted = ?", false).
51+
Where("is_ready = ?", true).
5152
First(&attachedFunction).Error
5253

5354
if err != nil {
@@ -66,6 +67,7 @@ func (s *attachedFunctionDb) GetByID(id uuid.UUID) (*dbmodel.AttachedFunction, e
6667
err := s.db.
6768
Where("id = ?", id).
6869
Where("is_deleted = ?", false).
70+
Where("is_ready = ?", true).
6971
First(&attachedFunction).Error
7072

7173
if err != nil {
@@ -84,6 +86,7 @@ func (s *attachedFunctionDb) GetByCollectionID(inputCollectionID string) ([]*dbm
8486
err := s.db.
8587
Where("input_collection_id = ?", inputCollectionID).
8688
Where("is_deleted = ?", false).
89+
Where("is_ready = ?", true).
8790
Find(&attachedFunctions).Error
8891

8992
if err != nil {
@@ -97,7 +100,9 @@ func (s *attachedFunctionDb) GetByCollectionID(inputCollectionID string) ([]*dbm
97100
func (s *attachedFunctionDb) UpdateOutputCollectionID(id uuid.UUID, outputCollectionID *string) error {
98101
now := time.Now()
99102
result := s.db.Model(&dbmodel.AttachedFunction{}).
100-
Where("id = ? AND is_deleted = false", id).
103+
Where("id = ?", id).
104+
Where("is_deleted = false").
105+
Where("is_ready = true").
101106
Updates(map[string]interface{}{
102107
"output_collection_id": outputCollectionID,
103108
"updated_at": now,
@@ -164,6 +169,30 @@ func (s *attachedFunctionDb) SoftDeleteByID(id uuid.UUID) error {
164169
return nil
165170
}
166171

172+
// SetReady sets is_ready to true for an attached function
173+
func (s *attachedFunctionDb) SetReady(id uuid.UUID) error {
174+
now := time.Now()
175+
result := s.db.Model(&dbmodel.AttachedFunction{}).
176+
Where("id = ?", id).
177+
Where("is_deleted = false").
178+
UpdateColumns(map[string]interface{}{
179+
"is_ready": true,
180+
"updated_at": now,
181+
})
182+
183+
if result.Error != nil {
184+
log.Error("SetReady failed", zap.Error(result.Error), zap.String("id", id.String()))
185+
return result.Error
186+
}
187+
188+
if result.RowsAffected == 0 {
189+
log.Error("SetReady: no rows affected - attached function not found", zap.String("id", id.String()))
190+
return common.ErrAttachedFunctionNotFound
191+
}
192+
193+
return nil
194+
}
195+
167196
// Finish marks work as complete
168197
func (s *attachedFunctionDb) Finish(id uuid.UUID) error {
169198
now := time.Now()
@@ -332,3 +361,29 @@ func (s *attachedFunctionDb) HardDeleteAttachedFunction(id uuid.UUID) error {
332361

333362
return nil
334363
}
364+
365+
func (s *attachedFunctionDb) UpdateCompletionOffset(id uuid.UUID, completionOffset int64) error {
366+
result := s.db.Model(&dbmodel.AttachedFunction{}).
367+
Where("id = ? AND is_deleted = ?", id, false).
368+
Update("completion_offset", completionOffset)
369+
370+
if result.Error != nil {
371+
log.Error("UpdateCompletionOffset failed",
372+
zap.Error(result.Error),
373+
zap.String("id", id.String()),
374+
zap.Int64("completion_offset", completionOffset))
375+
return result.Error
376+
}
377+
378+
if result.RowsAffected == 0 {
379+
log.Warn("UpdateCompletionOffset: no rows affected (attached function not found)",
380+
zap.String("id", id.String()))
381+
return gorm.ErrRecordNotFound
382+
}
383+
384+
log.Debug("UpdateCompletionOffset succeeded",
385+
zap.String("id", id.String()),
386+
zap.Int64("completion_offset", completionOffset))
387+
388+
return nil
389+
}

go/pkg/sysdb/metastore/db/dbmodel/task.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type AttachedFunction struct {
2626
UpdatedAt time.Time `gorm:"column:updated_at;type:timestamp;not null;default:CURRENT_TIMESTAMP"`
2727
GlobalParent *uuid.UUID `gorm:"column:global_parent;type:uuid;default:null"`
2828
OldestWrittenNonce *uuid.UUID `gorm:"column:oldest_written_nonce;type:uuid;default:null"`
29+
IsReady bool `gorm:"column:is_ready;type:boolean;not null;default:false"`
2930
}
3031

3132
func (v AttachedFunction) TableName() string {
@@ -38,6 +39,9 @@ type IAttachedFunctionDb interface {
3839
GetByName(inputCollectionID string, name string) (*AttachedFunction, error)
3940
GetByID(id uuid.UUID) (*AttachedFunction, error)
4041
GetByCollectionID(inputCollectionID string) ([]*AttachedFunction, error)
42+
UpdateCompletionOffset(id uuid.UUID, completionOffset int64) error
43+
SetReady(id uuid.UUID) error
44+
Finish(id uuid.UUID) error
4145
UpdateOutputCollectionID(id uuid.UUID, outputCollectionID *string) error
4246
SoftDelete(inputCollectionID string, name string) error
4347
SoftDeleteByID(id uuid.UUID) error
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Add is_ready column to attached_functions table to track initialization status
2+
ALTER TABLE "public"."attached_functions" ADD COLUMN "is_ready" boolean NOT NULL DEFAULT false;

go/pkg/sysdb/metastore/db/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:bIntlbZeBztzLttsjmX5fudfrCbxOd/mgF4fJ91j21s=
1+
h1:TI+7Y8LdtEPUUsspBjfrE9KdjS8XS8SLfKUoSJhmhEI=
22
20240313233558.sql h1:Gv0TiSYsqGoOZ2T2IWvX4BOasauxool8PrBOIjmmIdg=
33
20240321194713.sql h1:kVkNpqSFhrXGVGFFvL7JdK3Bw31twFcEhI6A0oCFCkg=
44
20240327075032.sql h1:nlr2J74XRU8erzHnKJgMr/tKqJxw9+R6RiiEBuvuzgo=
@@ -25,3 +25,4 @@ h1:bIntlbZeBztzLttsjmX5fudfrCbxOd/mgF4fJ91j21s=
2525
20251029223300.sql h1:7ptfELfwHoDLLGRz6hLRYAq3OZyzNqa73U5DjlrN/YE=
2626
20251114125442.sql h1:oRHN+AO+xYnYa3aF0QzSa3T/TRi8ETCydp2sDT/nSnI=
2727
20251114134400.sql h1:N30qnVNjR+d4RoArJ11YrixyIsNODxqXFpgiREEhczs=
28+
20251116154842.sql h1:G0qy4MPDayH+Y9/Dm9PS2xwvyLt+nmIw90uJTzZSJUM=

0 commit comments

Comments
 (0)