Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/bits-and-blooms/bitset v1.24.2
github.com/blevesearch/bleve_index_api v1.3.11
github.com/blevesearch/geo v0.2.5
github.com/blevesearch/go-faiss v1.0.34
github.com/blevesearch/go-faiss v1.0.36
github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/blevesearch/go-porterstemmer v1.0.3
github.com/blevesearch/goleveldb v1.0.1
Expand All @@ -25,7 +25,7 @@ require (
github.com/blevesearch/zapx/v14 v14.4.3
github.com/blevesearch/zapx/v15 v15.4.3
github.com/blevesearch/zapx/v16 v16.3.4
github.com/blevesearch/zapx/v17 v17.0.12-0.20260421145725-c120519962c1
github.com/blevesearch/zapx/v17 v17.1.1
github.com/couchbase/moss v0.2.0
github.com/spf13/cobra v1.10.2
go.etcd.io/bbolt v1.4.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/blevesearch/bleve_index_api v1.3.11 h1:x29vbV8OjWfLcrDVd7Lr1q+BkLNS0J
github.com/blevesearch/bleve_index_api v1.3.11/go.mod h1:xvd48t5XMeeioWQ5/jZvgLrV98flT2rdvEJ3l/ki4Ko=
github.com/blevesearch/geo v0.2.5 h1:yJg9FX1oRwLnjXSXF+ECHfXFTF4diF02Ca/qUGVjJhE=
github.com/blevesearch/geo v0.2.5/go.mod h1:Jhq7WE2K6mJTx1xS44M2pUO6Io+wjCSHh1+co3YOgH4=
github.com/blevesearch/go-faiss v1.0.34 h1:cFE1jRkjJfk7qMMsqXBqGEivbYQz/tjSf5yyoH50xbY=
github.com/blevesearch/go-faiss v1.0.34/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk=
github.com/blevesearch/go-faiss v1.0.36 h1:qrP6LZX7xrQQ3pOF2B+t+5E+brlOzwQUzZrGLHz4IeU=
github.com/blevesearch/go-faiss v1.0.36/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk=
github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:kDy+zgJFJJoJYBvdfBSiZYBbdsUL0XcjHYWezpQBGPA=
github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:9eJDeqxJ3E7WnLebQUlPD7ZjSce7AnDb9vjGmMCbD0A=
github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo=
Expand Down Expand Up @@ -45,8 +45,8 @@ github.com/blevesearch/zapx/v15 v15.4.3 h1:iJiMJOHrz216jyO6lS0m9RTCEkprUnzvqAI2l
github.com/blevesearch/zapx/v15 v15.4.3/go.mod h1:1pssev/59FsuWcgSnTa0OeEpOzmhtmr/0/11H0Z8+Nw=
github.com/blevesearch/zapx/v16 v16.3.4 h1:hDAqA8qusZTNbPEL7//w5P65UZ2de6yhSeUaTbp0Po0=
github.com/blevesearch/zapx/v16 v16.3.4/go.mod h1:zqkPPqs9GS9FzVWzCO3Wf1X044yWAV17+4zb+FTiEHg=
github.com/blevesearch/zapx/v17 v17.0.12-0.20260421145725-c120519962c1 h1:1qM+d5vKedxmdL7rIldvQfgh68NevZXMNE7aQwkj5cU=
github.com/blevesearch/zapx/v17 v17.0.12-0.20260421145725-c120519962c1/go.mod h1:be77zp3wB5sTGTWo/6KwCEHnPRyOZYkIeQEr3YIO55E=
github.com/blevesearch/zapx/v17 v17.1.1 h1:Ltal7LsjzRerUg4hqVgMruKj3BAse+rrrDTe+9epJ2k=
github.com/blevesearch/zapx/v17 v17.1.1/go.mod h1:AfYxjApHf7JpQdW4yzFGisSKIrdkPesFn4yJ3vKKPQE=
github.com/couchbase/ghistogram v0.1.0 h1:b95QcQTCzjTUocDXp/uMgSNQi8oj1tGwnJ4bODWZnps=
github.com/couchbase/ghistogram v0.1.0/go.mod h1:s1Jhy76zqfEecpNWJfWUiKZookAFaiGOEoyzgHt9i7k=
github.com/couchbase/moss v0.2.0 h1:VCYrMzFwEryyhRSeI+/b3tRBSeTpi/8gn5Kf6dxqn+o=
Expand Down
2 changes: 1 addition & 1 deletion index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ func (s *Scorch) SetPathInBolt(key []byte, value []byte) error {
return err
}

// currently this is specific to centroid index file update
// currently this is specific to trained index file update
err = s.trainer.updateBolt(snapshotsBucket, key, value)
if err != nil {
return err
Expand Down
130 changes: 62 additions & 68 deletions index/scorch/train_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ type vectorTrainer struct {

m sync.RWMutex
// not a searchable segment in the sense that it won't return
// the data vectors. can return centroid vectors
centroidIndex *SegmentSnapshot
trainCh chan *trainRequest
// the data vectors, returns trained centroid layout
trainedIndex *SegmentSnapshot
trainCh chan *trainRequest
}

const IndexTrainedWithFastMerge = "vector_index_fast_merge"
Expand All @@ -67,8 +67,8 @@ func initTrainer(s *Scorch, config map[string]interface{}) *vectorTrainer {
config: maps.Clone(s.config),
trainCh: make(chan *trainRequest, 1),
}
// update the parent scorch config with the trainer's callback to fetch the centroid index
s.segmentConfig[index.TrainedIndexCallback] = index.TrainedIndexCallbackFn(trainer.getCentroidIndex)
// update the parent scorch config with the trainer's callback to fetch the trained index
s.segmentConfig[index.TrainedIndexCallback] = index.TrainedIndexCallbackFn(trainer.getTrainedIndex)
return &trainer
}
}
Expand Down Expand Up @@ -98,11 +98,11 @@ func (t *vectorTrainer) persistToBolt(trainReq *trainRequest) error {

trainerBucket, err := snapshotsBucket.CreateBucketIfNotExists(util.BoltTrainerKey)
if err != nil {
return fmt.Errorf("error creating centroid bucket: %v", err)
return fmt.Errorf("error creating trained index bucket: %v", err)
}
err = trainerBucket.Put(util.BoltPathKey, []byte(index.TrainedIndexFileName), nil)
if err != nil {
return fmt.Errorf("error updating centroid bucket: %v", err)
return fmt.Errorf("error updating trained index bucket: %v", err)
}

t.trainingComplete.Store(trainReq.finalSample)
Expand All @@ -128,13 +128,17 @@ func (t *vectorTrainer) persistToBolt(trainReq *trainRequest) error {
// this is not a routine that will be running throughout the lifetime of the index. Its purpose
// is only to train the vector index before the data ingestion starts.
func (t *vectorTrainer) trainLoop() {
defer func() {
t.parent.asyncTasks.Done()
}()
defer t.parent.asyncTasks.Done()

trainLoopStartTime := time.Now()
path := filepath.Join(t.parent.path, index.TrainedIndexFileName)
for {
// exit once the final sample set has been ingested and training is complete.
if t.trainingComplete.Load() {
atomic.StoreUint64(&t.parent.stats.TotTrainedSamples, t.trainedSamples)
atomic.StoreUint64(&t.parent.stats.TotTrainTime, uint64(time.Since(trainLoopStartTime).Milliseconds()))
return
}
select {
case <-t.parent.closeCh:
select {
Expand All @@ -146,85 +150,75 @@ func (t *vectorTrainer) trainLoop() {
return
case trainReq := <-t.trainCh:
sampleSeg := trainReq.sample
if t.centroidIndex == nil {
// no sample segment: just persist state if this is the final sample and move on.
if sampleSeg == nil {
if trainReq.finalSample {
if err := t.persistToBolt(trainReq); err != nil {
trainReq.ackCh <- fmt.Errorf("error persisting to bolt: %v", err)
close(trainReq.ackCh)
return
}
}
close(trainReq.ackCh)
continue
}

if t.trainedIndex == nil {
switch seg := sampleSeg.(type) {
case segment.UnpersistedSegment:
err := persistToDirectory(seg, nil, path)
if err != nil {
if err := persistToDirectory(seg, nil, path); err != nil {
trainReq.ackCh <- fmt.Errorf("error persisting segment: %v", err)
close(trainReq.ackCh)
continue
}
default:
}
} else {
// merge the new segment with the existing one, to create a new
// .tmp centroid index file and then move it to the actual
// centroid index file path (during the merge, Os.Open(centroidIndexPath)
// won't be safe since its still being used for merge)
// merge the new segment with the existing one into a .tmp file, then
// atomically rename it into place (os.Open on the live path is unsafe
// during the merge).
t.config[index.TrainingKey] = true
_, _, err := t.parent.segPlugin.MergeUsing([]segment.Segment{t.centroidIndex.segment, sampleSeg},
_, _, err := t.parent.segPlugin.MergeUsing([]segment.Segment{t.trainedIndex.segment, sampleSeg},
[]*roaring.Bitmap{nil, nil}, path+".tmp", t.parent.closeCh, nil, t.config)
t.config[index.TrainingKey] = false
if err != nil {
trainReq.ackCh <- fmt.Errorf("error merging centroid index: %v", err)
trainReq.ackCh <- fmt.Errorf("error merging trained index: %v", err)
close(trainReq.ackCh)
return
}
// reset the training flag once completed
t.config[index.TrainingKey] = false

// close the existing centroid segment - it's supposed to be gc'd at this point
t.centroidIndex.segment.Close()
err = moveFile(path+".tmp", path)
if err != nil {
trainReq.ackCh <- fmt.Errorf("error renaming centroid index: %v", err)
t.trainedIndex.segment.Close()
if err = moveFile(path+".tmp", path); err != nil {
trainReq.ackCh <- fmt.Errorf("error renaming trained index: %v", err)
close(trainReq.ackCh)
return
}
}

// a bolt transaction is necessary for failover-recovery scenario and also serves as a checkpoint
// where we can be sure that the centroid index is available for the indexing operations downstream
//
// note: when the scale increases massively especially with real world dimensions of 1536+, this API
// will have to be refactored to persist in a more resource efficient way. so having this bolt related
// code will help in tracking the progress a lot better and avoid any redudant data streaming operations.
//
// bolt write acts as a checkpoint for failover-recovery: callers downstream
// can rely on the trained index being available once this completes.
// todo: rethink the frequency of bolt writes
err := t.persistToBolt(trainReq)
if err != nil {
if err := t.persistToBolt(trainReq); err != nil {
trainReq.ackCh <- fmt.Errorf("error persisting to bolt: %v", err)
close(trainReq.ackCh)
return
}

// update the trained index pointer
centroidIndex, err := t.parent.segPlugin.OpenUsing(path, t.parent.segmentConfig)
trainedIndex, err := t.parent.segPlugin.OpenUsing(path, t.parent.segmentConfig)
if err != nil {
trainReq.ackCh <- fmt.Errorf("error opening centroid index: %v", err)
trainReq.ackCh <- fmt.Errorf("error opening trained index: %v", err)
close(trainReq.ackCh)
return
}

t.m.Lock()
t.centroidIndex = &SegmentSnapshot{
segment: centroidIndex,
}
t.trainedIndex = &SegmentSnapshot{segment: trainedIndex}
t.m.Unlock()
close(trainReq.ackCh)

// exit the trainer loop once we've ingested the final sample set and training
// is assumed to be complete.
if t.trainingComplete.Load() {
atomic.StoreUint64(&t.parent.stats.TotTrainedSamples, t.trainedSamples)
atomic.StoreUint64(&t.parent.stats.TotTrainTime, uint64(time.Since(trainLoopStartTime).Milliseconds()))
return
}
}
}
}

// loads the metadata specific to the centroid index from boltdb, happens during init
// loads the metadata specific to the trained index from boltdb, happens during init
// no lock needed
func (t *vectorTrainer) loadTrainedData(bucket *util.BoltBucketImpl) error {
if bucket == nil {
Expand Down Expand Up @@ -262,7 +256,7 @@ func (t *vectorTrainer) loadTrainedData(bucket *util.BoltBucketImpl) error {

t.m.Lock()
defer t.m.Unlock()
t.centroidIndex = segmentSnapshot
t.trainedIndex = segmentSnapshot
return nil
}

Expand Down Expand Up @@ -290,6 +284,11 @@ func (t *vectorTrainer) train(batch *index.Batch) error {
return fmt.Errorf("error parsing train complete: %v", err)
}

trainReq := &trainRequest{
finalSample: fin,
sampleSize: len(trainData),
ackCh: make(chan error),
}
// just builds a new vector index out of the train data provided
// this is not necessarily the final train data since this is submitted
// as a request to the trainer component to be merged. once the training
Expand All @@ -298,16 +297,11 @@ func (t *vectorTrainer) train(batch *index.Batch) error {
//
// note: this might index text data too, how to handle this? s.segmentConfig?
// todo: updates/deletes -> data drift detection
seg, _, err := t.parent.segPlugin.NewUsing(trainData, t.parent.segmentConfig)
if err != nil {
return err
}

trainReq := &trainRequest{
finalSample: fin,
sampleSize: len(trainData),
ackCh: make(chan error),
sample: seg,
if len(trainData) > 0 {
trainReq.sample, _, err = t.parent.segPlugin.NewUsing(trainData, t.parent.segmentConfig)
if err != nil {
return err
}
}

t.trainCh <- trainReq
Expand All @@ -327,15 +321,15 @@ func (t *vectorTrainer) getInternal(key []byte) ([]byte, error) {
return nil, nil
}

func (t *vectorTrainer) getCentroidIndex(field string) (interface{}, error) {
func (t *vectorTrainer) getTrainedIndex(field string) (interface{}, error) {
// return the coarse quantizer of the trained faiss index belonging to the field
// if its not available then zap performs naive merge
t.m.RLock()
defer t.m.RUnlock()
if t.centroidIndex != nil {
trainedSegment, ok := t.centroidIndex.segment.(segment.TrainedSegment)
if t.trainedIndex != nil {
trainedSegment, ok := t.trainedIndex.segment.(segment.TrainedSegment)
if !ok {
return nil, fmt.Errorf("segment is not a centroid index segment")
return nil, fmt.Errorf("segment is not a trained index segment")
}

coarseQuantizer, err := trainedSegment.GetCoarseQuantizer(field)
Expand All @@ -349,7 +343,7 @@ func (t *vectorTrainer) getCentroidIndex(field string) (interface{}, error) {

func (t *vectorTrainer) copyFileLOCKED(file string, d index.IndexDirectory) error {
if strings.HasSuffix(file, index.TrainedIndexFileName) {
// centroid index file - this is outside the snapshots domain so the bolt update is different
// trained index file - this is outside the snapshots domain so the bolt update is different
err := d.SetPathInBolt(util.BoltTrainerKey, []byte(file))
if err != nil {
return fmt.Errorf("error updating dest index bolt: %w", err)
Expand Down Expand Up @@ -393,7 +387,7 @@ func (t *vectorTrainer) updateBolt(snapshotsBucket *util.BoltBucketImpl, key []b
}

// update the trained index pointer
t.centroidIndex, err = t.parent.loadSegment(trainerBucket, reader)
t.trainedIndex, err = t.parent.loadSegment(trainerBucket, reader)
if err != nil {
return err
}
Expand Down
Loading