Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement a mempool for the sequencer #2341

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions db/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
Temporary // used temporarily for migrations
SchemaIntermediateState
L1HandlerTxnHashByMsgHash // maps l1 handler msg hash to l1 handler txn hash
MempoolHead // key of the head node
MempoolTail // key of the tail node
MempoolLength // number of transactions
MempoolNode
)

// Key flattens a prefix and series of byte arrays into a single []byte.
Expand Down
63 changes: 63 additions & 0 deletions mempool/db_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package mempool

import (
"errors"
"math/big"

"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/encoder"
)

func headValue(txn db.Transaction, head *felt.Felt) error {
return txn.Get(db.MempoolHead.Key(), func(b []byte) error {
head.SetBytes(b)
return nil
})
}

func tailValue(txn db.Transaction, tail *felt.Felt) error {
return txn.Get(db.MempoolTail.Key(), func(b []byte) error {
tail.SetBytes(b)
return nil
})
}

func updateHead(txn db.Transaction, head *felt.Felt) error {
return txn.Set(db.MempoolHead.Key(), head.Marshal())
}

func updateTail(txn db.Transaction, tail *felt.Felt) error {
return txn.Set(db.MempoolTail.Key(), tail.Marshal())
}

func readDBElem(txn db.Transaction, itemKey *felt.Felt) (dbPoolTxn, error) {
var item dbPoolTxn
keyBytes := itemKey.Bytes()
err := txn.Get(db.MempoolNode.Key(keyBytes[:]), func(b []byte) error {
return encoder.Unmarshal(b, &item)
})
return item, err
}

func setDBElem(txn db.Transaction, item *dbPoolTxn) error {
itemBytes, err := encoder.Marshal(item)
if err != nil {
return err
}

Check warning on line 47 in mempool/db_utils.go

View check run for this annotation

Codecov / codecov/patch

mempool/db_utils.go#L46-L47

Added lines #L46 - L47 were not covered by tests
keyBytes := item.Txn.Transaction.Hash().Bytes()
return txn.Set(db.MempoolNode.Key(keyBytes[:]), itemBytes)
}

func lenDB(txn db.Transaction) (int, error) {
var l int
err := txn.Get(db.MempoolLength.Key(), func(b []byte) error {
l = int(new(big.Int).SetBytes(b).Int64())
return nil
})

if err != nil && errors.Is(err, db.ErrKeyNotFound) {
return 0, nil
}
return l, err
}
302 changes: 302 additions & 0 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
package mempool

import (
"errors"
"fmt"
"math/big"
"sync"

"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils"
)

var ErrTxnPoolFull = errors.New("transaction pool is full")

type BroadcastedTransaction struct {
rodrigo-pino marked this conversation as resolved.
Show resolved Hide resolved
Transaction core.Transaction
DeclaredClass core.Class
}

// runtime mempool txn
type memPoolTxn struct {
Txn BroadcastedTransaction
Next *memPoolTxn
}

// persistent db txn value
type dbPoolTxn struct {
Txn BroadcastedTransaction
NextHash *felt.Felt
}

// memTxnList represents a linked list of user transactions at runtime
type memTxnList struct {
head *memPoolTxn
tail *memPoolTxn
len int
mu sync.Mutex
}

func (t *memTxnList) push(newNode *memPoolTxn) {
t.mu.Lock()
defer t.mu.Unlock()
if t.tail != nil {
t.tail.Next = newNode
t.tail = newNode
} else {
t.head = newNode
t.tail = newNode
}
t.len++
}

func (t *memTxnList) pop() (BroadcastedTransaction, error) {
t.mu.Lock()
defer t.mu.Unlock()

if t.head == nil {
return BroadcastedTransaction{}, errors.New("transaction pool is empty")
}

headNode := t.head
t.head = headNode.Next
if t.head == nil {
t.tail = nil
}
t.len--
return headNode.Txn, nil
}

// Pool represents a blockchain mempool, managing transactions using both an
// in-memory and persistent database.
type Pool struct {
log utils.SimpleLogger
state core.StateReader
db db.DB // to store the persistent mempool
txPushed chan struct{}
memTxnList *memTxnList
maxNumTxns int
dbWriteChan chan *BroadcastedTransaction
wg sync.WaitGroup
}

// New initialises the Pool and starts the database writer goroutine.
// It is the responsibility of the caller to execute the closer function.
func New(mainDB db.DB, state core.StateReader, maxNumTxns int, log utils.SimpleLogger) (*Pool, func() error) {
pool := &Pool{
log: log,
state: state,
db: mainDB, // todo: txns should be deleted everytime a new block is stored (builder responsibility)
txPushed: make(chan struct{}, 1),
memTxnList: &memTxnList{},
maxNumTxns: maxNumTxns,
dbWriteChan: make(chan *BroadcastedTransaction, maxNumTxns),
}
closer := func() error {
close(pool.dbWriteChan)
pool.wg.Wait()
if err := pool.db.Close(); err != nil {
return fmt.Errorf("failed to close mempool database: %v", err)
}

Check warning on line 102 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L101-L102

Added lines #L101 - L102 were not covered by tests
return nil
}
pool.dbWriter()
return pool, closer
}

func (p *Pool) dbWriter() {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for txn := range p.dbWriteChan {
err := p.writeToDB(txn)
if err != nil {
p.log.Errorw("error in handling user transaction in persistent mempool", "err", err)
}

Check warning on line 117 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L116-L117

Added lines #L116 - L117 were not covered by tests
}
}()
}

// LoadFromDB restores the in-memory transaction pool from the database
func (p *Pool) LoadFromDB() error {
return p.db.View(func(txn db.Transaction) error {
headVal := new(felt.Felt)
err := headValue(txn, headVal)
if err != nil {
if errors.Is(err, db.ErrKeyNotFound) {
return nil
}
return err

Check warning on line 131 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L131

Added line #L131 was not covered by tests
}
// loop through the persistent pool and push nodes to the in-memory pool
currentHash := headVal
for currentHash != nil {
curDBElem, err := readDBElem(txn, currentHash)
if err != nil {
return err
}

Check warning on line 139 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L138-L139

Added lines #L138 - L139 were not covered by tests
newMemPoolTxn := &memPoolTxn{
Txn: curDBElem.Txn,
}
if curDBElem.NextHash != nil {
nextDBTxn, err := readDBElem(txn, curDBElem.NextHash)
if err != nil {
return err
}

Check warning on line 147 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L146-L147

Added lines #L146 - L147 were not covered by tests
newMemPoolTxn.Next = &memPoolTxn{
Txn: nextDBTxn.Txn,
}
}
p.memTxnList.push(newMemPoolTxn)
currentHash = curDBElem.NextHash
}
return nil
})
}

// writeToDB adds the transaction to the persistent pool db
func (p *Pool) writeToDB(userTxn *BroadcastedTransaction) error {
return p.db.Update(func(dbTxn db.Transaction) error {
tailVal := new(felt.Felt)
if err := tailValue(dbTxn, tailVal); err != nil {
if !errors.Is(err, db.ErrKeyNotFound) {
return err
}

Check warning on line 166 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L165-L166

Added lines #L165 - L166 were not covered by tests
tailVal = nil
}
if err := setDBElem(dbTxn, &dbPoolTxn{Txn: *userTxn}); err != nil {
return err
}

Check warning on line 171 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L170-L171

Added lines #L170 - L171 were not covered by tests
if tailVal != nil {
// Update old tail to point to the new item
var oldTailElem dbPoolTxn
oldTailElem, err := readDBElem(dbTxn, tailVal)
if err != nil {
return err
}

Check warning on line 178 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L177-L178

Added lines #L177 - L178 were not covered by tests
oldTailElem.NextHash = userTxn.Transaction.Hash()
if err = setDBElem(dbTxn, &oldTailElem); err != nil {
return err
}

Check warning on line 182 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L181-L182

Added lines #L181 - L182 were not covered by tests
} else {
// Empty list, make new item both the head and the tail
if err := updateHead(dbTxn, userTxn.Transaction.Hash()); err != nil {
return err
}

Check warning on line 187 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L186-L187

Added lines #L186 - L187 were not covered by tests
}
if err := updateTail(dbTxn, userTxn.Transaction.Hash()); err != nil {
return err
}

Check warning on line 191 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L190-L191

Added lines #L190 - L191 were not covered by tests
pLen, err := lenDB(dbTxn)
if err != nil {
return err
}

Check warning on line 195 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L194-L195

Added lines #L194 - L195 were not covered by tests
return dbTxn.Set(db.MempoolLength.Key(), new(big.Int).SetInt64(int64(pLen+1)).Bytes())
})
}

// Push queues a transaction to the pool
func (p *Pool) Push(userTxn *BroadcastedTransaction) error {
err := p.validate(userTxn)
if err != nil {
return err
}

select {
case p.dbWriteChan <- userTxn:
default:
select {
case _, ok := <-p.dbWriteChan:
if !ok {
p.log.Errorw("cannot store user transasction in persistent pool, database write channel is closed")
}
p.log.Errorw("cannot store user transasction in persistent pool, database is full")
default:
p.log.Errorw("cannot store user transasction in persistent pool, database is full")

Check warning on line 217 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L209-L217

Added lines #L209 - L217 were not covered by tests
}
}

newNode := &memPoolTxn{Txn: *userTxn, Next: nil}
p.memTxnList.push(newNode)

select {
case p.txPushed <- struct{}{}:
default:
}

return nil
}

func (p *Pool) validate(userTxn *BroadcastedTransaction) error {
if p.memTxnList.len+1 >= p.maxNumTxns {
return ErrTxnPoolFull
}

switch t := userTxn.Transaction.(type) {
case *core.DeployTransaction:
return fmt.Errorf("deploy transactions are not supported")
case *core.DeployAccountTransaction:
if !t.Nonce.IsZero() {
return fmt.Errorf("validation failed, received non-zero nonce %s", t.Nonce)
}
case *core.DeclareTransaction:
nonce, err := p.state.ContractNonce(t.SenderAddress)
if err != nil {
return fmt.Errorf("validation failed, error when retrieving nonce, %v", err)
}
if nonce.Cmp(t.Nonce) > 0 {
return fmt.Errorf("validation failed, existing nonce %s, but received nonce %s", nonce, t.Nonce)
}

Check warning on line 251 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L238-L251

Added lines #L238 - L251 were not covered by tests
case *core.InvokeTransaction:
if t.TxVersion().Is(0) { // cant verify nonce since SenderAddress was only added in v1
return fmt.Errorf("invoke v0 transactions not supported")
}

Check warning on line 255 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L254-L255

Added lines #L254 - L255 were not covered by tests
nonce, err := p.state.ContractNonce(t.SenderAddress)
if err != nil {
return fmt.Errorf("validation failed, error when retrieving nonce, %v", err)
}

Check warning on line 259 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L258-L259

Added lines #L258 - L259 were not covered by tests
if nonce.Cmp(t.Nonce) > 0 {
return fmt.Errorf("validation failed, existing nonce %s, but received nonce %s", nonce, t.Nonce)
}
case *core.L1HandlerTransaction:

Check warning on line 263 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L261-L263

Added lines #L261 - L263 were not covered by tests
// todo: verification of the L1 handler nonce requires checking the
// message nonce on the L1 Core Contract.
}
return nil
}

// Pop returns the transaction with the highest priority from the in-memory pool
func (p *Pool) Pop() (BroadcastedTransaction, error) {
rianhughes marked this conversation as resolved.
Show resolved Hide resolved
return p.memTxnList.pop()
}

// Remove removes a set of transactions from the pool
// todo: should be called by the builder to remove txns from the db everytime a new block is stored.
// todo: in the consensus+p2p world, the txns should also be removed from the in-memory pool.
func (p *Pool) Remove(hash ...*felt.Felt) error {
return errors.New("not implemented")

Check warning on line 279 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L278-L279

Added lines #L278 - L279 were not covered by tests
}

// Len returns the number of transactions in the in-memory pool
func (p *Pool) Len() int {
return p.memTxnList.len
}

func (p *Pool) Wait() <-chan struct{} {
return p.txPushed
}

// Len returns the number of transactions in the persistent pool
func (p *Pool) LenDB() (int, error) {
txn, err := p.db.NewTransaction(false)
if err != nil {
return 0, err
}

Check warning on line 296 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L295-L296

Added lines #L295 - L296 were not covered by tests
lenDB, err := lenDB(txn)
if err != nil {
return 0, err
}

Check warning on line 300 in mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

mempool/mempool.go#L299-L300

Added lines #L299 - L300 were not covered by tests
return lenDB, txn.Discard()
}
Loading
Loading