Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage pending in memory and move it to synchroniser #1844

Merged
merged 5 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 14 additions & 225 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/Masterminds/semver/v3"
"github.com/NethermindEth/juno/core"
Expand Down Expand Up @@ -40,23 +38,21 @@ type Reader interface {
HeadState() (core.StateReader, StateCloser, error)
StateAtBlockHash(blockHash *felt.Felt) (core.StateReader, StateCloser, error)
StateAtBlockNumber(blockNumber uint64) (core.StateReader, StateCloser, error)
PendingState() (core.StateReader, StateCloser, error)

BlockCommitmentsByNumber(blockNumber uint64) (*core.BlockCommitments, error)

EventFilter(from *felt.Felt, keys [][]felt.Felt) (EventFilterer, error)

Pending() (Pending, error)

Network() *utils.Network
}

var (
ErrParentDoesNotMatchHead = errors.New("block's parent hash does not match head block hash")
ErrPendingBlockNotFound = errors.New("pending block not found")
SupportedStarknetVersion = semver.MustParse("0.13.3")
)

func checkBlockVersion(protocolVersion string) error {
func CheckBlockVersion(protocolVersion string) error {
kirugan marked this conversation as resolved.
Show resolved Hide resolved
blockVer, err := core.ParseBlockVersion(protocolVersion)
if err != nil {
return err
Expand All @@ -83,20 +79,19 @@ var _ Reader = (*Blockchain)(nil)

// Blockchain is responsible for keeping track of all things related to the Starknet blockchain
type Blockchain struct {
network *utils.Network
database db.DB

listener EventListener

cachedPending atomic.Pointer[Pending]
network *utils.Network
database db.DB
listener EventListener
pendingBlockFn func() *core.Block
}

func New(database db.DB, network *utils.Network) *Blockchain {
func New(database db.DB, network *utils.Network, pendingBlockFn func() *core.Block) *Blockchain {
RegisterCoreTypesToEncoder()
return &Blockchain{
database: database,
network: network,
listener: &SelectiveListener{},
database: database,
network: network,
listener: &SelectiveListener{},
pendingBlockFn: pendingBlockFn,
}
}

Expand Down Expand Up @@ -266,24 +261,6 @@ func (b *Blockchain) TransactionByHash(hash *felt.Felt) (core.Transaction, error
return transaction, b.database.View(func(txn db.Transaction) error {
var err error
transaction, err = transactionByHash(txn, hash)

// not found in the canonical blocks, try pending
if errors.Is(err, db.ErrKeyNotFound) {
var pending Pending
pending, err = b.pendingBlock(txn)
if err != nil {
return err
}

for _, t := range pending.Block.Transactions {
if hash.Equal(t.Hash()) {
transaction = t
return nil
}
}
return db.ErrKeyNotFound
}

return err
})
}
Expand All @@ -299,26 +276,6 @@ func (b *Blockchain) Receipt(hash *felt.Felt) (*core.TransactionReceipt, *felt.F
return receipt, blockHash, blockNumber, b.database.View(func(txn db.Transaction) error {
var err error
receipt, blockHash, blockNumber, err = receiptByHash(txn, hash)

// not found in the canonical blocks, try pending
if errors.Is(err, db.ErrKeyNotFound) {
var pending Pending
pending, err = b.pendingBlock(txn)
if err != nil {
return err
}

for i, t := range pending.Block.Transactions {
if hash.Equal(t.Hash()) {
receipt = pending.Block.Receipts[i]
blockHash = nil
blockNumber = 0
return nil
}
}
return db.ErrKeyNotFound
}

kirugan marked this conversation as resolved.
Show resolved Hide resolved
return err
})
}
Expand Down Expand Up @@ -389,10 +346,6 @@ func (b *Blockchain) Store(block *core.Block, blockCommitments *core.BlockCommit
return err
}

if err := b.storeEmptyPending(txn, block.Header); err != nil {
return err
}

// Head of the blockchain is maintained as follows:
// [db.ChainHeight]() -> (BlockNumber)
heightBin := core.MarshalBlockNumber(block.Number)
Expand All @@ -408,7 +361,7 @@ func (b *Blockchain) VerifyBlock(block *core.Block) error {
}

func verifyBlock(txn db.Transaction, block *core.Block) error {
if err := checkBlockVersion(block.ProtocolVersion); err != nil {
if err := CheckBlockVersion(block.ProtocolVersion); err != nil {
return err
}

Expand Down Expand Up @@ -866,7 +819,7 @@ func (b *Blockchain) EventFilter(from *felt.Felt, keys [][]felt.Felt) (EventFilt
return nil, err
}

return newEventFilter(txn, from, keys, 0, latest), nil
return newEventFilter(txn, from, keys, 0, latest, b.pendingBlockFn), nil
}

// RevertHead reverts the head block
Expand Down Expand Up @@ -936,23 +889,11 @@ func (b *Blockchain) revertHead(txn db.Transaction) error {
return err
}

// Revert chain height and pending.
// Revert chain height.
if genesisBlock {
if err = txn.Delete(db.Pending.Key()); err != nil {
return err
}
return txn.Delete(db.ChainHeight.Key())
}

var newHeader *core.Header
newHeader, err = blockHeaderByNumber(txn, blockNumber-1)
if err != nil {
return err
}
if err := b.storeEmptyPending(txn, newHeader); err != nil {
return err
}

heightBin := core.MarshalBlockNumber(blockNumber - 1)
return txn.Set(db.ChainHeight.Key(), heightBin)
}
Expand Down Expand Up @@ -988,155 +929,3 @@ func removeTxsAndReceipts(txn db.Transaction, blockNumber, numTxs uint64) error

return nil
}

func (b *Blockchain) storeEmptyPending(txn db.Transaction, latestHeader *core.Header) error {
receipts := make([]*core.TransactionReceipt, 0)
pendingBlock := &core.Block{
Header: &core.Header{
ParentHash: latestHeader.Hash,
SequencerAddress: latestHeader.SequencerAddress,
Number: latestHeader.Number + 1,
Timestamp: uint64(time.Now().Unix()),
ProtocolVersion: latestHeader.ProtocolVersion,
EventsBloom: core.EventsBloom(receipts),
GasPrice: latestHeader.GasPrice,
GasPriceSTRK: latestHeader.GasPriceSTRK,
},
Transactions: make([]core.Transaction, 0),
Receipts: receipts,
}

stateDiff, err := MakeStateDiffForEmptyBlock(b, latestHeader.Number+1)
if err != nil {
return err
}

emptyPending := &Pending{
Block: pendingBlock,
StateUpdate: &core.StateUpdate{
OldRoot: latestHeader.GlobalStateRoot,
StateDiff: stateDiff,
},
NewClasses: make(map[felt.Felt]core.Class, 0),
}
return b.storePending(txn, emptyPending)
}

// StorePending stores a pending block given that it is for the next height
func (b *Blockchain) StorePending(pending *Pending) error {
return b.database.Update(func(txn db.Transaction) error {
expectedParentHash := new(felt.Felt)
h, err := headsHeader(txn)
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return err
} else if err == nil {
expectedParentHash = h.Hash
}

if !expectedParentHash.Equal(pending.Block.ParentHash) {
return ErrParentDoesNotMatchHead
}

if existingPending, err := b.pendingBlock(txn); err == nil {
if existingPending.Block.TransactionCount >= pending.Block.TransactionCount {
return nil // ignore the incoming pending if it has fewer transactions than the one we already have
}
pending.Block.Number = existingPending.Block.Number // Just in case the number is not set.
} else if !errors.Is(err, db.ErrKeyNotFound) { // Allow StorePending before block zero.
return err
}

return b.storePending(txn, pending)
})
}

func (b *Blockchain) storePending(txn db.Transaction, pending *Pending) error {
if err := storePending(txn, pending); err != nil {
return err
}
b.cachedPending.Store(pending)
return nil
}

func storePending(txn db.Transaction, pending *Pending) error {
pendingBytes, err := encoder.Marshal(pending)
if err != nil {
return err
}
return txn.Set(db.Pending.Key(), pendingBytes)
}

func (b *Blockchain) pendingBlock(txn db.Transaction) (Pending, error) {
if cachedPending := b.cachedPending.Load(); cachedPending != nil {
expectedParentHash := &felt.Zero
if head, err := headsHeader(txn); err == nil {
expectedParentHash = head.Hash
}
if cachedPending.Block.ParentHash.Equal(expectedParentHash) {
return *cachedPending, nil
}
}

// Either cachedPending was nil or wasn't consistent with the HEAD we have
// in the database, so read it directly from the database
return pendingBlock(txn)
}

func pendingBlock(txn db.Transaction) (Pending, error) {
var pending Pending
err := txn.Get(db.Pending.Key(), func(bytes []byte) error {
return encoder.Unmarshal(bytes, &pending)
})
return pending, err
}

// Pending returns the pending block from the database
func (b *Blockchain) Pending() (Pending, error) {
b.listener.OnRead("Pending")
var pending Pending
return pending, b.database.View(func(txn db.Transaction) error {
var err error
pending, err = b.pendingBlock(txn)
return err
})
}

// PendingState returns the state resulting from execution of the pending block
func (b *Blockchain) PendingState() (core.StateReader, StateCloser, error) {
b.listener.OnRead("PendingState")
txn, err := b.database.NewTransaction(false)
if err != nil {
return nil, nil, err
}

pending, err := b.pendingBlock(txn)
if err != nil {
return nil, nil, utils.RunAndWrapOnError(txn.Discard, err)
}

return NewPendingState(
pending.StateUpdate.StateDiff,
pending.NewClasses,
core.NewState(txn),
), txn.Discard, nil
}

func MakeStateDiffForEmptyBlock(bc Reader, blockNumber uint64) (*core.StateDiff, error) {
stateDiff := core.EmptyStateDiff()

const blockHashLag = 10
if blockNumber < blockHashLag {
return stateDiff, nil
}

header, err := bc.BlockHeaderByNumber(blockNumber - blockHashLag)
if err != nil {
return nil, err
}

blockHashStorageContract := new(felt.Felt).SetUint64(1)
stateDiff.StorageDiffs[*blockHashStorageContract] = map[felt.Felt]*felt.Felt{
*new(felt.Felt).SetUint64(header.Number): header.Hash,
}
return stateDiff, nil
}
Loading
Loading