Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Bug Fixes

- [#1007](https://github.com/cosmos/iavl/pull/1007) Add the extra check for the reformatted root node in `GetNode`
- [#]() nodedb latest version should not set to a partial committed version.

### Improvements

Expand Down
85 changes: 85 additions & 0 deletions batch_flusher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package iavl

import (
"fmt"
"math/rand"
"testing"

dbm "github.com/cosmos/iavl/db"
"github.com/stretchr/testify/require"
)

type MockDBBatch struct {
dbm.Batch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to corestore


// simulate low level system error
err error
}

func (b *MockDBBatch) Write() error {
if b.err != nil {
return b.err
}
return b.Batch.Write()
}

func (b *MockDBBatch) WriteSync() error {
if b.err != nil {
return b.err
}
return b.Batch.WriteSync()
}

type MockDB struct {
dbm.DB

batchIndex int
// simulate low level system error at i-th batch write
errors map[int]error
}

func (db *MockDB) NewBatch() dbm.Batch {
err, _ := db.errors[db.batchIndex]
batch := &MockDBBatch{Batch: db.DB.NewBatch(), err: err}
db.batchIndex++
return batch
}

func (db *MockDB) NewBatchWithSize(size int) dbm.Batch {
return db.NewBatch()
}

func TestBatchFlusher(t *testing.T) {
db := &MockDB{DB: dbm.NewMemDB()}

{
tree := NewMutableTree(db, 10000, true, NewNopLogger())
v, err := tree.Load()
require.NoError(t, err)
require.Equal(t, int64(0), v)

// the batch size exceeds the threshold, and persist into db in two batches
for i := 0; i < 1000; i++ {
// random key value pairs
key := []byte(fmt.Sprintf("key-%064d", rand.Intn(100000000)))
value := []byte(fmt.Sprintf("value-%064d", rand.Intn(100000000)))
_, err := tree.Set(key, value)
require.NoError(t, err)
}

// the first batch write will success,
// the second batch write will fail
db.errors = map[int]error{
1: fmt.Errorf("filesystem failure"),
}
_, _, err = tree.SaveVersion()
require.Error(t, err)
}

{
tree := NewMutableTree(db, 10000, true, NewNopLogger())
v, err := tree.Load()
require.NoError(t, err)
require.Equal(t, int64(0), v)
}
}
12 changes: 8 additions & 4 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,13 +913,17 @@ func (ndb *nodeDB) getLatestVersion() (bool, int64, error) {
}
defer itr.Close()

if itr.Valid() {
for ; itr.Valid(); itr.Next() {
k := itr.Key()
var nk []byte
nodeKeyFormat.Scan(k, &nk)
latestVersion = GetNodeKey(nk).version
ndb.resetLatestVersion(latestVersion)
return true, latestVersion, nil
nodeKey := GetNodeKey(nk)
if bytes.Equal(nk, GetRootKey(nodeKey.version)) {
// only find the latest version when we find the root node
latestVersion = nodeKey.version
ndb.resetLatestVersion(latestVersion)
return true, latestVersion, nil
}
}

if err := itr.Error(); err != nil {
Expand Down