Skip to content

Commit 0f532ca

Browse files
committed
Reduce collection write lock hold time and add batch insert regression tests
Collection insert path: - Move vector dimension validation and entry staging before acquireWrite/mu.Lock - Move Prometheus metric increments after unlock (counters are atomic) - Keep storage.Exists, AssignOrdinals, training prep, storage/index mutation, and checkAndSwitchIndexType serialized under lock - Flag checkAndSwitchIndexType as remaining serialized hotspot pending race-safety proof Storage engine: - Batch WAL commits with 64-entry threshold and 10ms flusher ticker - Fix error propagation through putRecordsInlocked and flushBatch - Fix re-queue suffix bug: only re-queue failed entries[i:], not committed prefix - Start flusher goroutine after successful init to prevent goroutine leak Tests: - Add TestHNSWBatchInsert_Regression: 25-entry HNSW batch with ChunkSize=5, MaxConcurrency=4 - Add TestInsertPreflightNoPartialWrite: invalid-dimension Insert fails preflight, no partial write - Add TestBatchInsertPreflightNoPartialWrite: mixed batch correctly handles per-item failures - Add TestBatchInsertDuplicateIDPreflight: duplicate IDs in batch, at most one entry committed
1 parent af35fab commit 0f532ca

File tree

5 files changed

+962
-41
lines changed

5 files changed

+962
-41
lines changed

internal/storage/singlefile/engine.go

Lines changed: 198 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ var castagnoli = crc32.MakeTable(crc32.Castagnoli)
4444
const (
4545
checkpointThresholdBytes = 64 << 20
4646
checkpointThresholdOps = 16384
47+
// Batch WAL buffer settings: entries are buffered and flushed together
48+
// to reduce fsync overhead. A batch is flushed when it reaches batchSize
49+
// entries or after batchFlushInterval elapses.
50+
batchSize = 64 // flush when buffer reaches this many entries
51+
batchFlushInterval = 10 * time.Millisecond // flush periodically if buffer not full
4752
)
4853

4954
type fileHeader struct {
@@ -167,6 +172,28 @@ type Engine struct {
167172
replayedTxs uint64
168173
discardedTxs uint64
169174
closed bool
175+
176+
// WAL batch buffer: accumulates entries across multiple putRecords calls
177+
// and flushes them together to reduce fsync overhead.
178+
batchBuffer struct {
179+
mu sync.Mutex
180+
entries []batchEntry // accumulated entries awaiting flush
181+
flusher chan struct{} // signal to wake up flusher
182+
closed bool
183+
flushNow []chan error // completion channels for foreground flushes
184+
}
185+
}
186+
187+
// batchEntry holds a buffered record pending WAL flush.
188+
type batchEntry struct {
189+
collection string
190+
entries []*index.VectorEntry
191+
}
192+
193+
// startBatchFlusher is a hook for testing. It starts the background flusher goroutine.
194+
// Defaults to launching the goroutine directly; can be overridden in tests.
195+
var startBatchFlusher = func(e *Engine) {
196+
go e.batchFlusher()
170197
}
171198

172199
// RecoveryStats exposes WAL replay outcomes for debugging and tests.
@@ -201,6 +228,10 @@ func New(path string) (storage.Engine, error) {
201228
collections: make(map[string]*Collection),
202229
}
203230

231+
// Initialize WAL batch buffer channels (flusher goroutine started after init succeeds)
232+
engine.batchBuffer.flusher = make(chan struct{})
233+
engine.batchBuffer.entries = nil
234+
204235
stat, err := file.Stat()
205236
if err != nil {
206237
file.Close()
@@ -212,6 +243,8 @@ func New(path string) (storage.Engine, error) {
212243
file.Close()
213244
return nil, err
214245
}
246+
// Start background flusher only after successful initialization
247+
startBatchFlusher(engine)
215248
return engine, nil
216249
}
217250

@@ -220,6 +253,8 @@ func New(path string) (storage.Engine, error) {
220253
return nil, err
221254
}
222255

256+
// Start background flusher only after successful open
257+
startBatchFlusher(engine)
223258
return engine, nil
224259
}
225260

@@ -938,13 +973,99 @@ func (e *Engine) createCollection(name string, config storage.CollectionConfig)
938973
return e.maybeCheckpointLocked()
939974
}
940975

941-
func (e *Engine) putRecords(name string, entries []*index.VectorEntry) error {
976+
// batchFlusher is a background goroutine that periodically flushes the WAL batch buffer.
977+
// It wakes up every batchFlushInterval and flushes any accumulated entries.
978+
func (e *Engine) batchFlusher() {
979+
ticker := time.NewTicker(batchFlushInterval)
980+
defer ticker.Stop()
981+
982+
for {
983+
select {
984+
case <-ticker.C:
985+
e.flushBatch()
986+
case <-e.batchBuffer.flusher:
987+
e.flushBatch()
988+
case <-time.After(batchFlushInterval):
989+
// Double-check after timeout
990+
e.flushBatch()
991+
}
992+
// Check if the engine is closed
993+
e.batchBuffer.mu.Lock()
994+
closed := e.batchBuffer.closed
995+
e.batchBuffer.mu.Unlock()
996+
if closed {
997+
return
998+
}
999+
}
1000+
}
1001+
1002+
// flushBatch flushes all accumulated entries in the batch buffer to WAL.
1003+
// It acquires the engine mutex and writes all buffered entries as a single transaction.
1004+
// If the buffer is empty, this is a no-op.
1005+
// After flushing, it signals any waiting foreground flush completions.
1006+
// Returns the first error encountered during flush, or nil on success.
1007+
func (e *Engine) flushBatch() error {
1008+
e.batchBuffer.mu.Lock()
1009+
if len(e.batchBuffer.entries) == 0 && len(e.batchBuffer.flushNow) == 0 {
1010+
e.batchBuffer.mu.Unlock()
1011+
return nil
1012+
}
1013+
// Take ownership of the buffer and reset
1014+
entries := e.batchBuffer.entries
1015+
e.batchBuffer.entries = nil
1016+
// Take ownership of pending flush completions
1017+
pendingFlushes := e.batchBuffer.flushNow
1018+
e.batchBuffer.flushNow = nil
1019+
e.batchBuffer.mu.Unlock()
1020+
1021+
// Nothing to flush and no one waiting
1022+
if len(entries) == 0 && len(pendingFlushes) == 0 {
1023+
return nil
1024+
}
1025+
1026+
// Acquire engine lock for state modifications
9421027
e.mu.Lock()
9431028
defer e.mu.Unlock()
9441029

1030+
// Signal all waiters with the result
1031+
var firstErr error
1032+
signalErr := func(err error) {
1033+
if err != nil && firstErr == nil {
1034+
firstErr = err
1035+
}
1036+
for _, done := range pendingFlushes {
1037+
done <- err
1038+
}
1039+
}
1040+
9451041
if e.closed {
1042+
// Signal any waiting flushes with error
1043+
signalErr(fmt.Errorf("database is closed"))
9461044
return fmt.Errorf("database is closed")
9471045
}
1046+
1047+
// Write all entries to WAL - inline the put logic to avoid lock issues
1048+
for i, batch := range entries {
1049+
if err := e.putRecordsInlocked(batch.collection, batch.entries); err != nil {
1050+
// On error, re-queue only the failed suffix (entries[i:] onwards).
1051+
// Do NOT re-queue entries[:i] because they were already committed.
1052+
failedSuffix := entries[i:]
1053+
e.batchBuffer.mu.Lock()
1054+
e.batchBuffer.entries = append(failedSuffix, e.batchBuffer.entries...)
1055+
e.batchBuffer.mu.Unlock()
1056+
signalErr(err)
1057+
return err
1058+
}
1059+
}
1060+
1061+
// Signal all waiting foreground flush completions with success
1062+
signalErr(nil)
1063+
return nil
1064+
}
1065+
1066+
// putRecordsInlocked writes records to WAL and applies them to memory.
1067+
// Caller must hold e.mu. This is the internal batch-friendly variant.
1068+
func (e *Engine) putRecordsInlocked(name string, entries []*index.VectorEntry) error {
9481069
collection := e.state.Collections[name]
9491070
if collection == nil || collection.Deleted {
9501071
return fmt.Errorf("collection %s not found", name)
@@ -1005,7 +1126,60 @@ func (e *Engine) putRecords(name string, entries []*index.VectorEntry) error {
10051126
}
10061127
}
10071128
e.markDirtyLocked(written, len(ownedVectors))
1008-
return e.maybeCheckpointLocked()
1129+
if err := e.maybeCheckpointLocked(); err != nil {
1130+
return err
1131+
}
1132+
return nil
1133+
}
1134+
1135+
func (e *Engine) putRecords(name string, entries []*index.VectorEntry) error {
1136+
if e.closed {
1137+
return fmt.Errorf("database is closed")
1138+
}
1139+
1140+
// Add entries to batch buffer
1141+
e.batchBuffer.mu.Lock()
1142+
shouldFlush := len(e.batchBuffer.entries)+1 >= batchSize
1143+
e.batchBuffer.entries = append(e.batchBuffer.entries, batchEntry{
1144+
collection: name,
1145+
entries: entries,
1146+
})
1147+
e.batchBuffer.mu.Unlock()
1148+
1149+
// Signal the flusher and return immediately
1150+
select {
1151+
case e.batchBuffer.flusher <- struct{}{}:
1152+
default:
1153+
}
1154+
1155+
// If we've reached batch size, do a synchronous flush before returning.
1156+
// This ensures durability for this caller's data.
1157+
if shouldFlush {
1158+
return e.flushBatch()
1159+
}
1160+
1161+
return nil
1162+
}
1163+
1164+
// flushNow forces an immediate flush of the batch buffer and waits for completion.
1165+
// This is used by single-record Insert to ensure immediate durability.
1166+
// Returns the error from the flush operation, or nil on success.
1167+
func (e *Engine) flushNow() error {
1168+
done := make(chan error)
1169+
e.batchBuffer.mu.Lock()
1170+
// Only add to pending flushes if there's actually something to flush
1171+
// or if we want to ensure any in-progress flush completes
1172+
e.batchBuffer.flushNow = append(e.batchBuffer.flushNow, done)
1173+
e.batchBuffer.mu.Unlock()
1174+
1175+
// Signal the flusher
1176+
select {
1177+
case e.batchBuffer.flusher <- struct{}{}:
1178+
default:
1179+
}
1180+
1181+
// Wait for flush to complete and return the error
1182+
return <-done
10091183
}
10101184

10111185
func (e *Engine) deleteRecord(name, id string) error {
@@ -1291,6 +1465,17 @@ func (e *Engine) DeleteCollection(name string) error {
12911465

12921466
// Close checkpoints and closes the database file.
12931467
func (e *Engine) Close() error {
1468+
// Signal the batch flusher to stop and flush remaining entries.
1469+
e.batchBuffer.mu.Lock()
1470+
if !e.batchBuffer.closed {
1471+
e.batchBuffer.closed = true
1472+
close(e.batchBuffer.flusher)
1473+
}
1474+
e.batchBuffer.mu.Unlock()
1475+
1476+
// Flush any remaining buffered entries before close.
1477+
e.flushBatch()
1478+
12941479
e.mu.Lock()
12951480
defer e.mu.Unlock()
12961481
if e.closed {
@@ -1398,15 +1583,24 @@ func (c *Collection) AssignOrdinals(ctx context.Context, entries []*index.Vector
13981583
}
13991584

14001585
// Insert persists a single vector entry.
1586+
// It ensures immediate durability by forcing a flush before returning.
14011587
func (c *Collection) Insert(ctx context.Context, entry *index.VectorEntry) error {
14021588
_ = ctx
1403-
return c.engine.putRecords(c.name, []*index.VectorEntry{entry})
1589+
if err := c.engine.putRecords(c.name, []*index.VectorEntry{entry}); err != nil {
1590+
return err
1591+
}
1592+
return c.engine.flushNow()
14041593
}
14051594

14061595
// InsertBatch persists multiple vector entries.
1596+
// It uses buffered batching for better throughput, but ensures data is flushed
1597+
// before returning so callers can immediately see the inserted data.
14071598
func (c *Collection) InsertBatch(ctx context.Context, entries []*index.VectorEntry) error {
14081599
_ = ctx
1409-
return c.engine.putRecords(c.name, entries)
1600+
if err := c.engine.putRecords(c.name, entries); err != nil {
1601+
return err
1602+
}
1603+
return c.engine.flushNow()
14101604
}
14111605

14121606
// Get returns a persisted entry by ID.

0 commit comments

Comments
 (0)