Skip to content

Commit

Permalink
Add bound options to txn.NewIterator (#2357)
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann authored Jan 6, 2025
1 parent f3b45d8 commit 360f07a
Show file tree
Hide file tree
Showing 21 changed files with 130 additions and 64 deletions.
23 changes: 10 additions & 13 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,20 +505,16 @@ func BlockByNumber(txn db.Transaction, number uint64) (*core.Block, error) {
}

func TransactionsByBlockNumber(txn db.Transaction, number uint64) ([]core.Transaction, error) {
iterator, err := txn.NewIterator()
numBytes := core.MarshalBlockNumber(number)
prefix := db.TransactionsByBlockNumberAndIndex.Key(numBytes)

iterator, err := txn.NewIterator(prefix, true)
if err != nil {
return nil, err
}

var txs []core.Transaction
numBytes := core.MarshalBlockNumber(number)

prefix := db.TransactionsByBlockNumberAndIndex.Key(numBytes)
for iterator.Seek(prefix); iterator.Valid(); iterator.Next() {
if !bytes.HasPrefix(iterator.Key(), prefix) {
break
}

for iterator.First(); iterator.Valid(); iterator.Next() {
val, vErr := iterator.Value()
if vErr != nil {
return nil, utils.RunAndWrapOnError(iterator.Close, vErr)
Expand All @@ -540,16 +536,17 @@ func TransactionsByBlockNumber(txn db.Transaction, number uint64) ([]core.Transa
}

func receiptsByBlockNumber(txn db.Transaction, number uint64) ([]*core.TransactionReceipt, error) {
iterator, err := txn.NewIterator()
numBytes := core.MarshalBlockNumber(number)
prefix := db.ReceiptsByBlockNumberAndIndex.Key(numBytes)

iterator, err := txn.NewIterator(prefix, true)
if err != nil {
return nil, err
}

var receipts []*core.TransactionReceipt
numBytes := core.MarshalBlockNumber(number)

prefix := db.ReceiptsByBlockNumberAndIndex.Key(numBytes)
for iterator.Seek(prefix); iterator.Valid(); iterator.Next() {
for iterator.First(); iterator.Valid(); iterator.Next() {
if !bytes.HasPrefix(iterator.Key(), prefix) {
break
}
Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func TestRevert(t *testing.T) {

t.Run("empty blockchain should mean empty db", func(t *testing.T) {
require.NoError(t, testdb.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (h *history) deleteLog(key []byte, height uint64) error {
}

func (h *history) valueAt(key []byte, height uint64) ([]byte, error) {
it, err := h.txn.NewIterator()
it, err := h.txn.NewIterator(nil, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func TestRevert(t *testing.T) {

t.Run("empty state should mean empty db", func(t *testing.T) {
require.NoError(t, testDB.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion db/buffered_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ func (t *BufferedTransaction) Impl() any {
}

// NewIterator : see db.Transaction.NewIterator
func (t *BufferedTransaction) NewIterator() (Iterator, error) {
func (t *BufferedTransaction) NewIterator(_ []byte, _ bool) (Iterator, error) {
return nil, errors.New("buffered transactions dont support iterators")
}
5 changes: 4 additions & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Iterator interface {
// Valid returns true if the iterator is positioned at a valid key/value pair.
Valid() bool

// First moves the iterator to the first key/value pair.
First() bool

// Next moves the iterator to the next key/value pair. It returns whether the
// iterator is valid after the call. Once invalid, the iterator remains
// invalid.
Expand All @@ -63,7 +66,7 @@ type Iterator interface {
// the transaction is committed.
type Transaction interface {
// NewIterator returns an iterator over the database's key/value pairs.
NewIterator() (Iterator, error)
NewIterator(lowerBound []byte, withUpperBound bool) (Iterator, error)
// Discard discards all the changes done to the database with this transaction
Discard() error
// Commit flushes all the changes pending on this transaction to the database, making the changes visible to other
Expand Down
2 changes: 1 addition & 1 deletion db/memory_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func NewMemTransaction() Transaction {
return &memTransaction{storage: make(map[string][]byte)}
}

func (t *memTransaction) NewIterator() (Iterator, error) {
func (t *memTransaction) NewIterator(_ []byte, _ bool) (Iterator, error) {
return nil, errors.New("not implemented")
}

Expand Down
9 changes: 7 additions & 2 deletions db/pebble/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,20 @@ func (b *batch) Get(key []byte, cb func([]byte) error) error {
}

// NewIterator : see db.Transaction.NewIterator
func (b *batch) NewIterator() (db.Iterator, error) {
func (b *batch) NewIterator(lowerBound []byte, withUpperBound bool) (db.Iterator, error) {
var iter *pebble.Iterator
var err error

if b.batch == nil {
return nil, ErrDiscardedTransaction
}

iter, err = b.batch.NewIter(nil)
iterOpt := &pebble.IterOptions{LowerBound: lowerBound}
if withUpperBound {
iterOpt.UpperBound = upperBound(lowerBound)
}

iter, err = b.batch.NewIter(iterOpt)
if err != nil {
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions db/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ func CalculatePrefixSize(ctx context.Context, pDB *DB, prefix []byte, withUpperB
return item, utils.RunAndWrapOnError(it.Close, err)
}

// Calculates the next possible prefix after the given prefix bytes.
// It's used to establish an upper boundary for prefix-based database scans.
// Examples:
//
// [1] -> [2]
// [1, 255, 255] -> [2]
// [1, 2, 255] -> [1, 3]
// [255, 255] -> nil
func upperBound(prefix []byte) []byte {
var ub []byte

Expand Down
89 changes: 63 additions & 26 deletions db/pebble/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestSeek(t *testing.T) {
require.NoError(t, txn.Set([]byte{3}, []byte{3}))

t.Run("seeks to the next key in lexicographical order", func(t *testing.T) {
iter, err := txn.NewIterator()
iter, err := txn.NewIterator(nil, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, iter.Close())
Expand All @@ -275,7 +275,7 @@ func TestSeek(t *testing.T) {
})

t.Run("key returns nil when seeking nonexistent data", func(t *testing.T) {
iter, err := txn.NewIterator()
iter, err := txn.NewIterator(nil, false)
require.NoError(t, err)

t.Cleanup(func() {
Expand All @@ -289,52 +289,56 @@ func TestSeek(t *testing.T) {

func TestPrefixSearch(t *testing.T) {
type entry struct {
key uint64
value []byte
prefix []byte
key uint64
value []byte
}

data := []entry{
{11, []byte("c")},
{12, []byte("a")},
{13, []byte("e")},
{22, []byte("d")},
{23, []byte("b")},
{123, []byte("f")},
{[]byte{11}, 1, []byte("c")},
{[]byte{11}, 2, []byte("a")},
{[]byte{11}, 3, []byte("e")},
{[]byte{12}, 4, []byte("d")},
{[]byte{23}, 5, []byte("b")},
{[]byte{123}, 6, []byte("f")},
{[]byte{0}, 7, []byte("g")},
}

testDB := pebble.NewMemTest(t)

require.NoError(t, testDB.Update(func(txn db.Transaction) error {
for _, d := range data {
numBytes := make([]byte, 8)
binary.BigEndian.PutUint64(numBytes, d.key)
require.NoError(t, txn.Set(numBytes, d.value))
keyBytes := make([]byte, 8)
binary.BigEndian.PutUint64(keyBytes, d.key)
var dbKey []byte
dbKey = append(dbKey, d.prefix...)
dbKey = append(dbKey, keyBytes...)
require.NoError(t, txn.Set(dbKey, d.value))
}
return nil
}))

require.NoError(t, testDB.View(func(txn db.Transaction) error {
iter, err := txn.NewIterator()
targetPrefix := []byte{11}
iter, err := txn.NewIterator(targetPrefix, true)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, iter.Close())
})

prefixBytes := make([]byte, 8)
binary.BigEndian.PutUint64(prefixBytes, 1)

var entries []entry
for iter.Seek(prefixBytes); iter.Valid(); iter.Next() {
key := binary.BigEndian.Uint64(iter.Key())
if key >= 20 {
break
}
for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()
key = key[len(targetPrefix):]
keyUint64 := binary.BigEndian.Uint64(key)

v, err := iter.Value()
require.NoError(t, err)
entries = append(entries, entry{key, v})

entries = append(entries, entry{targetPrefix, keyUint64, v})
}

expectedKeys := []uint64{11, 12, 13}
expectedKeys := []uint64{1, 2, 3}

assert.Equal(t, len(expectedKeys), len(entries))

Expand All @@ -346,6 +350,39 @@ func TestPrefixSearch(t *testing.T) {
}))
}

func TestFirst(t *testing.T) {
testDB := pebble.NewMemTest(t)

txn, err := testDB.NewTransaction(true)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, txn.Discard())
})
require.NoError(t, txn.Set([]byte{0}, []byte{0}))
require.NoError(t, txn.Set([]byte{1}, []byte{1}))
require.NoError(t, txn.Set([]byte{2}, []byte{2}))

t.Run("First() on new iterator", func(t *testing.T) {
iter, err := txn.NewIterator(nil, false)
require.NoError(t, err)
assert.Equal(t, true, iter.First())
assert.Equal(t, []byte{0}, iter.Key())
require.NoError(t, iter.Close())
})

t.Run("First() after multiple Next()", func(t *testing.T) {
iter, err := txn.NewIterator(nil, false)
require.NoError(t, err)
assert.Equal(t, true, iter.Next())
assert.Equal(t, []byte{0}, iter.Key())
assert.Equal(t, true, iter.Next())
assert.Equal(t, []byte{1}, iter.Key())
assert.Equal(t, true, iter.First())
assert.Equal(t, []byte{0}, iter.Key())
require.NoError(t, iter.Close())
})
}

func TestNext(t *testing.T) {
testDB := pebble.NewMemTest(t)

Expand All @@ -359,7 +396,7 @@ func TestNext(t *testing.T) {
require.NoError(t, txn.Set([]byte{2}, []byte{2}))

t.Run("Next() on new iterator", func(t *testing.T) {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
require.NoError(t, err)

t.Run("new iterator should be invalid", func(t *testing.T) {
Expand All @@ -374,7 +411,7 @@ func TestNext(t *testing.T) {
})

t.Run("Next() should work as expected after a Seek()", func(t *testing.T) {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
require.NoError(t, err)

require.True(t, it.Seek([]byte{0}))
Expand Down
5 changes: 5 additions & 0 deletions db/pebble/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func (i *iterator) Value() ([]byte, error) {
return buf, nil
}

func (i *iterator) First() bool {
i.positioned = true
return i.iter.First()
}

// Next : see db.Transaction.Iterator.Next
func (i *iterator) Next() bool {
if !i.positioned {
Expand Down
9 changes: 7 additions & 2 deletions db/pebble/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,20 @@ func (s *snapshot) Get(key []byte, cb func([]byte) error) error {
}

// NewIterator : see db.Transaction.NewIterator
func (s *snapshot) NewIterator() (db.Iterator, error) {
func (s *snapshot) NewIterator(lowerBound []byte, withUpperBound bool) (db.Iterator, error) {
var iter *pebble.Iterator
var err error

if s.snapshot == nil {
return nil, ErrDiscardedTransaction
}

iter, err = s.snapshot.NewIter(nil)
iterOpt := &pebble.IterOptions{LowerBound: lowerBound}
if withUpperBound {
iterOpt.UpperBound = upperBound(lowerBound)
}

iter, err = s.snapshot.NewIter(iterOpt)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions db/remote/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestRemote(t *testing.T) {

t.Run("iterate", func(t *testing.T) {
err := remoteDB.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand All @@ -85,7 +85,7 @@ func TestRemote(t *testing.T) {

t.Run("seek", func(t *testing.T) {
err := remoteDB.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions db/remote/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func (i *iterator) Value() ([]byte, error) {
return i.currentV, nil
}

func (i *iterator) First() bool {
if err := i.doOpAndUpdate(gen.Op_FIRST, nil); err != nil {
i.log.Debugw("Error", "op", gen.Op_FIRST, "err", err)
}
return len(i.currentK) > 0 || len(i.currentV) > 0
}

func (i *iterator) Next() bool {
if err := i.doOpAndUpdate(gen.Op_NEXT, nil); err != nil {
i.log.Debugw("Error", "op", gen.Op_NEXT, "err", err)
Expand Down
2 changes: 1 addition & 1 deletion db/remote/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type transaction struct {
log utils.SimpleLogger
}

func (t *transaction) NewIterator() (db.Iterator, error) {
func (t *transaction) NewIterator(_ []byte, _ bool) (db.Iterator, error) {
err := t.client.Send(&gen.Cursor{
Op: gen.Op_OPEN,
})
Expand Down
2 changes: 1 addition & 1 deletion db/sync_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ func (t *SyncTransaction) Impl() any {
}

// NewIterator : see db.Transaction.NewIterator
func (t *SyncTransaction) NewIterator() (Iterator, error) {
func (t *SyncTransaction) NewIterator(_ []byte, _ bool) (Iterator, error) {
return nil, errors.New("sync transactions dont support iterators")
}
2 changes: 1 addition & 1 deletion grpc/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newTx(dbTx db.Transaction) *tx {
}

func (t *tx) newCursor() (uint32, error) {
it, err := t.dbTx.NewIterator()
it, err := t.dbTx.NewIterator(nil, false)
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit 360f07a

Please sign in to comment.