diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 43a9d8fb77..4fd5490158 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "errors" "fmt" - "sync/atomic" "github.com/Masterminds/semver/v3" "github.com/NethermindEth/juno/core" @@ -37,24 +36,20 @@ type Reader interface { HeadState() (core.StateReader, StateCloser, error) StateAtBlockHash(blockHash *felt.Felt) (core.StateReader, StateCloser, error) StateAtBlockNumber(blockNumber uint64) (core.StateReader, StateCloser, error) - PendingState() (core.StateReader, StateCloser, error) BlockCommitmentsByNumber(blockNumber uint64) (*core.BlockCommitments, error) EventFilter(from *felt.Felt, keys [][]felt.Felt) (*EventFilter, error) - Pending() (*Pending, error) - Network() *utils.Network } var ( ErrParentDoesNotMatchHead = errors.New("block's parent hash does not match head block hash") - ErrPendingBlockNotFound = errors.New("pending block not found") supportedStarknetVersion = semver.MustParse("0.13.2") ) -func checkBlockVersion(protocolVersion string) error { +func CheckBlockVersion(protocolVersion string) error { blockVer, err := core.ParseBlockVersion(protocolVersion) if err != nil { return err @@ -69,22 +64,21 @@ func checkBlockVersion(protocolVersion string) error { var _ Reader = (*Blockchain)(nil) -// Todo: Remove after pending is moved to sychcroniser -var pending atomic.Pointer[Pending] - // Blockchain is responsible for keeping track of all things related to the Starknet blockchain type Blockchain struct { - network *utils.Network - database db.DB - listener EventListener + network *utils.Network + database db.DB + listener EventListener + pendingBlockFn func() *core.Block } -func New(database db.DB, network *utils.Network) *Blockchain { +func New(database db.DB, network *utils.Network, pendingBlockFn func() *core.Block) *Blockchain { RegisterCoreTypesToEncoder() return &Blockchain{ - database: database, - network: network, - listener: &SelectiveListener{}, + database: database, + network: network, + listener: &SelectiveListener{}, + pendingBlockFn: pendingBlockFn, } } @@ -244,24 +238,6 @@ func (b *Blockchain) TransactionByHash(hash *felt.Felt) (core.Transaction, error return transaction, b.database.View(func(txn db.Transaction) error { var err error transaction, err = transactionByHash(txn, hash) - - // not found in the canonical blocks, try pending - if errors.Is(err, db.ErrKeyNotFound) { - var pending *Pending - pending, err = pendingBlock(txn) - if err != nil { - return err - } - - for _, t := range pending.Block.Transactions { - if hash.Equal(t.Hash()) { - transaction = t - return nil - } - } - return db.ErrKeyNotFound - } - return err }) } @@ -277,29 +253,6 @@ func (b *Blockchain) Receipt(hash *felt.Felt) (*core.TransactionReceipt, *felt.F return receipt, blockHash, blockNumber, b.database.View(func(txn db.Transaction) error { var err error receipt, blockHash, blockNumber, err = receiptByHash(txn, hash) - - // not found in the canonical blocks, try pending - if errors.Is(err, db.ErrKeyNotFound) { - var pending *Pending - pending, err = pendingBlock(txn) - if err != nil { - if !errors.Is(err, ErrPendingBlockNotFound) { - return err - } - return db.ErrKeyNotFound - } - - for i, t := range pending.Block.Transactions { - if hash.Equal(t.Hash()) { - receipt = pending.Block.Receipts[i] - blockHash = nil - blockNumber = 0 - return nil - } - } - return db.ErrKeyNotFound - } - return err }) } @@ -381,7 +334,7 @@ func (b *Blockchain) VerifyBlock(block *core.Block) error { } func verifyBlock(txn db.Transaction, block *core.Block) error { - if err := checkBlockVersion(block.ProtocolVersion); err != nil { + if err := CheckBlockVersion(block.ProtocolVersion); err != nil { return err } @@ -819,7 +772,7 @@ func (b *Blockchain) EventFilter(from *felt.Felt, keys [][]felt.Felt) (*EventFil return nil, err } - return newEventFilter(txn, from, keys, 0, latest, &pending), nil + return newEventFilter(txn, from, keys, 0, latest, b.pendingBlockFn), nil } // RevertHead reverts the head block @@ -907,87 +860,3 @@ func removeTxsAndReceipts(txn db.Transaction, blockNumber, numTxs uint64) error return nil } - -// StorePending stores a pending block given that it is for the next height -func (b *Blockchain) StorePending(p *Pending) error { - err := checkBlockVersion(p.Block.ProtocolVersion) - if err != nil { - return err - } - return b.database.View(func(txn db.Transaction) error { - expectedParentHash := new(felt.Felt) - h, err := headsHeader(txn) - if err != nil && !errors.Is(err, db.ErrKeyNotFound) { - return err - } else if err == nil { - expectedParentHash = h.Hash - } - - if !expectedParentHash.Equal(p.Block.ParentHash) { - return ErrParentDoesNotMatchHead - } - - if existingPending, err := pendingBlock(txn); err == nil { - if existingPending.Block.TransactionCount >= p.Block.TransactionCount { - // ignore the incoming pending if it has fewer transactions than the one we already have - return nil - } - } else if !errors.Is(err, ErrPendingBlockNotFound) { - return err - } - - if h != nil { - p.Block.Number = h.Number + 1 - } - pending.Store(p) - - return nil - }) -} - -func pendingBlock(txn db.Transaction) (*Pending, error) { - p := pending.Load() - if p == nil { - return nil, ErrPendingBlockNotFound - } - - expectedParentHash := &felt.Zero - if head, err := headsHeader(txn); err == nil { - expectedParentHash = head.Hash - } - if p.Block.ParentHash.Equal(expectedParentHash) { - return p, nil - } - - // Since the pending block in the cache is outdated remove it - pending.Store(nil) - - return nil, ErrPendingBlockNotFound -} - -// Pending returns the pending block from the database -func (b *Blockchain) Pending() (*Pending, error) { - b.listener.OnRead("Pending") - var pending *Pending - return pending, b.database.View(func(txn db.Transaction) error { - var err error - pending, err = pendingBlock(txn) - return err - }) -} - -// PendingState returns the state resulting from execution of the pending block -func (b *Blockchain) PendingState() (core.StateReader, StateCloser, error) { - b.listener.OnRead("PendingState") - txn, err := b.database.NewTransaction(false) - if err != nil { - return nil, nil, err - } - - pending, err := pendingBlock(txn) - if err != nil { - return nil, nil, utils.RunAndWrapOnError(txn.Discard, err) - } - - return NewPendingState(pending.StateUpdate.StateDiff, pending.NewClasses, core.NewState(txn)), txn.Discard, nil -} diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index 53d3a77c48..6a20acf7d1 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -23,7 +23,7 @@ func TestNew(t *testing.T) { client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) t.Run("empty blockchain's head is nil", func(t *testing.T) { - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) assert.Equal(t, &utils.Mainnet, chain.Network()) b, err := chain.Head() assert.Nil(t, b) @@ -37,10 +37,10 @@ func TestNew(t *testing.T) { require.NoError(t, err) testDB := pebble.NewMemTest(t) - chain := blockchain.New(testDB, &utils.Mainnet) + chain := blockchain.New(testDB, &utils.Mainnet, nil) assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil)) - chain = blockchain.New(testDB, &utils.Mainnet) + chain = blockchain.New(testDB, &utils.Mainnet, nil) b, err := chain.Head() require.NoError(t, err) assert.Equal(t, block0, b) @@ -51,7 +51,7 @@ func TestHeight(t *testing.T) { client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) t.Run("return nil if blockchain is empty", func(t *testing.T) { - chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia, nil) _, err := chain.Height() assert.Error(t, err) }) @@ -63,10 +63,10 @@ func TestHeight(t *testing.T) { require.NoError(t, err) testDB := pebble.NewMemTest(t) - chain := blockchain.New(testDB, &utils.Mainnet) + chain := blockchain.New(testDB, &utils.Mainnet, nil) assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil)) - chain = blockchain.New(testDB, &utils.Mainnet) + chain = blockchain.New(testDB, &utils.Mainnet, nil) height, err := chain.Height() require.NoError(t, err) assert.Equal(t, block0.Number, height) @@ -74,7 +74,7 @@ func TestHeight(t *testing.T) { } func TestBlockByNumberAndHash(t *testing.T) { - chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia, nil) t.Run("same block is returned for both GetBlockByNumber and GetBlockByHash", func(t *testing.T) { client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) @@ -110,7 +110,7 @@ func TestVerifyBlock(t *testing.T) { h1, err := new(felt.Felt).SetRandom() require.NoError(t, err) - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) t.Run("error if chain is empty and incoming block number is not 0", func(t *testing.T) { block := &core.Block{Header: &core.Header{Number: 10}} @@ -172,7 +172,7 @@ func TestSanityCheckNewHeight(t *testing.T) { h1, err := new(felt.Felt).SetRandom() require.NoError(t, err) - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) client := feeder.NewTestClient(t, &utils.Mainnet) @@ -217,7 +217,7 @@ func TestStore(t *testing.T) { require.NoError(t, err) t.Run("add block to empty blockchain", func(t *testing.T) { - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) require.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil)) headBlock, err := chain.Head() @@ -243,7 +243,7 @@ func TestStore(t *testing.T) { stateUpdate1, err := gw.StateUpdate(context.Background(), 1) require.NoError(t, err) - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) require.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil)) require.NoError(t, chain.Store(block1, &emptyCommitments, stateUpdate1, nil)) @@ -266,7 +266,7 @@ func TestStore(t *testing.T) { } func TestBlockCommitments(t *testing.T) { - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) @@ -291,7 +291,7 @@ func TestBlockCommitments(t *testing.T) { } func TestTransactionAndReceipt(t *testing.T) { - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) @@ -318,7 +318,7 @@ func TestTransactionAndReceipt(t *testing.T) { t.Run("GetTransactionByHash returns error if transaction does not exist", func(t *testing.T) { tx, err := chain.TransactionByHash(new(felt.Felt).SetUint64(345)) assert.Nil(t, tx) - assert.EqualError(t, err, blockchain.ErrPendingBlockNotFound.Error()) + assert.EqualError(t, err, db.ErrKeyNotFound.Error()) }) t.Run("GetTransactionReceipt returns error if receipt does not exist", func(t *testing.T) { @@ -379,7 +379,7 @@ func TestTransactionAndReceipt(t *testing.T) { func TestState(t *testing.T) { testDB := pebble.NewMemTest(t) - chain := blockchain.New(testDB, &utils.Mainnet) + chain := blockchain.New(testDB, &utils.Mainnet, nil) client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) @@ -441,8 +441,13 @@ func TestState(t *testing.T) { } func TestEvents(t *testing.T) { + var pendingB *core.Block + pendingBlockFn := func() *core.Block { + return pendingB + } + testDB := pebble.NewMemTest(t) - chain := blockchain.New(testDB, &utils.Goerli2) + chain := blockchain.New(testDB, &utils.Goerli2, pendingBlockFn) client := feeder.NewTestClient(t, &utils.Goerli2) gw := adaptfeeder.New(client) @@ -456,10 +461,7 @@ func TestEvents(t *testing.T) { if b.Number < 6 { require.NoError(t, chain.Store(b, &emptyCommitments, s, nil)) } else { - require.NoError(t, chain.StorePending(&blockchain.Pending{ - Block: b, - StateUpdate: s, - })) + pendingB = b } } @@ -561,7 +563,7 @@ func TestEvents(t *testing.T) { func TestRevert(t *testing.T) { testdb := pebble.NewMemTest(t) - chain := blockchain.New(testdb, &utils.Mainnet) + chain := blockchain.New(testdb, &utils.Mainnet, nil) client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) @@ -645,7 +647,7 @@ func TestL1Update(t *testing.T) { for _, head := range heads { t.Run(fmt.Sprintf("update L1 head to block %d", head.BlockNumber), func(t *testing.T) { - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) require.NoError(t, chain.SetL1Head(head)) got, err := chain.L1Head() require.NoError(t, err) @@ -653,131 +655,3 @@ func TestL1Update(t *testing.T) { }) } } - -func TestPending(t *testing.T) { - testDB := pebble.NewMemTest(t) - chain := blockchain.New(testDB, &utils.Mainnet) - client := feeder.NewTestClient(t, &utils.Mainnet) - gw := adaptfeeder.New(client) - - b, err := gw.BlockByNumber(context.Background(), 0) - require.NoError(t, err) - su, err := gw.StateUpdate(context.Background(), 0) - require.NoError(t, err) - - t.Run("pending state shouldnt exist if no pending block", func(t *testing.T) { - _, _, err = chain.PendingState() - require.Error(t, err) - }) - - t.Run("cannot store unsupported pending block version", func(t *testing.T) { - pending := &blockchain.Pending{Block: &core.Block{Header: &core.Header{ProtocolVersion: "1.9.0"}}} - require.Error(t, chain.StorePending(pending)) - }) - - t.Run("store genesis as pending", func(t *testing.T) { - pendingGenesis := &blockchain.Pending{ - Block: b, - StateUpdate: su, - } - require.NoError(t, chain.StorePending(pendingGenesis)) - - gotPending, pErr := chain.Pending() - require.NoError(t, pErr) - assert.Equal(t, pendingGenesis, gotPending) - }) - - b.GasPriceSTRK = utils.HexToFelt(t, "0xDEADBEEF") - require.NoError(t, chain.Store(b, &emptyCommitments, su, nil)) - - t.Run("storing a pending too far into the future should fail", func(t *testing.T) { - b, err = gw.BlockByNumber(context.Background(), 2) - require.NoError(t, err) - su, err = gw.StateUpdate(context.Background(), 2) - require.NoError(t, err) - - notExpectedPending := blockchain.Pending{ - Block: b, - StateUpdate: su, - } - require.ErrorIs(t, chain.StorePending(¬ExpectedPending), blockchain.ErrParentDoesNotMatchHead) - }) - - t.Run("store expected pending block", func(t *testing.T) { - b, err = gw.BlockByNumber(context.Background(), 1) - require.NoError(t, err) - su, err = gw.StateUpdate(context.Background(), 1) - require.NoError(t, err) - - expectedPending := &blockchain.Pending{ - Block: b, - StateUpdate: su, - } - require.NoError(t, chain.StorePending(expectedPending)) - - gotPending, pErr := chain.Pending() - require.NoError(t, pErr) - assert.Equal(t, expectedPending, gotPending) - }) - - t.Run("fetch a txn from pending block", func(t *testing.T) { - hash := utils.HexToFelt(t, "0x2f07a65f9f7a6445b2a0b1fb90ef12f5fd3b94128d06a67712efd3b2f163533") - tx, tErr := chain.TransactionByHash(hash) - require.NoError(t, tErr) - assert.Equal(t, hash, tx.Hash()) - t.Run("receipt", func(t *testing.T) { - r, blockHash, blockNumber, rErr := chain.Receipt(hash) - require.NoError(t, rErr) - assert.Nil(t, blockHash) - assert.Zero(t, blockNumber) - assert.Equal(t, hash, r.TransactionHash) - }) - }) - - t.Run("get pending state", func(t *testing.T) { - _, pendingStateCloser, pErr := chain.PendingState() - t.Cleanup(func() { - require.NoError(t, pendingStateCloser()) - }) - require.NoError(t, pErr) - }) -} - -func TestStorePendingIncludesNumber(t *testing.T) { - network := utils.Mainnet - chain := blockchain.New(pebble.NewMemTest(t), &network) - - // Store pending genesis. - require.NoError(t, chain.StorePending(&blockchain.Pending{ - Block: &core.Block{ - Header: &core.Header{ - ParentHash: new(felt.Felt), - Hash: new(felt.Felt), - }, - }, - })) - pending, err := chain.Pending() - require.NoError(t, err) - require.Equal(t, uint64(0), pending.Block.Number) - - // Add block zero. - gw := adaptfeeder.New(feeder.NewTestClient(t, &network)) - b, err := gw.BlockByNumber(context.Background(), 0) - require.NoError(t, err) - su, err := gw.StateUpdate(context.Background(), 0) - require.NoError(t, err) - require.NoError(t, chain.Store(b, nil, su, nil)) - - // Store pending. - require.NoError(t, chain.StorePending(&blockchain.Pending{ - Block: &core.Block{ - Header: &core.Header{ - ParentHash: b.Hash, - Hash: new(felt.Felt), - }, - }, - })) - pending, err = chain.Pending() - require.NoError(t, err) - require.Equal(t, uint64(1), pending.Block.Number) -} diff --git a/blockchain/event_filter.go b/blockchain/event_filter.go index 3a5afe8dcc..8d3d177855 100644 --- a/blockchain/event_filter.go +++ b/blockchain/event_filter.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math" - "sync/atomic" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" @@ -22,7 +21,7 @@ type EventFilter struct { contractAddress *felt.Felt keys [][]felt.Felt maxScanned uint // maximum number of scanned blocks in single call. - pending *atomic.Pointer[Pending] + pendingBlockFn func() *core.Block } type EventFilterRange uint @@ -33,7 +32,7 @@ const ( ) func newEventFilter(txn db.Transaction, contractAddress *felt.Felt, keys [][]felt.Felt, fromBlock, toBlock uint64, - pending *atomic.Pointer[Pending], + pendingBlockFn func() *core.Block, ) *EventFilter { return &EventFilter{ txn: txn, @@ -42,7 +41,7 @@ func newEventFilter(txn db.Transaction, contractAddress *felt.Felt, keys [][]fel fromBlock: fromBlock, toBlock: toBlock, maxScanned: math.MaxUint, - pending: pending, + pendingBlockFn: pendingBlockFn, } } @@ -107,11 +106,11 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi return nil, nil, err } - var pending *Pending + var pending *core.Block if e.toBlock > latest { e.toBlock = latest + 1 - pending = e.pending.Load() + pending = e.pendingBlockFn() if pending == nil { e.toBlock = latest } @@ -137,7 +136,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi return nil, nil, err } } else { - header = pending.Block.Header + header = pending.Header } if possibleMatches := e.testBloom(header.EventsBloom, filterKeysMaps); !possibleMatches { @@ -152,7 +151,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi return nil, nil, err } } else { - receipts = pending.Block.Receipts + receipts = pending.Receipts } var processedEvents uint64 diff --git a/l1/l1_pkg_test.go b/l1/l1_pkg_test.go index 4e377cce12..97714bf1c2 100644 --- a/l1/l1_pkg_test.go +++ b/l1/l1_pkg_test.go @@ -337,7 +337,7 @@ func TestClient(t *testing.T) { ctrl := gomock.NewController(t) nopLog := utils.NewNopZapLogger() network := utils.Mainnet - chain := blockchain.New(pebble.NewMemTest(t), &network) + chain := blockchain.New(pebble.NewMemTest(t), &network, nil) client := NewClient(nil, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond) @@ -398,7 +398,7 @@ func TestUnreliableSubscription(t *testing.T) { ctrl := gomock.NewController(t) nopLog := utils.NewNopZapLogger() network := utils.Mainnet - chain := blockchain.New(pebble.NewMemTest(t), &network) + chain := blockchain.New(pebble.NewMemTest(t), &network, nil) client := NewClient(nil, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond) err := errors.New("test err") diff --git a/l1/l1_test.go b/l1/l1_test.go index 44174b7d3c..bb14cc711a 100644 --- a/l1/l1_test.go +++ b/l1/l1_test.go @@ -54,7 +54,7 @@ func TestFailToCreateSubscription(t *testing.T) { network := utils.Mainnet ctrl := gomock.NewController(t) nopLog := utils.NewNopZapLogger() - chain := blockchain.New(pebble.NewMemTest(t), &network) + chain := blockchain.New(pebble.NewMemTest(t), &network, nil) subscriber := mocks.NewMockSubscriber(ctrl) @@ -85,7 +85,7 @@ func TestMismatchedChainID(t *testing.T) { network := utils.Mainnet ctrl := gomock.NewController(t) nopLog := utils.NewNopZapLogger() - chain := blockchain.New(pebble.NewMemTest(t), &network) + chain := blockchain.New(pebble.NewMemTest(t), &network, nil) subscriber := mocks.NewMockSubscriber(ctrl) @@ -110,7 +110,7 @@ func TestEventListener(t *testing.T) { ctrl := gomock.NewController(t) nopLog := utils.NewNopZapLogger() network := utils.Mainnet - chain := blockchain.New(pebble.NewMemTest(t), &network) + chain := blockchain.New(pebble.NewMemTest(t), &network, nil) subscriber := mocks.NewMockSubscriber(ctrl) subscriber. diff --git a/migration/migration_pkg_test.go b/migration/migration_pkg_test.go index 0972416313..edec4caac8 100644 --- a/migration/migration_pkg_test.go +++ b/migration/migration_pkg_test.go @@ -80,7 +80,7 @@ func TestRelocateContractStorageRootKeys(t *testing.T) { func TestRecalculateBloomFilters(t *testing.T) { testdb := pebble.NewMemTest(t) - chain := blockchain.New(testdb, &utils.Mainnet) + chain := blockchain.New(testdb, &utils.Mainnet, nil) client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) @@ -188,7 +188,7 @@ func TestChangeTrieNodeEncoding(t *testing.T) { func TestCalculateBlockCommitments(t *testing.T) { testdb := pebble.NewMemTest(t) - chain := blockchain.New(testdb, &utils.Mainnet) + chain := blockchain.New(testdb, &utils.Mainnet, nil) client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) diff --git a/mocks/mock_blockchain.go b/mocks/mock_blockchain.go index c2a0d2ae48..9f841fa4e0 100644 --- a/mocks/mock_blockchain.go +++ b/mocks/mock_blockchain.go @@ -222,37 +222,6 @@ func (mr *MockReaderMockRecorder) Network() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Network", reflect.TypeOf((*MockReader)(nil).Network)) } -// Pending mocks base method. -func (m *MockReader) Pending() (*blockchain.Pending, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Pending") - ret0, _ := ret[0].(*blockchain.Pending) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Pending indicates an expected call of Pending. -func (mr *MockReaderMockRecorder) Pending() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pending", reflect.TypeOf((*MockReader)(nil).Pending)) -} - -// PendingState mocks base method. -func (m *MockReader) PendingState() (core.StateReader, func() error, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PendingState") - ret0, _ := ret[0].(core.StateReader) - ret1, _ := ret[1].(func() error) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// PendingState indicates an expected call of PendingState. -func (mr *MockReaderMockRecorder) PendingState() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PendingState", reflect.TypeOf((*MockReader)(nil).PendingState)) -} - // Receipt mocks base method. func (m *MockReader) Receipt(arg0 *felt.Felt) (*core.TransactionReceipt, *felt.Felt, uint64, error) { m.ctrl.T.Helper() diff --git a/mocks/mock_synchronizer.go b/mocks/mock_synchronizer.go index dc55ebce7f..910e5007e6 100644 --- a/mocks/mock_synchronizer.go +++ b/mocks/mock_synchronizer.go @@ -54,6 +54,51 @@ func (mr *MockSyncReaderMockRecorder) HighestBlockHeader() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HighestBlockHeader", reflect.TypeOf((*MockSyncReader)(nil).HighestBlockHeader)) } +// Pending mocks base method. +func (m *MockSyncReader) Pending() (*sync.Pending, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Pending") + ret0, _ := ret[0].(*sync.Pending) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Pending indicates an expected call of Pending. +func (mr *MockSyncReaderMockRecorder) Pending() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pending", reflect.TypeOf((*MockSyncReader)(nil).Pending)) +} + +// PendingBlock mocks base method. +func (m *MockSyncReader) PendingBlock() *core.Block { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PendingBlock") + ret0, _ := ret[0].(*core.Block) + return ret0 +} + +// PendingBlock indicates an expected call of PendingBlock. +func (mr *MockSyncReaderMockRecorder) PendingBlock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PendingBlock", reflect.TypeOf((*MockSyncReader)(nil).PendingBlock)) +} + +// PendingState mocks base method. +func (m *MockSyncReader) PendingState() (core.StateReader, func() error, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PendingState") + ret0, _ := ret[0].(core.StateReader) + ret1, _ := ret[1].(func() error) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// PendingState indicates an expected call of PendingState. +func (mr *MockSyncReaderMockRecorder) PendingState() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PendingState", reflect.TypeOf((*MockSyncReader)(nil).PendingState)) +} + // StartingBlockNumber mocks base method. func (m *MockSyncReader) StartingBlockNumber() (uint64, error) { m.ctrl.T.Helper() diff --git a/node/node.go b/node/node.go index 6a20829e89..987ae77de9 100644 --- a/node/node.go +++ b/node/node.go @@ -123,7 +123,8 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen services := make([]service.Service, 0) - chain := blockchain.New(database, &cfg.Network) + synchronizer := new(sync.Synchronizer) + chain := blockchain.New(database, &cfg.Network, synchronizer.PendingBlock) // Verify that cfg.Network is compatible with the database. head, err := chain.Head() @@ -151,7 +152,7 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen client := feeder.NewClient(cfg.Network.FeederURL).WithUserAgent(ua).WithLogger(log). WithTimeout(cfg.GatewayTimeout).WithAPIKey(cfg.GatewayAPIKey) - synchronizer := sync.New(chain, adaptfeeder.New(client), log, cfg.PendingPollInterval, dbIsRemote) + synchronizer = sync.New(chain, adaptfeeder.New(client), log, cfg.PendingPollInterval, dbIsRemote, database) gatewayClient := gateway.NewClient(cfg.Network.GatewayURL, log).WithUserAgent(ua).WithAPIKey(cfg.GatewayAPIKey) var p2pService *p2p.Service diff --git a/node/node_test.go b/node/node_test.go index a9287e0e4e..f30482d7a7 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -69,8 +69,8 @@ func TestNetworkVerificationOnNonEmptyDB(t *testing.T) { log := utils.NewNopZapLogger() database, err := pebble.New(dbPath) require.NoError(t, err) - chain := blockchain.New(database, &network) - syncer := sync.New(chain, adaptfeeder.New(feeder.NewTestClient(t, &network)), log, 0, false) + chain := blockchain.New(database, &network, nil) + syncer := sync.New(chain, adaptfeeder.New(feeder.NewTestClient(t, &network)), log, 0, false, database) ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) require.NoError(t, syncer.Run(ctx)) cancel() diff --git a/rpc/block_test.go b/rpc/block_test.go index f24378a0d8..e86f627563 100644 --- a/rpc/block_test.go +++ b/rpc/block_test.go @@ -14,6 +14,7 @@ import ( "github.com/NethermindEth/juno/mocks" "github.com/NethermindEth/juno/rpc" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" + "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -155,7 +156,8 @@ func TestBlockTransactionCount(t *testing.T) { n := utils.Ptr(utils.Sepolia) mockReader := mocks.NewMockReader(mockCtrl) - handler := rpc.New(mockReader, nil, nil, "", nil) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) + handler := rpc.New(mockReader, mockSyncReader, nil, "", nil) client := feeder.NewTestClient(t, n) gw := adaptfeeder.New(client) @@ -217,7 +219,7 @@ func TestBlockTransactionCount(t *testing.T) { t.Run("blockID - pending", func(t *testing.T) { latestBlock.Hash = nil latestBlock.GlobalStateRoot = nil - mockReader.EXPECT().Pending().Return(&blockchain.Pending{ + mockSyncReader.EXPECT().Pending().Return(&sync.Pending{ Block: latestBlock, }, nil) @@ -234,13 +236,24 @@ func TestBlockWithTxHashes(t *testing.T) { "hash": {Hash: new(felt.Felt).SetUint64(1)}, "number": {Number: 1}, } + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) - for description, id := range errTests { + var mockSyncReader *mocks.MockSyncReader + mockReader := mocks.NewMockReader(mockCtrl) + + for description, id := range errTests { //nolint:dupl t.Run(description, func(t *testing.T) { log := utils.NewNopZapLogger() n := utils.Ptr(utils.Mainnet) - chain := blockchain.New(pebble.NewMemTest(t), n) - handler := rpc.New(chain, nil, nil, "", log) + chain := blockchain.New(pebble.NewMemTest(t), n, nil) + + if description == "pending" { //nolint:goconst + mockSyncReader = mocks.NewMockSyncReader(mockCtrl) + mockSyncReader.EXPECT().Pending().Return(nil, sync.ErrPendingBlockNotFound) + } + + handler := rpc.New(chain, mockSyncReader, nil, "", log) block, rpcErr := handler.BlockWithTxHashes(id) assert.Nil(t, block) @@ -248,12 +261,8 @@ func TestBlockWithTxHashes(t *testing.T) { }) } - mockCtrl := gomock.NewController(t) - t.Cleanup(mockCtrl.Finish) - n := utils.Ptr(utils.Sepolia) - mockReader := mocks.NewMockReader(mockCtrl) - handler := rpc.New(mockReader, nil, nil, "", nil) + handler := rpc.New(mockReader, mockSyncReader, nil, "", nil) client := feeder.NewTestClient(t, n) gw := adaptfeeder.New(client) @@ -335,7 +344,7 @@ func TestBlockWithTxHashes(t *testing.T) { t.Run("blockID - pending", func(t *testing.T) { latestBlock.Hash = nil latestBlock.GlobalStateRoot = nil - mockReader.EXPECT().Pending().Return(&blockchain.Pending{ + mockSyncReader.EXPECT().Pending().Return(&sync.Pending{ Block: latestBlock, }, nil) mockReader.EXPECT().L1Head().Return(nil, db.ErrKeyNotFound) @@ -354,12 +363,24 @@ func TestBlockWithTxs(t *testing.T) { "number": {Number: 1}, } - for description, id := range errTests { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + var mockSyncReader *mocks.MockSyncReader + mockReader := mocks.NewMockReader(mockCtrl) + + for description, id := range errTests { //nolint:dupl t.Run(description, func(t *testing.T) { log := utils.NewNopZapLogger() n := utils.Ptr(utils.Mainnet) - chain := blockchain.New(pebble.NewMemTest(t), n) - handler := rpc.New(chain, nil, nil, "", log) + chain := blockchain.New(pebble.NewMemTest(t), n, nil) + + if description == "pending" { + mockSyncReader = mocks.NewMockSyncReader(mockCtrl) + mockSyncReader.EXPECT().Pending().Return(nil, sync.ErrPendingBlockNotFound) + } + + handler := rpc.New(chain, mockSyncReader, nil, "", log) block, rpcErr := handler.BlockWithTxs(id) assert.Nil(t, block) @@ -367,12 +388,8 @@ func TestBlockWithTxs(t *testing.T) { }) } - mockCtrl := gomock.NewController(t) - t.Cleanup(mockCtrl.Finish) - n := utils.Ptr(utils.Mainnet) - mockReader := mocks.NewMockReader(mockCtrl) - handler := rpc.New(mockReader, nil, nil, "", nil) + handler := rpc.New(mockReader, mockSyncReader, nil, "", nil) client := feeder.NewTestClient(t, n) gw := adaptfeeder.New(client) @@ -472,7 +489,7 @@ func TestBlockWithTxs(t *testing.T) { t.Run("blockID - pending", func(t *testing.T) { latestBlock.Hash = nil latestBlock.GlobalStateRoot = nil - mockReader.EXPECT().Pending().Return(&blockchain.Pending{ + mockSyncReader.EXPECT().Pending().Return(&sync.Pending{ Block: latestBlock, }, nil).Times(2) mockReader.EXPECT().L1Head().Return(nil, db.ErrKeyNotFound).Times(2) @@ -560,7 +577,8 @@ func TestBlockWithReceipts(t *testing.T) { n := utils.Ptr(utils.Mainnet) mockReader := mocks.NewMockReader(mockCtrl) - handler := rpc.New(mockReader, nil, nil, "", nil) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) + handler := rpc.New(mockReader, mockSyncReader, nil, "", nil) t.Run("transaction not found", func(t *testing.T) { blockID := rpc.BlockID{Number: 777} @@ -596,7 +614,7 @@ func TestBlockWithReceipts(t *testing.T) { blockID := rpc.BlockID{Pending: true} - mockReader.EXPECT().Pending().Return(blockchain.Pending{Block: block0}, nil) + mockSyncReader.EXPECT().Pending().Return(sync.Pending{Block: block0}, nil) mockReader.EXPECT().L1Head().Return(&core.L1Head{}, nil) resp, rpcErr := handler.BlockWithReceipts(blockID) diff --git a/rpc/events_test.go b/rpc/events_test.go index 3db08d233e..bc28e17105 100644 --- a/rpc/events_test.go +++ b/rpc/events_test.go @@ -25,9 +25,14 @@ import ( ) func TestEvents(t *testing.T) { + var pendingB *core.Block + pendingBlockFn := func() *core.Block { + return pendingB + } + testDB := pebble.NewMemTest(t) n := utils.Ptr(utils.Sepolia) - chain := blockchain.New(testDB, n) + chain := blockchain.New(testDB, n, pendingBlockFn) client := feeder.NewTestClient(t, n) gw := adaptfeeder.New(client) @@ -43,10 +48,7 @@ func TestEvents(t *testing.T) { } else { b.Hash = nil b.GlobalStateRoot = nil - require.NoError(t, chain.StorePending(&blockchain.Pending{ - Block: b, - StateUpdate: s, - })) + pendingB = b } } @@ -238,8 +240,9 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { gw := adaptfeeder.New(client) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - chain := blockchain.New(pebble.NewMemTest(t), n) - syncer := sync.New(chain, gw, log, 0, false) + testDB := pebble.NewMemTest(t) + chain := blockchain.New(pebble.NewMemTest(t), n, nil) + syncer := sync.New(chain, gw, log, 0, false, testDB) handler := rpc.New(chain, syncer, nil, "", log) go func() { @@ -320,8 +323,9 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { gw := adaptfeeder.New(feederClient) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - chain := blockchain.New(pebble.NewMemTest(t), n) - syncer := sync.New(chain, gw, log, 0, false) + testDB := pebble.NewMemTest(t) + chain := blockchain.New(testDB, n, nil) + syncer := sync.New(chain, gw, log, 0, false, testDB) handler := rpc.New(chain, syncer, nil, "", log) go func() { require.NoError(t, handler.Run(ctx)) diff --git a/rpc/handlers_test.go b/rpc/handlers_test.go index 48374d20d8..f76887e3ec 100644 --- a/rpc/handlers_test.go +++ b/rpc/handlers_test.go @@ -40,12 +40,13 @@ func TestSpecVersion(t *testing.T) { func TestThrottledVMError(t *testing.T) { mockCtrl := gomock.NewController(t) t.Cleanup(mockCtrl.Finish) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) mockReader := mocks.NewMockReader(mockCtrl) mockReader.EXPECT().Network().Return(&utils.Mainnet).AnyTimes() mockVM := mocks.NewMockVM(mockCtrl) throttledVM := node.NewThrottledVM(mockVM, 0, 0) - handler := rpc.New(mockReader, nil, throttledVM, "", nil) + handler := rpc.New(mockReader, mockSyncReader, throttledVM, "", nil) mockState := mocks.NewMockStateHistoryReader(mockCtrl) throttledErr := "VM throughput limit reached" @@ -94,7 +95,7 @@ func TestThrottledVMError(t *testing.T) { mockReader.EXPECT().StateAtBlockHash(header.ParentHash).Return(state, nopCloser, nil) headState := mocks.NewMockStateHistoryReader(mockCtrl) headState.EXPECT().Class(declareTx.ClassHash).Return(declaredClass, nil) - mockReader.EXPECT().PendingState().Return(headState, nopCloser, nil) + mockSyncReader.EXPECT().PendingState().Return(headState, nopCloser, nil) _, rpcErr := handler.TraceBlockTransactions(context.Background(), rpc.BlockID{Hash: blockHash}) assert.Equal(t, throttledErr, rpcErr.Data) }) diff --git a/rpc/helpers.go b/rpc/helpers.go index 11e9e23cde..5fc403fec2 100644 --- a/rpc/helpers.go +++ b/rpc/helpers.go @@ -11,6 +11,7 @@ import ( "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/sync" ) func (h *Handler) l1Head() (*core.L1Head, *jsonrpc.Error) { @@ -38,8 +39,8 @@ func (h *Handler) blockByID(id *BlockID) (*core.Block, *jsonrpc.Error) { case id.Hash != nil: block, err = h.bcReader.BlockByHash(id.Hash) case id.Pending: - var pending *blockchain.Pending - pending, err = h.bcReader.Pending() + var pending *sync.Pending + pending, err = h.syncReader.Pending() if err == nil { block = pending.Block } @@ -48,7 +49,7 @@ func (h *Handler) blockByID(id *BlockID) (*core.Block, *jsonrpc.Error) { } if err != nil { - if errors.Is(err, db.ErrKeyNotFound) || errors.Is(err, blockchain.ErrPendingBlockNotFound) { + if errors.Is(err, db.ErrKeyNotFound) || errors.Is(err, sync.ErrPendingBlockNotFound) { return nil, ErrBlockNotFound } return nil, ErrInternal.CloneWithData(err) @@ -68,8 +69,8 @@ func (h *Handler) blockHeaderByID(id *BlockID) (*core.Header, *jsonrpc.Error) { case id.Hash != nil: header, err = h.bcReader.BlockHeaderByHash(id.Hash) case id.Pending: - var pending *blockchain.Pending - pending, err = h.bcReader.Pending() + var pending *sync.Pending + pending, err = h.syncReader.Pending() if err == nil { header = pending.Block.Header } @@ -78,7 +79,7 @@ func (h *Handler) blockHeaderByID(id *BlockID) (*core.Header, *jsonrpc.Error) { } if err != nil { - if errors.Is(err, db.ErrKeyNotFound) || errors.Is(err, blockchain.ErrPendingBlockNotFound) { + if errors.Is(err, db.ErrKeyNotFound) || errors.Is(err, sync.ErrPendingBlockNotFound) { return nil, ErrBlockNotFound } return nil, ErrInternal.CloneWithData(err) @@ -157,13 +158,13 @@ func (h *Handler) stateByBlockID(id *BlockID) (core.StateReader, blockchain.Stat case id.Hash != nil: reader, closer, err = h.bcReader.StateAtBlockHash(id.Hash) case id.Pending: - reader, closer, err = h.bcReader.PendingState() + reader, closer, err = h.syncReader.PendingState() default: reader, closer, err = h.bcReader.StateAtBlockNumber(id.Number) } if err != nil { - if errors.Is(err, db.ErrKeyNotFound) || errors.Is(err, blockchain.ErrPendingBlockNotFound) { + if errors.Is(err, db.ErrKeyNotFound) || errors.Is(err, sync.ErrPendingBlockNotFound) { return nil, nil, ErrBlockNotFound } return nil, nil, ErrInternal.CloneWithData(err) diff --git a/rpc/state_update.go b/rpc/state_update.go index 25a4677889..8f3c6380da 100644 --- a/rpc/state_update.go +++ b/rpc/state_update.go @@ -3,11 +3,11 @@ package rpc import ( "errors" - "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/sync" ) // https://github.com/starkware-libs/starknet-specs/blob/8016dd08ed7cd220168db16f24c8a6827ab88317/api/starknet_api_openrpc.json#L909 @@ -75,8 +75,8 @@ func (h *Handler) StateUpdate(id BlockID) (*StateUpdate, *jsonrpc.Error) { update, err = h.bcReader.StateUpdateByNumber(height) } } else if id.Pending { - var pending *blockchain.Pending - pending, err = h.bcReader.Pending() + var pending *sync.Pending + pending, err = h.syncReader.Pending() if err == nil { update = pending.StateUpdate } @@ -86,7 +86,7 @@ func (h *Handler) StateUpdate(id BlockID) (*StateUpdate, *jsonrpc.Error) { update, err = h.bcReader.StateUpdateByNumber(id.Number) } if err != nil { - if errors.Is(err, db.ErrKeyNotFound) || errors.Is(err, blockchain.ErrPendingBlockNotFound) { + if errors.Is(err, db.ErrKeyNotFound) || errors.Is(err, sync.ErrPendingBlockNotFound) { return nil, ErrBlockNotFound } return nil, ErrInternal.CloneWithData(err) diff --git a/rpc/state_update_test.go b/rpc/state_update_test.go index 718a424c16..041bbf7fe1 100644 --- a/rpc/state_update_test.go +++ b/rpc/state_update_test.go @@ -12,6 +12,7 @@ import ( "github.com/NethermindEth/juno/mocks" "github.com/NethermindEth/juno/rpc" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" + "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,11 +27,22 @@ func TestStateUpdate(t *testing.T) { "number": {Number: 1}, } + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + var mockSyncReader *mocks.MockSyncReader + mockReader := mocks.NewMockReader(mockCtrl) + n := utils.Ptr(utils.Mainnet) for description, id := range errTests { t.Run(description, func(t *testing.T) { - chain := blockchain.New(pebble.NewMemTest(t), n) - handler := rpc.New(chain, nil, nil, "", nil) + chain := blockchain.New(pebble.NewMemTest(t), n, nil) + + if description == "pending" { + mockSyncReader = mocks.NewMockSyncReader(mockCtrl) + mockSyncReader.EXPECT().Pending().Return(nil, sync.ErrPendingBlockNotFound) + } + handler := rpc.New(chain, mockSyncReader, nil, "", nil) update, rpcErr := handler.StateUpdate(id) assert.Nil(t, update) @@ -38,9 +50,7 @@ func TestStateUpdate(t *testing.T) { }) } - mockCtrl := gomock.NewController(t) - mockReader := mocks.NewMockReader(mockCtrl) - handler := rpc.New(mockReader, nil, nil, "", nil) + handler := rpc.New(mockReader, mockSyncReader, nil, "", nil) client := feeder.NewTestClient(t, n) mainnetGw := adaptfeeder.New(client) @@ -139,7 +149,7 @@ func TestStateUpdate(t *testing.T) { t.Run("pending", func(t *testing.T) { update21656.BlockHash = nil update21656.NewRoot = nil - mockReader.EXPECT().Pending().Return(&blockchain.Pending{ + mockSyncReader.EXPECT().Pending().Return(&sync.Pending{ StateUpdate: update21656, }, nil) diff --git a/rpc/trace.go b/rpc/trace.go index 809a8481de..65ac9785ad 100644 --- a/rpc/trace.go +++ b/rpc/trace.go @@ -11,6 +11,7 @@ import ( "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/jsonrpc" "github.com/NethermindEth/juno/starknet" + "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" "github.com/NethermindEth/juno/vm" ) @@ -146,8 +147,8 @@ func (h *Handler) traceTransaction(ctx context.Context, hash *felt.Felt, v0_6Res var block *core.Block isPendingBlock := blockHash == nil if isPendingBlock { - var pending blockchain.Pending - pending, err = h.bcReader.Pending() + var pending *sync.Pending + pending, err = h.syncReader.Pending() if err != nil { // for traceTransaction handlers there is no block not found error return nil, ErrTxnHashNotFound @@ -224,7 +225,7 @@ func (h *Handler) traceBlockTransactions(ctx context.Context, block *core.Block, headStateCloser blockchain.StateCloser ) if isPending { - headState, headStateCloser, err = h.bcReader.PendingState() + headState, headStateCloser, err = h.syncReader.PendingState() } else { headState, headStateCloser, err = h.bcReader.HeadState() } diff --git a/rpc/trace_test.go b/rpc/trace_test.go index b1a4debaf2..e7f854e9bb 100644 --- a/rpc/trace_test.go +++ b/rpc/trace_test.go @@ -16,6 +16,7 @@ import ( "github.com/NethermindEth/juno/mocks" "github.com/NethermindEth/juno/rpc" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" + "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" "github.com/NethermindEth/juno/vm" "github.com/stretchr/testify/assert" @@ -77,9 +78,10 @@ func TestTraceTransaction(t *testing.T) { t.Cleanup(mockCtrl.Finish) mockReader := mocks.NewMockReader(mockCtrl) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) mockReader.EXPECT().Network().Return(&utils.Mainnet).AnyTimes() mockVM := mocks.NewMockVM(mockCtrl) - handler := rpc.New(mockReader, nil, mockVM, "", utils.NewNopZapLogger()) + handler := rpc.New(mockReader, mockSyncReader, mockVM, "", utils.NewNopZapLogger()) t.Run("not found", func(t *testing.T) { hash := utils.HexToFelt(t, "0xBBBB") @@ -201,14 +203,14 @@ func TestTraceTransaction(t *testing.T) { } mockReader.EXPECT().Receipt(hash).Return(nil, header.Hash, header.Number, nil) - mockReader.EXPECT().Pending().Return(blockchain.Pending{ + mockSyncReader.EXPECT().Pending().Return(&sync.Pending{ Block: block, }, nil) mockReader.EXPECT().StateAtBlockHash(header.ParentHash).Return(nil, nopCloser, nil) headState := mocks.NewMockStateHistoryReader(mockCtrl) headState.EXPECT().Class(tx.ClassHash).Return(declaredClass, nil) - mockReader.EXPECT().PendingState().Return(headState, nopCloser, nil) + mockSyncReader.EXPECT().PendingState().Return(headState, nopCloser, nil) executionResources := `{ "pedersen": 0, @@ -266,9 +268,10 @@ func TestTraceTransactionV0_6(t *testing.T) { t.Cleanup(mockCtrl.Finish) mockReader := mocks.NewMockReader(mockCtrl) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) mockReader.EXPECT().Network().Return(&utils.Mainnet).AnyTimes() mockVM := mocks.NewMockVM(mockCtrl) - handler := rpc.New(mockReader, nil, mockVM, "", utils.NewNopZapLogger()) + handler := rpc.New(mockReader, mockSyncReader, mockVM, "", utils.NewNopZapLogger()) t.Run("not found", func(t *testing.T) { hash := utils.HexToFelt(t, "0xBBBB") @@ -356,14 +359,14 @@ func TestTraceTransactionV0_6(t *testing.T) { } mockReader.EXPECT().Receipt(hash).Return(nil, header.Hash, header.Number, nil) - mockReader.EXPECT().Pending().Return(blockchain.Pending{ + mockSyncReader.EXPECT().Pending().Return(&sync.Pending{ Block: block, }, nil) mockReader.EXPECT().StateAtBlockHash(header.ParentHash).Return(nil, nopCloser, nil) headState := mocks.NewMockStateHistoryReader(mockCtrl) headState.EXPECT().Class(tx.ClassHash).Return(declaredClass, nil) - mockReader.EXPECT().PendingState().Return(headState, nopCloser, nil) + mockSyncReader.EXPECT().PendingState().Return(headState, nopCloser, nil) vmTraceJSON := json.RawMessage(`{ "validate_invocation": {"contract_address": "0xd747220b2744d8d8d48c8a52bd3869fb98aea915665ab2485d5eadb49def6a", "entry_point_selector": "0x162da33a4585851fe8d3af3c2a9c60b557814e221e0d4f30ff0b2189d9c7775", "calldata": ["0x2", "0x53c91253bc9682c04929ca02ed00b3e423f6710d2ee7e0d5ebb06f3ecf368a8", "0x219209e083275171774dab1df80982e9df2096516f06319c5c6d71ae0a8480c", "0x0", "0x3", "0x4270219d365d6b017231b52e92b3fb5d7c8378b05e9abc97724537a80e93b0f", "0x1171593aa5bdadda4d6b0efde6cc94ee7649c3163d5efeb19da6c16d63a2a63", "0x3", "0x10", "0x13", "0x4270219d365d6b017231b52e92b3fb5d7c8378b05e9abc97724537a80e93b0f", "0x1e8480", "0x0", "0x53c91253bc9682c04929ca02ed00b3e423f6710d2ee7e0d5ebb06f3ecf368a8", "0x1e8480", "0x0", "0x49d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7", "0x420eeb770f7a4", "0x0", "0x40139799e37e4", "0x0", "0xd747220b2744d8d8d48c8a52bd3869fb98aea915665ab2485d5eadb49def6a", "0x0", "0x0", "0x1", "0x53c91253bc9682c04929ca02ed00b3e423f6710d2ee7e0d5ebb06f3ecf368a8", "0x49d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7", "0x7a6f98c03379b9513ca84cca1373ff452a7462a3b61598f0af5bb27ad7f76d1", "0x64"], "caller_address": "0x0", "class_hash": "0x25ec026985a3bf9d0cc1fe17326b245dfdc3ff89b8fde106542a3ea56c5a918", "entry_point_type": "EXTERNAL", "call_type": "CALL", "result": [], "calls": [{"contract_address": "0xd747220b2744d8d8d48c8a52bd3869fb98aea915665ab2485d5eadb49def6a", "entry_point_selector": "0x162da33a4585851fe8d3af3c2a9c60b557814e221e0d4f30ff0b2189d9c7775", "calldata": ["0x2", "0x53c91253bc9682c04929ca02ed00b3e423f6710d2ee7e0d5ebb06f3ecf368a8", "0x219209e083275171774dab1df80982e9df2096516f06319c5c6d71ae0a8480c", "0x0", "0x3", "0x4270219d365d6b017231b52e92b3fb5d7c8378b05e9abc97724537a80e93b0f", "0x1171593aa5bdadda4d6b0efde6cc94ee7649c3163d5efeb19da6c16d63a2a63", "0x3", "0x10", "0x13", "0x4270219d365d6b017231b52e92b3fb5d7c8378b05e9abc97724537a80e93b0f", "0x1e8480", "0x0", "0x53c91253bc9682c04929ca02ed00b3e423f6710d2ee7e0d5ebb06f3ecf368a8", "0x1e8480", "0x0", "0x49d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7", "0x420eeb770f7a4", "0x0", "0x40139799e37e4", "0x0", "0xd747220b2744d8d8d48c8a52bd3869fb98aea915665ab2485d5eadb49def6a", "0x0", "0x0", "0x1", "0x53c91253bc9682c04929ca02ed00b3e423f6710d2ee7e0d5ebb06f3ecf368a8", "0x49d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7", "0x7a6f98c03379b9513ca84cca1373ff452a7462a3b61598f0af5bb27ad7f76d1", "0x64"], "caller_address": "0x0", "class_hash": "0x33434ad846cdd5f23eb73ff09fe6fddd568284a0fb7d1be20ee482f044dabe2", "entry_point_type": "EXTERNAL", "call_type": "DELEGATE", "result": [], "calls": [], "events": [], "messages": []}], "events": [], "messages": []}, @@ -404,7 +407,7 @@ func TestTraceBlockTransactions(t *testing.T) { t.Run(description, func(t *testing.T) { log := utils.NewNopZapLogger() n := utils.Ptr(utils.Mainnet) - chain := blockchain.New(pebble.NewMemTest(t), n) + chain := blockchain.New(pebble.NewMemTest(t), n, nil) handler := rpc.New(chain, nil, nil, "", log) update, rpcErr := handler.TraceBlockTransactions(context.Background(), id) @@ -418,11 +421,12 @@ func TestTraceBlockTransactions(t *testing.T) { n := utils.Ptr(utils.Mainnet) mockReader := mocks.NewMockReader(mockCtrl) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) mockReader.EXPECT().Network().Return(n).AnyTimes() mockVM := mocks.NewMockVM(mockCtrl) log := utils.NewNopZapLogger() - handler := rpc.New(mockReader, nil, mockVM, "", log) + handler := rpc.New(mockReader, mockSyncReader, mockVM, "", log) t.Run("pending block", func(t *testing.T) { blockHash := utils.HexToFelt(t, "0x0001") @@ -454,7 +458,7 @@ func TestTraceBlockTransactions(t *testing.T) { mockReader.EXPECT().StateAtBlockHash(header.ParentHash).Return(state, nopCloser, nil) headState := mocks.NewMockStateHistoryReader(mockCtrl) headState.EXPECT().Class(declareTx.ClassHash).Return(declaredClass, nil) - mockReader.EXPECT().PendingState().Return(headState, nopCloser, nil) + mockSyncReader.EXPECT().PendingState().Return(headState, nopCloser, nil) paidL1Fees := []*felt.Felt{(&felt.Felt{}).SetUint64(1)} vmTraceJSON := json.RawMessage(`{ diff --git a/rpc/transaction.go b/rpc/transaction.go index d040bf866a..48f6e07b00 100644 --- a/rpc/transaction.go +++ b/rpc/transaction.go @@ -11,6 +11,7 @@ import ( "github.com/NethermindEth/juno/clients/gateway" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/jsonrpc" "github.com/NethermindEth/juno/starknet" "github.com/NethermindEth/juno/utils" @@ -426,7 +427,20 @@ func adaptRPCTxToFeederTx(rpcTx *Transaction) *starknet.Transaction { func (h *Handler) TransactionByHash(hash felt.Felt) (*Transaction, *jsonrpc.Error) { txn, err := h.bcReader.TransactionByHash(&hash) if err != nil { - return nil, ErrTxnHashNotFound + if !errors.Is(err, db.ErrKeyNotFound) { + return nil, ErrInternal.CloneWithData(err) + } + + pendingB := h.syncReader.PendingBlock() + if pendingB == nil { + return nil, ErrTxnHashNotFound + } + + for _, t := range pendingB.Transactions { + if hash.Equal(t.Hash()) { + txn = t + } + } } return AdaptTransaction(txn), nil } @@ -442,7 +456,7 @@ func (h *Handler) TransactionByBlockIDAndIndex(id BlockID, txIndex int) (*Transa } if id.Pending { - pending, err := h.bcReader.Pending() + pending, err := h.syncReader.Pending() if err != nil { return nil, ErrBlockNotFound } @@ -471,46 +485,57 @@ func (h *Handler) TransactionByBlockIDAndIndex(id BlockID, txIndex int) (*Transa // // It follows the specification defined here: // https://github.com/starkware-libs/starknet-specs/blob/master/api/starknet_api_openrpc.json#L222 -func (h *Handler) TransactionReceiptByHash(hash felt.Felt) (*TransactionReceipt, *jsonrpc.Error) { //nolint:dupl - txn, err := h.bcReader.TransactionByHash(&hash) - if err != nil { - return nil, ErrTxnHashNotFound - } - - receipt, blockHash, blockNumber, err := h.bcReader.Receipt(&hash) - if err != nil { - return nil, ErrTxnHashNotFound - } - - status := TxnAcceptedOnL2 - - if blockHash != nil { - l1H, jsonErr := h.l1Head() - if jsonErr != nil { - return nil, jsonErr - } - - if isL1Verified(blockNumber, l1H) { - status = TxnAcceptedOnL1 - } - } - - return AdaptReceipt(receipt, txn, status, blockHash, blockNumber, false), nil +func (h *Handler) TransactionReceiptByHash(hash felt.Felt) (*TransactionReceipt, *jsonrpc.Error) { + return h.transactionReceiptByHash(hash, false) } // TransactionReceiptByHash returns the receipt of a transaction identified by the given hash. // // It follows the specification defined here: // https://github.com/starkware-libs/starknet-specs/blob/master/api/starknet_api_openrpc.json#L222 -func (h *Handler) TransactionReceiptByHashV0_6(hash felt.Felt) (*TransactionReceipt, *jsonrpc.Error) { //nolint:dupl +func (h *Handler) TransactionReceiptByHashV0_6(hash felt.Felt) (*TransactionReceipt, *jsonrpc.Error) { + return h.transactionReceiptByHash(hash, true) +} + +func (h *Handler) transactionReceiptByHash(hash felt.Felt, v0_6Response bool) (*TransactionReceipt, *jsonrpc.Error) { + var ( + pendingB *core.Block + pendingBIndex int + ) + txn, err := h.bcReader.TransactionByHash(&hash) if err != nil { - return nil, ErrTxnHashNotFound + if !errors.Is(err, db.ErrKeyNotFound) { + return nil, ErrInternal.CloneWithData(err) + } + + pendingB = h.syncReader.PendingBlock() + if pendingB == nil { + return nil, ErrTxnHashNotFound + } + + for i, t := range pendingB.Transactions { + pendingBIndex = i + if hash.Equal(t.Hash()) { + txn = t + break + } + } } - receipt, blockHash, blockNumber, err := h.bcReader.Receipt(&hash) - if err != nil { - return nil, ErrTxnHashNotFound + var ( + receipt *core.TransactionReceipt + blockHash *felt.Felt + blockNumber uint64 + ) + + if pendingB != nil { + receipt = pendingB.Receipts[pendingBIndex] + } else { + receipt, blockHash, blockNumber, err = h.bcReader.Receipt(&hash) + if err != nil { + return nil, ErrTxnHashNotFound + } } status := TxnAcceptedOnL2 @@ -526,7 +551,7 @@ func (h *Handler) TransactionReceiptByHashV0_6(hash felt.Felt) (*TransactionRece } } - return AdaptReceipt(receipt, txn, status, blockHash, blockNumber, true), nil + return AdaptReceipt(receipt, txn, status, blockHash, blockNumber, v0_6Response), nil } // AddTransaction relays a transaction to the gateway. diff --git a/rpc/transaction_test.go b/rpc/transaction_test.go index b4a09a92e0..e57bfee8c4 100644 --- a/rpc/transaction_test.go +++ b/rpc/transaction_test.go @@ -7,7 +7,6 @@ import ( "math/rand" "testing" - "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/clients/feeder" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" @@ -16,6 +15,7 @@ import ( "github.com/NethermindEth/juno/rpc" "github.com/NethermindEth/juno/starknet" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" + "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,11 +26,13 @@ func TestTransactionByHashNotFound(t *testing.T) { mockCtrl := gomock.NewController(t) t.Cleanup(mockCtrl.Finish) mockReader := mocks.NewMockReader(mockCtrl) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) txHash := new(felt.Felt).SetBytes([]byte("random hash")) - mockReader.EXPECT().TransactionByHash(txHash).Return(nil, errors.New("tx not found")) + mockReader.EXPECT().TransactionByHash(txHash).Return(nil, db.ErrKeyNotFound) + mockSyncReader.EXPECT().PendingBlock().Return(nil) - handler := rpc.New(mockReader, nil, nil, "", nil) + handler := rpc.New(mockReader, mockSyncReader, nil, "", nil) tx, rpcErr := handler.TransactionByHash(*txHash) assert.Nil(t, tx) @@ -348,6 +350,7 @@ func TestTransactionByBlockIdAndIndex(t *testing.T) { t.Cleanup(mockCtrl.Finish) n := utils.Ptr(utils.Mainnet) mockReader := mocks.NewMockReader(mockCtrl) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) client := feeder.NewTestClient(t, n) mainnetGw := adaptfeeder.New(client) @@ -356,7 +359,7 @@ func TestTransactionByBlockIdAndIndex(t *testing.T) { require.NoError(t, err) latestBlockHash := latestBlock.Hash - handler := rpc.New(mockReader, nil, nil, "", nil) + handler := rpc.New(mockReader, mockSyncReader, nil, "", nil) t.Run("empty blockchain", func(t *testing.T) { mockReader.EXPECT().HeadsHeader().Return(nil, db.ErrKeyNotFound) @@ -470,7 +473,7 @@ func TestTransactionByBlockIdAndIndex(t *testing.T) { latestBlock.Hash = nil latestBlock.GlobalStateRoot = nil - mockReader.EXPECT().Pending().Return(&blockchain.Pending{ + mockSyncReader.EXPECT().Pending().Return(&sync.Pending{ Block: latestBlock, }, nil) mockReader.EXPECT().TransactionByHash(latestBlock.Transactions[index].Hash()).DoAndReturn( @@ -1425,8 +1428,10 @@ func TestTransactionStatus(t *testing.T) { for description, notFoundTest := range notFoundTests { t.Run(description, func(t *testing.T) { mockReader := mocks.NewMockReader(mockCtrl) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) mockReader.EXPECT().TransactionByHash(notFoundTest.hash).Return(nil, db.ErrKeyNotFound).Times(2) - handler := rpc.New(mockReader, nil, nil, "", nil) + mockSyncReader.EXPECT().PendingBlock().Return(nil).Times(2) + handler := rpc.New(mockReader, mockSyncReader, nil, "", nil) _, err := handler.TransactionStatus(ctx, *notFoundTest.hash) require.Equal(t, rpc.ErrTxnHashNotFound.Code, err.Code) @@ -1442,8 +1447,10 @@ func TestTransactionStatus(t *testing.T) { // transaction no† found in db and feeder t.Run("transaction not found in db and feeder ", func(t *testing.T) { mockReader := mocks.NewMockReader(mockCtrl) + mockSyncReader := mocks.NewMockSyncReader(mockCtrl) mockReader.EXPECT().TransactionByHash(test.notFoundTxHash).Return(nil, db.ErrKeyNotFound) - handler := rpc.New(mockReader, nil, nil, "", nil).WithFeeder(client) + mockSyncReader.EXPECT().PendingBlock().Return(nil) + handler := rpc.New(mockReader, mockSyncReader, nil, "", nil).WithFeeder(client) _, err := handler.TransactionStatus(ctx, *test.notFoundTxHash) require.NotNil(t, err) diff --git a/blockchain/pending.go b/sync/pending.go similarity index 98% rename from blockchain/pending.go rename to sync/pending.go index b69cd0ee06..03e12fcf3a 100644 --- a/blockchain/pending.go +++ b/sync/pending.go @@ -1,4 +1,4 @@ -package blockchain +package sync import ( "github.com/NethermindEth/juno/core" diff --git a/blockchain/pending_test.go b/sync/pending_test.go similarity index 95% rename from blockchain/pending_test.go rename to sync/pending_test.go index d5682b9eef..d2cc63e73f 100644 --- a/blockchain/pending_test.go +++ b/sync/pending_test.go @@ -1,12 +1,12 @@ -package blockchain_test +package sync_test import ( "testing" - "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/mocks" + "github.com/NethermindEth/juno/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -29,7 +29,7 @@ func TestPendingState(t *testing.T) { replacedClassHash, err := new(felt.Felt).SetRandom() require.NoError(t, err) - pending := blockchain.Pending{ + pending := sync.Pending{ Block: nil, StateUpdate: &core.StateUpdate{ BlockHash: nil, @@ -57,7 +57,7 @@ func TestPendingState(t *testing.T) { *deployedClassHash: &core.Cairo0Class{}, }, } - state := blockchain.NewPendingState(pending.StateUpdate.StateDiff, pending.NewClasses, mockState) + state := sync.NewPendingState(pending.StateUpdate.StateDiff, pending.NewClasses, mockState) t.Run("ContractClassHash", func(t *testing.T) { t.Run("from pending", func(t *testing.T) { diff --git a/sync/sync.go b/sync/sync.go index 99f3d3dd2b..67d48a1679 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -3,6 +3,7 @@ package sync import ( "context" "errors" + "fmt" "runtime" "sync/atomic" "time" @@ -21,6 +22,8 @@ import ( var ( _ service.Service = (*Synchronizer)(nil) _ Reader = (*Synchronizer)(nil) + + ErrPendingBlockNotFound = errors.New("pending block not found") ) const ( @@ -41,13 +44,17 @@ type Reader interface { StartingBlockNumber() (uint64, error) HighestBlockHeader() *core.Header SubscribeNewHeads() HeaderSubscription + + Pending() (*Pending, error) + PendingBlock() *core.Block + PendingState() (core.StateReader, func() error, error) } // This is temporary and will be removed once the p2p synchronizer implements this interface. type NoopSynchronizer struct{} func (n *NoopSynchronizer) StartingBlockNumber() (uint64, error) { - return 0, nil + return 0, errors.New("StartingBlockNumber not implemented") } func (n *NoopSynchronizer) HighestBlockHeader() *core.Header { @@ -58,9 +65,22 @@ func (n *NoopSynchronizer) SubscribeNewHeads() HeaderSubscription { return HeaderSubscription{feed.New[*core.Header]().Subscribe()} } +func (n *NoopSynchronizer) PendingBlock() *core.Block { + return nil +} + +func (n *NoopSynchronizer) Pending() (*Pending, error) { + return nil, errors.New("Pending() is not implemented") +} + +func (n *NoopSynchronizer) PendingState() (core.StateReader, func() error, error) { + return nil, nil, errors.New("PendingState() not implemented") +} + // Synchronizer manages a list of StarknetData to fetch the latest blockchain updates type Synchronizer struct { blockchain *blockchain.Blockchain + db db.DB readOnlyBlockchain bool starknetData starknetdata.StarknetData startingBlockNumber *uint64 @@ -70,15 +90,17 @@ type Synchronizer struct { log utils.SimpleLogger listener EventListener + pending atomic.Pointer[Pending] pendingPollInterval time.Duration catchUpMode bool } -func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, - log utils.SimpleLogger, pendingPollInterval time.Duration, readOnlyBlockchain bool, +func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log utils.SimpleLogger, + pendingPollInterval time.Duration, readOnlyBlockchain bool, database db.DB, ) *Synchronizer { s := &Synchronizer{ blockchain: bc, + db: database, starknetData: starkNetData, log: log, newHeads: feed.New[*core.Header](), @@ -420,7 +442,7 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error { } s.log.Debugw("Found pending block", "txns", pendingBlock.TransactionCount) - return s.blockchain.StorePending(&blockchain.Pending{ + return s.StorePending(&Pending{ Block: pendingBlock, StateUpdate: pendingStateUpdate, NewClasses: newClasses, @@ -443,3 +465,78 @@ func (s *Synchronizer) SubscribeNewHeads() HeaderSubscription { Subscription: s.newHeads.Subscribe(), } } + +// StorePending stores a pending block given that it is for the next height +func (s *Synchronizer) StorePending(p *Pending) error { + err := blockchain.CheckBlockVersion(p.Block.ProtocolVersion) + if err != nil { + return err + } + + expectedParentHash := new(felt.Felt) + h, err := s.blockchain.HeadsHeader() + if err != nil && !errors.Is(err, db.ErrKeyNotFound) { + return err + } else if err == nil { + expectedParentHash = h.Hash + } + + if !expectedParentHash.Equal(p.Block.ParentHash) { + return fmt.Errorf("store pending: %w", blockchain.ErrParentDoesNotMatchHead) + } + + if existingPending, err := s.Pending(); err == nil { + if existingPending.Block.TransactionCount >= p.Block.TransactionCount { + // ignore the incoming pending if it has fewer transactions than the one we already have + return nil + } + } else if !errors.Is(err, ErrPendingBlockNotFound) { + return err + } + s.pending.Store(p) + + return nil +} + +func (s *Synchronizer) Pending() (*Pending, error) { + p := s.pending.Load() + if p == nil { + return nil, ErrPendingBlockNotFound + } + + expectedParentHash := &felt.Zero + if head, err := s.blockchain.HeadsHeader(); err == nil { + expectedParentHash = head.Hash + } + if p.Block.ParentHash.Equal(expectedParentHash) { + return p, nil + } + + // Since the pending block in the cache is outdated remove it + s.pending.Store(nil) + + return nil, ErrPendingBlockNotFound +} + +func (s *Synchronizer) PendingBlock() *core.Block { + pending, err := s.Pending() + if err != nil { + return nil + } + return pending.Block +} + +// PendingState returns the state resulting from execution of the pending block +func (s *Synchronizer) PendingState() (core.StateReader, func() error, error) { + txn, err := s.db.NewTransaction(false) + if err != nil { + return nil, nil, err + } + + pending, err := s.Pending() + if err != nil { + return nil, nil, utils.RunAndWrapOnError(txn.Discard, err) + } + + return NewPendingState(pending.StateUpdate.StateDiff, pending.NewClasses, core.NewState(txn)), txn.Discard, nil +} diff --git a/sync/sync_test.go b/sync/sync_test.go index 4f1d8a096a..ab97fc322b 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -55,8 +55,8 @@ func TestSyncBlocks(t *testing.T) { log := utils.NewNopZapLogger() t.Run("sync multiple blocks in an empty db", func(t *testing.T) { testDB := pebble.NewMemTest(t) - bc := blockchain.New(testDB, &utils.Mainnet) - synchronizer := sync.New(bc, gw, log, time.Duration(0), false) + bc := blockchain.New(testDB, &utils.Mainnet, nil) + synchronizer := sync.New(bc, gw, log, time.Duration(0), false, testDB) ctx, cancel := context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) @@ -67,14 +67,14 @@ func TestSyncBlocks(t *testing.T) { t.Run("sync multiple blocks in a non-empty db", func(t *testing.T) { testDB := pebble.NewMemTest(t) - bc := blockchain.New(testDB, &utils.Mainnet) + bc := blockchain.New(testDB, &utils.Mainnet, nil) b0, err := gw.BlockByNumber(context.Background(), 0) require.NoError(t, err) s0, err := gw.StateUpdate(context.Background(), 0) require.NoError(t, err) require.NoError(t, bc.Store(b0, &core.BlockCommitments{}, s0, nil)) - synchronizer := sync.New(bc, gw, log, time.Duration(0), false) + synchronizer := sync.New(bc, gw, log, time.Duration(0), false, testDB) ctx, cancel := context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) @@ -85,7 +85,7 @@ func TestSyncBlocks(t *testing.T) { t.Run("sync multiple blocks, with an unreliable gw", func(t *testing.T) { testDB := pebble.NewMemTest(t) - bc := blockchain.New(testDB, &utils.Mainnet) + bc := blockchain.New(testDB, &utils.Mainnet, nil) mockSNData := mocks.NewMockStarknetData(mockCtrl) @@ -126,7 +126,7 @@ func TestSyncBlocks(t *testing.T) { return gw.BlockLatest(context.Background()) }).AnyTimes() - synchronizer := sync.New(bc, mockSNData, log, time.Duration(0), false) + synchronizer := sync.New(bc, mockSNData, log, time.Duration(0), false, testDB) ctx, cancel := context.WithTimeout(context.Background(), 2*timeout) require.NoError(t, synchronizer.Run(ctx)) @@ -146,22 +146,22 @@ func TestReorg(t *testing.T) { testDB := pebble.NewMemTest(t) // sync to integration for 2 blocks - bc := blockchain.New(testDB, &utils.Integration) - synchronizer := sync.New(bc, integGw, utils.NewNopZapLogger(), 0, false) + bc := blockchain.New(testDB, &utils.Integration, nil) + synchronizer := sync.New(bc, integGw, utils.NewNopZapLogger(), 0, false, testDB) ctx, cancel := context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) cancel() t.Run("resync to mainnet with the same db", func(t *testing.T) { - bc := blockchain.New(testDB, &utils.Mainnet) + bc := blockchain.New(testDB, &utils.Mainnet, nil) // Ensure current head is Integration head head, err := bc.HeadsHeader() require.NoError(t, err) require.Equal(t, utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"), head.Hash) - synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), 0, false) + synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), 0, false, testDB) ctx, cancel = context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) cancel() @@ -173,16 +173,41 @@ func TestReorg(t *testing.T) { }) } -func TestPending(t *testing.T) { +func TestSubscribeNewHeads(t *testing.T) { + t.Parallel() + testDB := pebble.NewMemTest(t) + log := utils.NewNopZapLogger() + integration := utils.Integration + chain := blockchain.New(testDB, &integration, nil) + integrationClient := feeder.NewTestClient(t, &integration) + gw := adaptfeeder.New(integrationClient) + syncer := sync.New(chain, gw, log, 0, false, testDB) + + sub := syncer.SubscribeNewHeads() + + // Receive on new block. + ctx, cancel := context.WithTimeout(context.Background(), timeout) + require.NoError(t, syncer.Run(ctx)) + cancel() + got, ok := <-sub.Recv() + require.True(t, ok) + want, err := gw.BlockByNumber(context.Background(), 0) + require.NoError(t, err) + require.Equal(t, want.Header, got) + sub.Unsubscribe() +} + +func TestPendingSync(t *testing.T) { t.Parallel() client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) + var synchronizer *sync.Synchronizer testDB := pebble.NewMemTest(t) log := utils.NewNopZapLogger() - bc := blockchain.New(testDB, &utils.Mainnet) - synchronizer := sync.New(bc, gw, log, time.Millisecond*100, false) + bc := blockchain.New(testDB, &utils.Mainnet, synchronizer.PendingBlock) + synchronizer = sync.New(bc, gw, log, time.Millisecond*100, false, testDB) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) require.NoError(t, synchronizer.Run(ctx)) @@ -190,31 +215,84 @@ func TestPending(t *testing.T) { head, err := bc.HeadsHeader() require.NoError(t, err) - pending, err := bc.Pending() + pending, err := synchronizer.Pending() require.NoError(t, err) assert.Equal(t, head.Hash, pending.Block.ParentHash) } -func TestSubscribeNewHeads(t *testing.T) { - t.Parallel() - testDB := pebble.NewMemTest(t) - log := utils.NewNopZapLogger() - integration := utils.Integration - chain := blockchain.New(testDB, &integration) - integrationClient := feeder.NewTestClient(t, &integration) - gw := adaptfeeder.New(integrationClient) - syncer := sync.New(chain, gw, log, 0, false) +func TestPending(t *testing.T) { + client := feeder.NewTestClient(t, &utils.Mainnet) + gw := adaptfeeder.New(client) - sub := syncer.SubscribeNewHeads() + var synchronizer *sync.Synchronizer + testDB := pebble.NewMemTest(t) + chain := blockchain.New(testDB, &utils.Mainnet, synchronizer.PendingBlock) + synchronizer = sync.New(chain, gw, utils.NewNopZapLogger(), 0, false, testDB) - // Receive on new block. - ctx, cancel := context.WithTimeout(context.Background(), timeout) - require.NoError(t, syncer.Run(ctx)) - cancel() - got, ok := <-sub.Recv() - require.True(t, ok) - want, err := gw.BlockByNumber(context.Background(), 0) + b, err := gw.BlockByNumber(context.Background(), 0) require.NoError(t, err) - require.Equal(t, want.Header, got) - sub.Unsubscribe() + su, err := gw.StateUpdate(context.Background(), 0) + require.NoError(t, err) + + t.Run("pending state shouldnt exist if no pending block", func(t *testing.T) { + _, _, err = synchronizer.PendingState() + require.Error(t, err) + }) + + t.Run("cannot store unsupported pending block version", func(t *testing.T) { + pending := &sync.Pending{Block: &core.Block{Header: &core.Header{ProtocolVersion: "1.9.0"}}} + require.Error(t, synchronizer.StorePending(pending)) + }) + + t.Run("store genesis as pending", func(t *testing.T) { + pendingGenesis := &sync.Pending{ + Block: b, + StateUpdate: su, + } + require.NoError(t, synchronizer.StorePending(pendingGenesis)) + + gotPending, pErr := synchronizer.Pending() + require.NoError(t, pErr) + assert.Equal(t, pendingGenesis, gotPending) + }) + + require.NoError(t, chain.Store(b, &core.BlockCommitments{}, su, nil)) + + t.Run("storing a pending too far into the future should fail", func(t *testing.T) { + b, err = gw.BlockByNumber(context.Background(), 2) + require.NoError(t, err) + su, err = gw.StateUpdate(context.Background(), 2) + require.NoError(t, err) + + notExpectedPending := sync.Pending{ + Block: b, + StateUpdate: su, + } + require.ErrorIs(t, synchronizer.StorePending(¬ExpectedPending), blockchain.ErrParentDoesNotMatchHead) + }) + + t.Run("store expected pending block", func(t *testing.T) { + b, err = gw.BlockByNumber(context.Background(), 1) + require.NoError(t, err) + su, err = gw.StateUpdate(context.Background(), 1) + require.NoError(t, err) + + expectedPending := &sync.Pending{ + Block: b, + StateUpdate: su, + } + require.NoError(t, synchronizer.StorePending(expectedPending)) + + gotPending, pErr := synchronizer.Pending() + require.NoError(t, pErr) + assert.Equal(t, expectedPending, gotPending) + }) + + t.Run("get pending state", func(t *testing.T) { + _, pendingStateCloser, pErr := synchronizer.PendingState() + t.Cleanup(func() { + require.NoError(t, pendingStateCloser()) + }) + require.NoError(t, pErr) + }) }