Skip to content

Commit af35fab

Browse files
committed
Add version-based CAS semantics
1 parent 0c801b2 commit af35fab

13 files changed

Lines changed: 606 additions & 39 deletions

File tree

internal/index/flat/flat.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type VectorEntry struct {
1717
ID string `json:"id"`
1818
Vector []float32 `json:"vector"`
1919
Metadata map[string]interface{} `json:"metadata"`
20+
Version uint64 `json:"version"`
2021
}
2122

2223
// SearchResult represents a search result from the flat index
@@ -25,6 +26,7 @@ type SearchResult struct {
2526
Score float32 `json:"score"`
2627
Vector []float32 `json:"vector"`
2728
Metadata map[string]interface{} `json:"metadata"`
29+
Version uint64 `json:"version"`
2830
}
2931

3032
// Config holds configuration for the flat index

internal/index/hnsw/hnsw.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type VectorEntry struct {
1818
Ordinal uint32
1919
Vector []float32
2020
Metadata map[string]interface{}
21+
Version uint64
2122
}
2223

2324
// SearchResult represents a search result from HNSW
@@ -27,6 +28,7 @@ type SearchResult struct {
2728
Score float32
2829
Vector []float32
2930
Metadata map[string]interface{}
31+
Version uint64
3032
}
3133

3234
type VectorProvider interface {

internal/index/interfaces.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type VectorEntry struct {
3333
Ordinal uint32
3434
Vector []float32
3535
Metadata map[string]interface{}
36+
Version uint64
3637
}
3738

3839
// SearchResult represents a search result (avoid circular imports)
@@ -42,6 +43,7 @@ type SearchResult struct {
4243
Score float32
4344
Vector []float32
4445
Metadata map[string]interface{}
46+
Version uint64
4547
}
4648

4749
// NEW: PersistenceMetadata holds metadata about persisted index
@@ -129,6 +131,7 @@ func (w *hnswWrapper) Insert(ctx context.Context, entry *VectorEntry) error {
129131
Ordinal: entry.Ordinal,
130132
Vector: entry.Vector,
131133
Metadata: entry.Metadata,
134+
Version: entry.Version,
132135
}
133136
return w.index.Insert(ctx, hnswEntry)
134137
}
@@ -142,6 +145,7 @@ func (w *hnswWrapper) BatchInsert(ctx context.Context, entries []*VectorEntry) e
142145
Ordinal: entry.Ordinal,
143146
Vector: entry.Vector,
144147
Metadata: entry.Metadata,
148+
Version: entry.Version,
145149
}
146150
}
147151
return w.index.BatchInsert(ctx, hnswEntries)
@@ -162,6 +166,7 @@ func (w *hnswWrapper) Search(ctx context.Context, query []float32, k int) ([]*Se
162166
Score: r.Score,
163167
Vector: r.Vector,
164168
Metadata: r.Metadata,
169+
Version: r.Version,
165170
}
166171
}
167172
return results, nil
@@ -261,6 +266,7 @@ func (w *ivfpqWrapper) Insert(ctx context.Context, entry *VectorEntry) error {
261266
ID: entry.ID,
262267
Vector: entry.Vector,
263268
Metadata: entry.Metadata,
269+
Version: entry.Version,
264270
}
265271
return w.index.Insert(ctx, ivfpqEntry)
266272
}
@@ -289,6 +295,7 @@ func (w *ivfpqWrapper) Search(ctx context.Context, query []float32, k int) ([]*S
289295
Score: r.Score,
290296
Vector: r.Vector,
291297
Metadata: r.Metadata,
298+
Version: r.Version,
292299
}
293300
}
294301
return results, nil
@@ -378,6 +385,7 @@ func (w *flatWrapper) Insert(ctx context.Context, entry *VectorEntry) error {
378385
ID: entry.ID,
379386
Vector: entry.Vector,
380387
Metadata: entry.Metadata,
388+
Version: entry.Version,
381389
}
382390
return w.index.Insert(ctx, flatEntry)
383391
}
@@ -406,6 +414,7 @@ func (w *flatWrapper) Search(ctx context.Context, query []float32, k int) ([]*Se
406414
Score: r.Score,
407415
Vector: r.Vector,
408416
Metadata: r.Metadata,
417+
Version: r.Version,
409418
}
410419
}
411420
return results, nil

internal/index/ivfpq/ivfpq.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type VectorEntry struct {
3131
ID string
3232
Vector []float32
3333
Metadata map[string]interface{}
34+
Version uint64
3435
}
3536

3637
// SearchResult represents a search result
@@ -39,6 +40,7 @@ type SearchResult struct {
3940
Score float32
4041
Vector []float32
4142
Metadata map[string]interface{}
43+
Version uint64
4244
}
4345

4446
// DefaultConfig returns a default IVF-PQ configuration

internal/obs/metrics.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ type Metrics struct {
1616
TxCommits prometheus.Counter
1717
TxRollbacks prometheus.Counter
1818
TxConflicts prometheus.Counter
19+
CASSuccesses prometheus.Counter
20+
CASConflicts prometheus.Counter
21+
CASAborts prometheus.Counter
1922
TxCommitOps prometheus.Histogram
2023
TxCommitLatency prometheus.Histogram
2124
SearchQueries prometheus.Counter
@@ -65,6 +68,18 @@ func NewMetrics() *Metrics {
6568
Name: "libravdb_transaction_conflicts_total",
6669
Help: "Total transaction conflicts or validation failures",
6770
}),
71+
CASSuccesses: factory.NewCounter(prometheus.CounterOpts{
72+
Name: "libravdb_cas_success_total",
73+
Help: "Total successful compare-and-swap writes",
74+
}),
75+
CASConflicts: factory.NewCounter(prometheus.CounterOpts{
76+
Name: "libravdb_cas_conflict_total",
77+
Help: "Total compare-and-swap version conflicts",
78+
}),
79+
CASAborts: factory.NewCounter(prometheus.CounterOpts{
80+
Name: "libravdb_cas_abort_total",
81+
Help: "Total transaction aborts caused by compare-and-swap precondition failures",
82+
}),
6883
TxCommitOps: factory.NewHistogram(prometheus.HistogramOpts{
6984
Name: "libravdb_transaction_commit_ops",
7085
Help: "Number of staged operations per committed transaction",

internal/storage/interfaces.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ const (
4141

4242
// TxOperation represents one row-level mutation in a transactional batch.
4343
type TxOperation struct {
44-
Type TxOperationType
45-
Collection string
46-
ID string
47-
Ordinal uint32
48-
Vector []float32
49-
Metadata map[string]interface{}
44+
Type TxOperationType
45+
Collection string
46+
ID string
47+
Ordinal uint32
48+
Vector []float32
49+
Metadata map[string]interface{}
50+
ExpectedVersion uint64
51+
HasExpectedVersion bool
5052
}
5153

5254
// TransactionalEngine extends Engine with atomic multi-collection commit support.

internal/storage/singlefile/engine.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,12 +1072,14 @@ func (e *Engine) PrepareTx(ctx context.Context, ops []storage.TxOperation) ([]st
10721072
}
10731073

10741074
prepared[i] = storage.TxOperation{
1075-
Type: op.Type,
1076-
Collection: op.Collection,
1077-
ID: op.ID,
1078-
Ordinal: op.Ordinal,
1079-
Vector: append([]float32(nil), op.Vector...),
1080-
Metadata: cloneMetadata(op.Metadata),
1075+
Type: op.Type,
1076+
Collection: op.Collection,
1077+
ID: op.ID,
1078+
Ordinal: op.Ordinal,
1079+
Vector: append([]float32(nil), op.Vector...),
1080+
Metadata: cloneMetadata(op.Metadata),
1081+
ExpectedVersion: op.ExpectedVersion,
1082+
HasExpectedVersion: op.HasExpectedVersion,
10811083
}
10821084

10831085
if op.Type != storage.TxOperationPut {
@@ -1353,6 +1355,7 @@ func cloneEntry(record *recordValue) *index.VectorEntry {
13531355
Ordinal: record.Ordinal,
13541356
Vector: append([]float32(nil), record.Vector...),
13551357
Metadata: cloneMetadata(record.Metadata),
1358+
Version: record.Version,
13561359
}
13571360
}
13581361

libravdb/collection.go

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
// Collection represents a named collection of vectors with a specific schema
2020
type Collection struct {
2121
mu sync.RWMutex
22+
db *Database
2223
name string
2324
config *CollectionConfig
2425
index index.Index
@@ -798,6 +799,46 @@ func (c *Collection) DeleteBatch(ctx context.Context, ids []string) error {
798799
return nil
799800
}
800801

802+
// Get returns a persisted record by ID.
803+
func (c *Collection) Get(ctx context.Context, id string) (Record, error) {
804+
c.mu.RLock()
805+
if c.closed {
806+
c.mu.RUnlock()
807+
return Record{}, ErrCollectionClosed
808+
}
809+
c.mu.RUnlock()
810+
811+
entry, err := c.storage.Get(ctx, id)
812+
if err != nil {
813+
return Record{}, fmt.Errorf("%w: %s", ErrRecordNotFound, id)
814+
}
815+
return recordFromIndexEntry(entry), nil
816+
}
817+
818+
// UpdateIfVersion updates a record only if its current committed version matches expectedVersion.
819+
func (c *Collection) UpdateIfVersion(ctx context.Context, id string, vector []float32, metadata map[string]interface{}, expectedVersion uint64) error {
820+
return c.withCAS(ctx, func(tx Tx) error {
821+
return tx.UpdateIfVersion(ctx, c.name, id, vector, metadata, expectedVersion)
822+
})
823+
}
824+
825+
// DeleteIfVersion deletes a record only if its current committed version matches expectedVersion.
826+
func (c *Collection) DeleteIfVersion(ctx context.Context, id string, expectedVersion uint64) error {
827+
return c.withCAS(ctx, func(tx Tx) error {
828+
return tx.DeleteIfVersion(ctx, c.name, id, expectedVersion)
829+
})
830+
}
831+
832+
func (c *Collection) withCAS(ctx context.Context, fn func(tx Tx) error) error {
833+
if c == nil {
834+
return ErrCollectionClosed
835+
}
836+
if c.db == nil {
837+
return ErrTxEngineUnsupported
838+
}
839+
return c.db.WithTx(ctx, fn)
840+
}
841+
801842
// Iterate walks all persisted records in the collection.
802843
func (c *Collection) Iterate(ctx context.Context, fn func(Record) error) error {
803844
c.mu.RLock()
@@ -923,8 +964,9 @@ func (c *Collection) Search(ctx context.Context, vector []float32, k int) (*Sear
923964
results := make([]*SearchResult, len(indexResults))
924965
for i, r := range indexResults {
925966
result := &SearchResult{
926-
ID: r.ID,
927-
Score: r.Score,
967+
ID: r.ID,
968+
Score: r.Score,
969+
Version: r.Version,
928970
}
929971
if len(r.Vector) > 0 {
930972
result.Vector = cloneVector(r.Vector)
@@ -941,10 +983,11 @@ func (c *Collection) Search(ctx context.Context, vector []float32, k int) (*Sear
941983
}
942984
}
943985
}
944-
if result.Vector == nil || result.Metadata == nil {
986+
if result.Vector == nil || result.Metadata == nil || result.Version == 0 {
945987
entry, getErr := c.storage.Get(ctx, result.ID)
946988
if getErr == nil {
947989
result.ID = entry.ID
990+
result.Version = entry.Version
948991
if result.Vector == nil {
949992
result.Vector = cloneVector(entry.Vector)
950993
}
@@ -1502,6 +1545,7 @@ func (c *Collection) getAllVectors(ctx context.Context) ([]*index.VectorEntry, e
15021545
Ordinal: entry.Ordinal,
15031546
Vector: make([]float32, len(entry.Vector)),
15041547
Metadata: make(map[string]interface{}),
1548+
Version: entry.Version,
15051549
}
15061550
copy(vectorCopy.Vector, entry.Vector)
15071551
for k, v := range entry.Metadata {
@@ -1527,6 +1571,7 @@ func recordFromIndexEntry(entry *index.VectorEntry) Record {
15271571
ID: entry.ID,
15281572
Vector: cloneVector(entry.Vector),
15291573
Metadata: cloneMetadata(entry.Metadata),
1574+
Version: entry.Version,
15301575
}
15311576
}
15321577

libravdb/database.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (db *Database) CreateCollection(ctx context.Context, name string, opts ...C
104104
if err != nil {
105105
return nil, fmt.Errorf("failed to create collection: %w", err)
106106
}
107+
collection.db = db
107108

108109
db.collections[name] = collection
109110
return collection, nil
@@ -420,6 +421,7 @@ func (db *Database) loadCollectionFromStorage(name string, engine interface {
420421
if err != nil {
421422
return nil, fmt.Errorf("failed to create collection from storage: %w", err)
422423
}
424+
collection.db = db
423425

424426
return collection, nil
425427
}

libravdb/query.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,8 +455,10 @@ func (qb *QueryBuilder) applyFilters(results []*SearchResult, filters []filter.F
455455
// Convert back to libravdb.SearchResult, preserving scores
456456
// Create a map for quick lookup of original scores
457457
scoreMap := make(map[string]float32)
458+
versionMap := make(map[string]uint64)
458459
for _, result := range results {
459460
scoreMap[result.ID] = result.Score
461+
versionMap[result.ID] = result.Version
460462
}
461463

462464
filteredResults := make([]*SearchResult, len(filterEntries))
@@ -466,6 +468,7 @@ func (qb *QueryBuilder) applyFilters(results []*SearchResult, filters []filter.F
466468
Score: scoreMap[entry.ID], // Preserve original score
467469
Vector: entry.Vector,
468470
Metadata: entry.Metadata,
471+
Version: versionMap[entry.ID],
469472
}
470473
}
471474

@@ -561,6 +564,7 @@ func recordsFromSearchResults(results []*SearchResult) []Record {
561564
ID: result.ID,
562565
Vector: cloneVector(result.Vector),
563566
Metadata: cloneMetadata(result.Metadata),
567+
Version: result.Version,
564568
})
565569
}
566570
return records

0 commit comments

Comments
 (0)