Skip to content

Commit 61b3889

Browse files
authored
Merge pull request #104 from ipfs/fix/no-txns-for-batch
Do not implement batches using transactions
2 parents ff1c156 + bfd7676 commit 61b3889

File tree

2 files changed

+135
-2
lines changed

2 files changed

+135
-2
lines changed

datastore.go

+70-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ type Datastore struct {
4444
syncWrites bool
4545
}
4646

47+
// Implements the datastore.Batch interface, enabling batching support for
48+
// the badger Datastore.
49+
type batch struct {
50+
ds *Datastore
51+
writeBatch *badger.WriteBatch
52+
}
53+
4754
// Implements the datastore.Txn interface, enabling transaction support for
4855
// the badger Datastore.
4956
type txn struct {
@@ -112,6 +119,7 @@ var _ ds.Datastore = (*Datastore)(nil)
112119
var _ ds.TxnDatastore = (*Datastore)(nil)
113120
var _ ds.TTLDatastore = (*Datastore)(nil)
114121
var _ ds.GCDatastore = (*Datastore)(nil)
122+
var _ ds.Batching = (*Datastore)(nil)
115123

116124
// NewDatastore creates a new badger datastore.
117125
//
@@ -388,9 +396,21 @@ func (d *Datastore) Close() error {
388396
return d.DB.Close()
389397
}
390398

399+
// Batch creats a new Batch object. This provides a way to do many writes, when
400+
// there may be too many to fit into a single transaction.
401+
//
402+
// After writing to a Batch, always call Commit whether or not writing to the
403+
// batch was completed successfully or not. This is necessary to flush any
404+
// remaining data and free any resources associated with an incomplete
405+
// transaction.
391406
func (d *Datastore) Batch() (ds.Batch, error) {
392-
tx, _ := d.NewTransaction(false)
393-
return tx, nil
407+
d.closeLk.RLock()
408+
defer d.closeLk.RUnlock()
409+
if d.closed {
410+
return nil, ErrClosed
411+
}
412+
413+
return &batch{d, d.DB.NewWriteBatch()}, nil
394414
}
395415

396416
func (d *Datastore) CollectGarbage() (err error) {
@@ -416,6 +436,54 @@ func (d *Datastore) gcOnce() error {
416436
return d.DB.RunValueLogGC(d.gcDiscardRatio)
417437
}
418438

439+
var _ ds.Batch = (*batch)(nil)
440+
441+
func (b *batch) Put(key ds.Key, value []byte) error {
442+
b.ds.closeLk.RLock()
443+
defer b.ds.closeLk.RUnlock()
444+
if b.ds.closed {
445+
return ErrClosed
446+
}
447+
return b.put(key, value)
448+
}
449+
450+
func (b *batch) put(key ds.Key, value []byte) error {
451+
return b.writeBatch.Set(key.Bytes(), value)
452+
}
453+
454+
func (b *batch) Delete(key ds.Key) error {
455+
b.ds.closeLk.RLock()
456+
defer b.ds.closeLk.RUnlock()
457+
if b.ds.closed {
458+
return ErrClosed
459+
}
460+
461+
return b.delete(key)
462+
}
463+
464+
func (b *batch) delete(key ds.Key) error {
465+
return b.writeBatch.Delete(key.Bytes())
466+
}
467+
468+
func (b *batch) Commit() error {
469+
b.ds.closeLk.RLock()
470+
defer b.ds.closeLk.RUnlock()
471+
if b.ds.closed {
472+
return ErrClosed
473+
}
474+
475+
return b.commit()
476+
}
477+
478+
func (b *batch) commit() error {
479+
err := b.writeBatch.Flush()
480+
if err != nil {
481+
// Discard incomplete transaction held by b.writeBatch
482+
b.writeBatch.Cancel()
483+
}
484+
return err
485+
}
486+
419487
var _ ds.Datastore = (*txn)(nil)
420488
var _ ds.TTLDatastore = (*txn)(nil)
421489

ds_test.go

+65
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,71 @@ func TestBatching(t *testing.T) {
309309

310310
}
311311

312+
func TestBatchingRequired(t *testing.T) {
313+
path, err := ioutil.TempDir(os.TempDir(), "testing_badger_")
314+
if err != nil {
315+
t.Fatal(err)
316+
}
317+
318+
dsOpts := DefaultOptions
319+
d, err := NewDatastore(path, &dsOpts)
320+
if err != nil {
321+
t.Fatal(err)
322+
}
323+
defer func() {
324+
d.Close()
325+
os.RemoveAll(path)
326+
}()
327+
328+
const valSize = 1000
329+
330+
// Check that transaction fails when there are too many writes. This is
331+
// not testing batching logic, but is here to prove that batching works
332+
// where a transaction fails.
333+
t.Logf("putting %d byte values until transaction overflows", valSize)
334+
tx, err := d.NewTransaction(false)
335+
if err != nil {
336+
t.Fatal(err)
337+
}
338+
var puts int
339+
for ; puts < 10000000; puts++ {
340+
buf := make([]byte, valSize)
341+
rand.Read(buf)
342+
err = tx.Put(ds.NewKey(fmt.Sprintf("/key%d", puts)), buf)
343+
if err != nil {
344+
break
345+
}
346+
puts++
347+
}
348+
if err == nil {
349+
t.Error("expected transaction to fail")
350+
} else {
351+
t.Logf("OK - transaction cannot handle %d puts: %s", puts, err)
352+
}
353+
tx.Discard()
354+
355+
// Check that batch succeeds with the same number of writes that caused a
356+
// transaction to fail.
357+
t.Logf("putting %d %d byte values using batch", puts, valSize)
358+
b, err := d.Batch()
359+
if err != nil {
360+
t.Fatal(err)
361+
}
362+
for i := 0; i < puts; i++ {
363+
buf := make([]byte, valSize)
364+
rand.Read(buf)
365+
err = b.Put(ds.NewKey(fmt.Sprintf("/key%d", i)), buf)
366+
if err != nil {
367+
t.Fatal(err)
368+
}
369+
}
370+
371+
err = b.Commit()
372+
if err != nil {
373+
t.Fatal(err)
374+
}
375+
}
376+
312377
// Tests from basic_tests from go-datastore
313378

314379
func TestBasicPutGet(t *testing.T) {

0 commit comments

Comments
 (0)