Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions index/scorch/introducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
segment: next.data, // take ownership of next.data's ref-count
stats: next.stats,
cachedDocs: &cachedDocs{cache: nil},
cachedMeta: &cachedMeta{meta: nil},
cachedMeta: newCachedMeta(),
internal: make(map[string][]byte),
creator: "introduceSegment",
}
Expand Down Expand Up @@ -460,7 +460,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
deleted: newSegmentDeleted[i],
stats: stats,
cachedDocs: &cachedDocs{cache: nil},
cachedMeta: &cachedMeta{meta: nil},
cachedMeta: newCachedMeta(),
creator: "introduceMerge",
mmaped: nextMerge.mmaped,
})
Expand Down
165 changes: 85 additions & 80 deletions index/scorch/optimize_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"

"github.com/blevesearch/bleve/v2/search"
index "github.com/blevesearch/bleve_index_api"
Expand All @@ -36,9 +35,6 @@ type OptimizeVR struct {
vrs map[string][]*IndexSnapshotVectorReader
}

// This setting _MUST_ only be changed during init and not after.
var BleveMaxKNNConcurrency = 10

func (o *OptimizeVR) invokeSearcherEndCallback() {
if o.ctx != nil {
if cb := o.ctx.Value(search.SearcherEndCallbackKey); cb != nil {
Expand All @@ -54,88 +50,98 @@ func (o *OptimizeVR) invokeSearcherEndCallback() {
}
}

func (o *OptimizeVR) Finish() error {
// search runs the configured kNN searches for every vector reader against
// the single segment identified by segID, populating the per-segment
// postings and iterators on each reader.
//
// It returns nil without doing any work when the segment does not implement
// segment_api.VectorSegment. Otherwise it returns the first error reported
// by InterpretVectorIndex, Search or SearchWithFilter, abandoning the
// remaining fields and readers.
func (o *OptimizeVR) search(segID int) error {
snapshot := o.snapshot.segment[segID]
vecSeg, ok := snapshot.segment.(segment_api.VectorSegment)
if !ok {
// not a vector segment: nothing to search here.
return nil
}
meta := snapshot.cachedMeta
// for each field, get the vector index --> invoke the zap func.
// for each VR, populate postings list and iterators
// by passing the obtained vector index and getting similar vectors.
// the index is closed once per field, after all of its readers are served.
// NOTE(review): errorsM and errors are not referenced anywhere else in
// this function as shown - confirm whether they are leftovers.
var errorsM sync.Mutex
var errors []error
for field, vrs := range o.vrs {
// Skip the field if it is supposed to be completely deleted or
// if its index data has been deleted
if info, ok := o.snapshot.updatedFields[field]; ok && (info.Deleted || info.Index) {
continue
}
vecIndex, err := vecSeg.InterpretVectorIndex(field, snapshot.deleted)
if err != nil {
return err
}
// update the vector index size as a meta value in the segment snapshot
// if not already present
if !meta.contains(field) {
meta.store(field, vecIndex.Size())
}
Comment thread
CascadingRadium marked this conversation as resolved.
for _, vr := range vrs {
var pl segment_api.VecPostingsList
// this err shadows the one declared by InterpretVectorIndex above.
var err error
// for each VR, populate postings list and iterators
// by passing the obtained vector index and getting similar vectors.

// check if the vector reader is configured to use a pre-filter
// to filter out ineligible documents before performing
// kNN search.
if vr.eligibleSelector != nil {
pl, err = vecIndex.SearchWithFilter(
vr.vector,
vr.k,
vr.eligibleSelector.SegmentEligibleDocuments(segID),
vr.searchParams,
)
} else {
pl, err = vecIndex.Search(
vr.vector,
vr.k,
vr.searchParams,
)
}
if err != nil {
// close the index synchronously before bailing out.
vecIndex.Close()
return err
}
// postings and iterators are already alloc'ed when
// IndexSnapshotVectorReader is created, here we are just
// populating them with the obtained postings list.
vr.postings[segID] = pl
vr.iterators[segID] = pl.Iterator(vr.iterators[segID])
}
// all readers for this field are done; the close runs in its own
// goroutine (presumably so it does not delay the search - confirm).
go vecIndex.Close()
}
return nil
}

func (o *OptimizeVR) Finish() error {
defer o.invokeSearcherEndCallback()

wg := sync.WaitGroup{}
semaphore := make(chan struct{}, BleveMaxKNNConcurrency)
// Launch goroutines to get vector index for each segment
for i, seg := range o.snapshot.segment {
if sv, ok := seg.segment.(segment_api.VectorSegment); ok {
wg.Add(1)
semaphore <- struct{}{} // Acquire a semaphore slot
go func(index int, segment segment_api.VectorSegment, origSeg *SegmentSnapshot) {
defer func() {
<-semaphore // Release the semaphore slot
wg.Done()
}()
for field, vrs := range o.vrs {
// Early exit if the field is supposed to be completely deleted or
// if it's index data has been deleted
if info, ok := o.snapshot.updatedFields[field]; ok && (info.Deleted || info.Index) {
continue
}

vecIndex, err := segment.InterpretVectorIndex(field, origSeg.deleted)
if err != nil {
errorsM.Lock()
errors = append(errors, err)
errorsM.Unlock()
return
}

// update the vector index size as a meta value in the segment snapshot
vectorIndexSize := vecIndex.Size()
origSeg.cachedMeta.updateMeta(field, vectorIndexSize)
for _, vr := range vrs {
var pl segment_api.VecPostingsList
var err error

// for each VR, populate postings list and iterators
// by passing the obtained vector index and getting similar vectors.

// check if the vector reader is configured to use a pre-filter
// to filter out ineligible documents before performing
// kNN search.
if vr.eligibleSelector != nil {
pl, err = vecIndex.SearchWithFilter(vr.vector, vr.k,
vr.eligibleSelector.SegmentEligibleDocuments(index), vr.searchParams)
} else {
pl, err = vecIndex.Search(vr.vector, vr.k, vr.searchParams)
}

if err != nil {
errorsM.Lock()
errors = append(errors, err)
errorsM.Unlock()
go vecIndex.Close()
return
}

atomic.AddUint64(&o.snapshot.parent.stats.TotKNNSearches, uint64(1))

// postings and iterators are already alloc'ed when
// IndexSnapshotVectorReader is created
vr.postings[index] = pl
vr.iterators[index] = pl.Iterator(vr.iterators[index])
}
go vecIndex.Close()
}
}(i, sv, seg)
}
numSegments := len(o.snapshot.segment)

var wg sync.WaitGroup
wg.Add(numSegments)
var errM sync.Mutex
var searchErr error
// launch goroutines to search the vector index for each segment
Comment thread
CascadingRadium marked this conversation as resolved.
for i := 0; i < numSegments; i++ {
go func(segID int) {
defer wg.Done()
if err := o.search(segID); err != nil {
errM.Lock()
searchErr = err
errM.Unlock()
}
}(i)
}
// wait until all the launched goroutines finish and collect errors if any
wg.Wait()
close(semaphore)
if len(errors) > 0 {
return errors[0]
// report the error, if any
if searchErr != nil {
return searchErr
}

return nil
}

Expand Down Expand Up @@ -173,8 +179,7 @@ func (s *IndexSnapshotVectorReader) VectorOptimize(ctx context.Context,
// Finish() logic to even occur or not.
var sumVectorIndexSize uint64
for _, seg := range o.snapshot.segment {
vecIndexSize := seg.cachedMeta.fetchMeta(s.field)
if vecIndexSize != nil {
if vecIndexSize, ok := seg.cachedMeta.load(s.field); ok {
sumVectorIndexSize += vecIndexSize.(uint64)
}
}
Expand Down
2 changes: 1 addition & 1 deletion index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ func (s *Scorch) loadSegment(segmentBucket *util.BoltBucketImpl, reader util.Fil
rv := &SegmentSnapshot{
segment: seg,
cachedDocs: &cachedDocs{cache: nil},
cachedMeta: &cachedMeta{meta: nil},
cachedMeta: newCachedMeta(),
}
deletedBytes, err := segmentBucket.Get(util.BoltDeletedKey, reader)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion index/scorch/scorch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3571,7 +3571,7 @@ func makeSegmentSnapshot(id uint64, seg segment.Segment) *SegmentSnapshot {
id: id,
segment: seg,
cachedDocs: &cachedDocs{cache: nil},
cachedMeta: &cachedMeta{meta: nil},
cachedMeta: newCachedMeta(),
}
}

Expand Down
5 changes: 4 additions & 1 deletion index/scorch/snapshot_index_vr.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"reflect"
"sort"
"sync/atomic"

"github.com/blevesearch/bleve/v2/size"
index "github.com/blevesearch/bleve_index_api"
Expand Down Expand Up @@ -164,7 +165,9 @@ func (i *IndexSnapshotVectorReader) Count() uint64 {
}

// Close atomically adds one to i.snapshot.parent.stats.TotKNNSearches when
// the reader has a non-nil snapshot. It always returns nil.
func (i *IndexSnapshotVectorReader) Close() error {
// TODO Consider if any scope of recycling here.
if i.snapshot != nil {
// every call with a snapshot bumps the counter by exactly one.
atomic.AddUint64(&i.snapshot.parent.stats.TotKNNSearches, 1)
}
return nil
Comment thread
CascadingRadium marked this conversation as resolved.
}

Expand Down
42 changes: 20 additions & 22 deletions index/scorch/snapshot_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,34 +350,32 @@ func (c *cachedDocs) visitDoc(localDocNum uint64,
c.m.RUnlock()
}

// the purpose of the cachedMeta is to simply allow the user of this type to record
// and cache certain meta data information (specific to the segment) that can be
// used across calls to save compute on the same.
// for example searcher creations on the same index snapshot can use this struct
// to help and fetch the backing index size information which can be used in
// memory usage calculation thereby deciding whether to allow a query or not.
// cachedMeta is a simple wrapper around sync.Map to provide typed
// access to cached metadata values for segments.
type cachedMeta struct {
m sync.RWMutex
meta map[string]interface{}
meta sync.Map
}
Comment thread
CascadingRadium marked this conversation as resolved.

func (c *cachedMeta) updateMeta(field string, val interface{}) {
c.m.Lock()
if c.meta == nil {
c.meta = make(map[string]interface{})
func newCachedMeta() *cachedMeta {
return &cachedMeta{
meta: sync.Map{},
}
c.meta[field] = val
c.m.Unlock()
}

func (c *cachedMeta) fetchMeta(field string) (rv interface{}) {
c.m.RLock()
defer c.m.RUnlock()
if c.meta == nil {
return nil
}
rv = c.meta[field]
return rv
// store records val as the cached value for field, overwriting any value
// previously stored under the same field.
func (c *cachedMeta) store(field string, val interface{}) {
c.meta.Store(field, val)
}

// load looks up the cached value for field. ok reports whether an entry
// exists; when it does not, rv is nil.
func (c *cachedMeta) load(field string) (rv interface{}, ok bool) {
	rv, ok = c.meta.Load(field)
	return rv, ok
}

// contains reports whether a value has been cached for the given field.
func (c *cachedMeta) contains(field string) bool {
	if _, found := c.meta.Load(field); found {
		return true
	}
	return false
}

func (s *SegmentSnapshot) Ancestors(docNum uint64, prealloc []index.AncestorID) []index.AncestorID {
Expand Down
Loading