diff --git a/CHANGELOG.md b/CHANGELOG.md index b018c8799..d1e0c0372 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ ### Bug Fixes - [#1007](https://github.com/cosmos/iavl/pull/1007) Add the extra check for the reformatted root node in `GetNode` +- [#1106](https://github.com/cosmos/iavl/pull/1106) Fix unlock of unlocked mutex panic. +- [#1107](https://github.com/cosmos/iavl/pull/1107) nodedb latest version should not set to a partial committed version. ### Improvements diff --git a/batch.go b/batch.go index 7c898a9c9..3c44fae5a 100644 --- a/batch.go +++ b/batch.go @@ -57,11 +57,9 @@ func (b *BatchWithFlusher) Set(key, value []byte) error { return err } if batchSizeAfter > b.flushThreshold { - b.mtx.Unlock() - if err := b.Write(); err != nil { + if err := b.write(); err != nil { return err } - b.mtx.Lock() } return b.batch.Set(key, value) } @@ -79,19 +77,14 @@ func (b *BatchWithFlusher) Delete(key []byte) error { return err } if batchSizeAfter > b.flushThreshold { - b.mtx.Unlock() - if err := b.Write(); err != nil { + if err := b.write(); err != nil { return err } - b.mtx.Lock() } return b.batch.Delete(key) } -func (b *BatchWithFlusher) Write() error { - b.mtx.Lock() - defer b.mtx.Unlock() - +func (b *BatchWithFlusher) write() error { if err := b.batch.Write(); err != nil { return err } @@ -102,6 +95,13 @@ func (b *BatchWithFlusher) Write() error { return nil } +func (b *BatchWithFlusher) Write() error { + b.mtx.Lock() + defer b.mtx.Unlock() + + return b.write() +} + func (b *BatchWithFlusher) WriteSync() error { b.mtx.Lock() defer b.mtx.Unlock() diff --git a/batch_flusher_test.go b/batch_flusher_test.go new file mode 100644 index 000000000..c0c8ade07 --- /dev/null +++ b/batch_flusher_test.go @@ -0,0 +1,86 @@ +package iavl + +import ( + "fmt" + "math/rand" + "testing" + + corestore "cosmossdk.io/core/store" + dbm "github.com/cosmos/iavl/db" + "github.com/stretchr/testify/require" +) + +type MockDBBatch struct { + corestore.Batch + + // simulate low level system error + err error +} + +func (b *MockDBBatch) Write() error { + if b.err != nil { + return b.err + } + return b.Batch.Write() +} + +func (b *MockDBBatch) WriteSync() error { + if b.err != nil { + return b.err + } + return b.Batch.WriteSync() +} + +type MockDB struct { + dbm.DB + + batchIndex int + // simulate low level system error at i-th batch write + errors map[int]error +} + +func (db *MockDB) NewBatch() corestore.Batch { + err, _ := db.errors[db.batchIndex] + batch := &MockDBBatch{Batch: db.DB.NewBatch(), err: err} + db.batchIndex++ + return batch +} + +func (db *MockDB) NewBatchWithSize(size int) corestore.Batch { + return db.NewBatch() +} + +func TestBatchFlusher(t *testing.T) { + db := &MockDB{DB: dbm.NewMemDB()} + + { + tree := NewMutableTree(db, 10000, true, NewNopLogger()) + v, err := tree.Load() + require.NoError(t, err) + require.Equal(t, int64(0), v) + + // the batch size exceeds the threshold, and persist into db in two batches + for i := 0; i < 1000; i++ { + // random key value pairs + key := []byte(fmt.Sprintf("key-%064d", rand.Intn(100000000))) + value := []byte(fmt.Sprintf("value-%064d", rand.Intn(100000000))) + _, err := tree.Set(key, value) + require.NoError(t, err) + } + + // the first batch write will success, + // the second batch write will fail + db.errors = map[int]error{ + 1: fmt.Errorf("filesystem failure"), + } + _, _, err = tree.SaveVersion() + require.Error(t, err) + } + + { + tree := NewMutableTree(db, 10000, true, NewNopLogger()) + v, err := tree.Load() + require.NoError(t, err) + require.Equal(t, int64(0), v) + } +} diff --git a/nodedb.go b/nodedb.go index 6e9b1e2a4..ec711cd9c 100644 --- a/nodedb.go +++ b/nodedb.go @@ -913,13 +913,17 @@ func (ndb *nodeDB) getLatestVersion() (bool, int64, error) { } defer itr.Close() - if itr.Valid() { + for ; itr.Valid(); itr.Next() { k := itr.Key() var nk []byte nodeKeyFormat.Scan(k, &nk) - latestVersion = GetNodeKey(nk).version - ndb.resetLatestVersion(latestVersion) - return true, latestVersion, nil + nodeKey := GetNodeKey(nk) + if bytes.Equal(nk, GetRootKey(nodeKey.version)) { + // only find the latest version when we find the root node + latestVersion = nodeKey.version + ndb.resetLatestVersion(latestVersion) + return true, latestVersion, nil + } } if err := itr.Error(); err != nil {