@@ -18,7 +18,6 @@ import (
1818 "google.golang.org/grpc/status"
1919 "google.golang.org/protobuf/proto"
2020 "google.golang.org/protobuf/types/known/structpb"
21- "google.golang.org/protobuf/types/known/timestamppb"
2221)
2322
2423// validateAttachedFunctionMatchesRequest validates that an existing attached function's parameters match the request parameters.
@@ -88,10 +87,8 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
8887 }
8988
9089 var attachedFunctionID uuid.UUID = uuid .New ()
91- var nextRun time.Time
92- var skipPhase2 bool // Flag to skip Phase 2 if task is already fully created
9390
94- // ===== Phase 1: Create attached function (if needed) =====
91+ // ===== Step 1: Create attached function with is_ready = false =====
9592 err := s .catalog .txImpl .Transaction (ctx , func (txCtx context.Context ) error {
9693 // Double-check attached function doesn't exist (race condition protection)
9794 concurrentAttachedFunction , err := s .catalog .metaDomain .AttachedFunctionDb (txCtx ).GetByName (req .InputCollectionId , req .Name )
@@ -109,10 +106,8 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
109106 return err
110107 }
111108
112- // Validation passed, reuse the concurrent attached function's data
109+ // Validation passed, reuse the concurrent attached function ID (idempotent)
113110 attachedFunctionID = concurrentAttachedFunction .ID
114- // Already created, skip Phase 2
115- skipPhase2 = true
116111 return nil
117112 }
118113
@@ -137,7 +132,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
137132 log .Error ("AttachFunction: function not found" , zap .String ("function_name" , req .FunctionName ))
138133 return common .ErrFunctionNotFound
139134 }
140-
141135 // Check if input collection exists
142136 collections , err := s .catalog .metaDomain .CollectionDb (txCtx ).GetCollections ([]string {req .InputCollectionId }, nil , req .TenantId , req .Database , nil , nil , false )
143137 if err != nil {
@@ -191,6 +185,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
191185 CreatedAt : now ,
192186 UpdatedAt : now ,
193187 OldestWrittenNonce : nil ,
188+ IsReady : false , // **KEY: Set to false for 3-phase create**
194189 }
195190
196191 err = s .catalog .metaDomain .AttachedFunctionDb (txCtx ).Insert (attachedFunction )
@@ -199,7 +194,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
199194 return err
200195 }
201196
202- log .Debug ("AttachFunction: Phase 1: attached function created" ,
197+ log .Debug ("AttachFunction: attached function created with is_ready=false " ,
203198 zap .String ("attached_function_id" , attachedFunctionID .String ()),
204199 zap .String ("name" , req .Name ))
205200 return nil
@@ -209,45 +204,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
209204 return nil , err
210205 }
211206
212- // If function is already created, return immediately (idempotency)
213- if skipPhase2 {
214- log .Info ("AttachFunction: function already created, skipping Phase 2" ,
215- zap .String ("attached_function_id" , attachedFunctionID .String ()))
216- return & coordinatorpb.AttachFunctionResponse {
217- Id : attachedFunctionID .String (),
218- }, nil
219- }
220-
221- // ===== Phase 2: Push initial schedule =====
222- log .Debug ("AttachFunction: Phase 2: pushing initial schedule" ,
223- zap .String ("attached_function_id" , attachedFunctionID .String ()))
224- // Push initial schedule to heap service if enabled
225- if s .heapClient == nil {
226- return nil , common .ErrHeapServiceNotEnabled
227- }
228-
229- // Create schedule for the attached function
230- schedule := & coordinatorpb.Schedule {
231- Triggerable : & coordinatorpb.Triggerable {
232- PartitioningUuid : req .InputCollectionId ,
233- SchedulingUuid : attachedFunctionID .String (),
234- },
235- NextScheduled : timestamppb .New (nextRun ),
236- }
237-
238- err = s .heapClient .Push (ctx , req .InputCollectionId , []* coordinatorpb.Schedule {schedule })
239- if err != nil {
240- log .Error ("AttachFunction: Phase 2: failed to push schedule to heap service" ,
241- zap .Error (err ),
242- zap .String ("attached_function_id" , attachedFunctionID .String ()),
243- zap .String ("collection_id" , req .InputCollectionId ))
244- return nil , err
245- }
246-
247- log .Debug ("AttachFunction: Phase 2: pushed schedule to heap service" ,
248- zap .String ("attached_function_id" , attachedFunctionID .String ()),
249- zap .String ("collection_id" , req .InputCollectionId ))
250-
251207 return & coordinatorpb.AttachFunctionResponse {
252208 Id : attachedFunctionID .String (),
253209 }, nil
@@ -287,6 +243,7 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
287243 DatabaseId : attachedFunction .DatabaseID ,
288244 CreatedAt : uint64 (attachedFunction .CreatedAt .UnixMicro ()),
289245 UpdatedAt : uint64 (attachedFunction .UpdatedAt .UnixMicro ()),
246+ IsReady : attachedFunction .IsReady ,
290247 }
291248 if attachedFunction .OutputCollectionID != nil {
292249 attachedFunctionProto .OutputCollectionId = attachedFunction .OutputCollectionID
@@ -563,7 +520,6 @@ func (s *Coordinator) DetachFunction(ctx context.Context, req *coordinatorpb.Det
563520 // First get the attached function to check if we need to delete the output collection
564521 attachedFunction , err := s .catalog .metaDomain .AttachedFunctionDb (ctx ).GetByID (attachedFunctionID )
565522 if err != nil {
566- // If attached function is not ready (lowest_live_nonce == NULL), treat it as not found
567523 if errors .Is (err , common .ErrAttachedFunctionNotReady ) {
568524 log .Error ("DetachFunction: attached function not ready (not initialized)" )
569525 return nil , status .Error (codes .NotFound , "attached function not found" )
@@ -654,6 +610,134 @@ func (s *Coordinator) GetFunctions(ctx context.Context, req *coordinatorpb.GetFu
654610 }, nil
655611}
656612
613+ // TODO(tanujnay112): Remove this
614+ func (s * Coordinator ) FinishAttachedFunction (ctx context.Context , req * coordinatorpb.FinishAttachedFunctionRequest ) (* coordinatorpb.FinishAttachedFunctionResponse , error ) {
615+ attachedFunctionID , err := uuid .Parse (req .Id )
616+ if err != nil {
617+ log .Error ("FinishAttachedFunction: invalid attached_function_id" , zap .Error (err ))
618+ return nil , err
619+ }
620+
621+ err = s .catalog .metaDomain .AttachedFunctionDb (ctx ).Finish (attachedFunctionID )
622+ if err != nil {
623+ log .Error ("FinishAttachedFunction: failed to finish attached function" , zap .Error (err ))
624+ return nil , err
625+ }
626+
627+ return & coordinatorpb.FinishAttachedFunctionResponse {}, nil
628+ }
629+
630+ // FinishCreateAttachedFunction creates the output collection and sets is_ready to true in a single transaction
631+ func (s * Coordinator ) FinishCreateAttachedFunction (ctx context.Context , req * coordinatorpb.FinishCreateAttachedFunctionRequest ) (* coordinatorpb.FinishCreateAttachedFunctionResponse , error ) {
632+ attachedFunctionID , err := uuid .Parse (req .Id )
633+ if err != nil {
634+ log .Error ("FinishCreateAttachedFunction: invalid attached_function_id" , zap .Error (err ))
635+ return nil , status .Errorf (codes .InvalidArgument , "invalid attached_function_id: %v" , err )
636+ }
637+
638+ // Execute all operations in a transaction for atomicity
639+ err = s .catalog .txImpl .Transaction (ctx , func (txCtx context.Context ) error {
640+ // 1. Get the attached function to retrieve metadata
641+ attachedFunction , err := s .catalog .metaDomain .AttachedFunctionDb (txCtx ).GetByID (attachedFunctionID )
642+ if err != nil {
643+ log .Error ("FinishCreateAttachedFunction: failed to get attached function" , zap .Error (err ))
644+ return err
645+ }
646+ if attachedFunction == nil {
647+ log .Error ("FinishCreateAttachedFunction: attached function not found" )
648+ return status .Errorf (codes .NotFound , "attached function not found" )
649+ }
650+
651+ // 2. Check if output collection already exists (idempotency)
652+ if attachedFunction .OutputCollectionID != nil && * attachedFunction .OutputCollectionID != "" {
653+ log .Info ("FinishCreateAttachedFunction: output collection already exists, skipping creation" ,
654+ zap .String ("existing_collection_id" , * attachedFunction .OutputCollectionID ))
655+ // Still set is_ready in case it wasn't set before
656+ return s .catalog .metaDomain .AttachedFunctionDb (txCtx ).SetReady (attachedFunctionID )
657+ }
658+
659+ // 3. Look up database by ID to get its name
660+ database , err := s .catalog .metaDomain .DatabaseDb (txCtx ).GetByID (attachedFunction .DatabaseID )
661+ if err != nil {
662+ log .Error ("FinishCreateAttachedFunction: failed to get database" , zap .Error (err ))
663+ return err
664+ }
665+ if database == nil {
666+ log .Error ("FinishCreateAttachedFunction: database not found" , zap .String ("database_id" , attachedFunction .DatabaseID ), zap .String ("tenant_id" , attachedFunction .TenantID ))
667+ return common .ErrDatabaseNotFound
668+ }
669+
670+ // 4. Generate new collection UUID
671+ collectionID := types .NewUniqueID ()
672+
673+ // 5. Create the output collection with segments
674+ dimension := int32 (1 ) // Default dimension for attached function output collections
675+ collection := & model.CreateCollection {
676+ ID : collectionID ,
677+ Name : attachedFunction .OutputCollectionName ,
678+ ConfigurationJsonStr : "{}" , // Empty JSON object for default config
679+ TenantID : attachedFunction .TenantID ,
680+ DatabaseName : database .Name ,
681+ Dimension : & dimension ,
682+ Metadata : nil ,
683+ }
684+
685+ // Create segments for the collection (distributed setup)
686+ segments := []* model.Segment {
687+ {
688+ ID : types .NewUniqueID (),
689+ Type : "urn:chroma:segment/vector/hnsw-distributed" ,
690+ Scope : "VECTOR" ,
691+ CollectionID : collectionID ,
692+ },
693+ {
694+ ID : types .NewUniqueID (),
695+ Type : "urn:chroma:segment/metadata/blockfile" ,
696+ Scope : "METADATA" ,
697+ CollectionID : collectionID ,
698+ },
699+ {
700+ ID : types .NewUniqueID (),
701+ Type : "urn:chroma:segment/record/blockfile" ,
702+ Scope : "RECORD" ,
703+ CollectionID : collectionID ,
704+ },
705+ }
706+
707+ _ , _ , err = s .catalog .CreateCollectionAndSegments (txCtx , collection , segments , 0 )
708+ if err != nil {
709+ log .Error ("FinishCreateAttachedFunction: failed to create collection" , zap .Error (err ))
710+ return err
711+ }
712+
713+ // 6. Update attached function with output_collection_id
714+ collectionIDStr := collectionID .String ()
715+ err = s .catalog .metaDomain .AttachedFunctionDb (txCtx ).UpdateOutputCollectionID (attachedFunctionID , & collectionIDStr )
716+ if err != nil {
717+ log .Error ("FinishCreateAttachedFunction: failed to update output collection ID" , zap .Error (err ))
718+ return err
719+ }
720+
721+ // 7. Set is_ready to true
722+ err = s .catalog .metaDomain .AttachedFunctionDb (txCtx ).SetReady (attachedFunctionID )
723+ if err != nil {
724+ log .Error ("FinishCreateAttachedFunction: failed to set ready" , zap .Error (err ))
725+ return err
726+ }
727+
728+ log .Info ("FinishCreateAttachedFunction: successfully created output collection and set is_ready=true" ,
729+ zap .String ("attached_function_id" , attachedFunctionID .String ()),
730+ zap .String ("output_collection_id" , collectionID .String ()))
731+ return nil
732+ })
733+
734+ if err != nil {
735+ return nil , err
736+ }
737+
738+ return & coordinatorpb.FinishCreateAttachedFunctionResponse {}, nil
739+ }
740+
657741// CleanupExpiredPartialAttachedFunctions finds and soft deletes attached functions that were partially created
658742// (output_collection_id IS NULL) and are older than the specified max age.
659743// This is used to clean up attached functions that got stuck during creation.
0 commit comments