Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,12 @@ func (s *FakeStateSync) AddMPTNodes(nodes [][]byte) error {
}

// AddContractStorageItems implements the StateSync interface.
func (s *FakeStateSync) AddContractStorageItems(kv []storage.KeyValue, syncHeight uint32, expectedRoot util.Uint256, witness transaction.Witness) error {
func (s *FakeStateSync) AddContractStorageItems(kv []storage.KeyValue) error {
panic("TODO")
}

// InitContractStorageSync implements the StateSync interface.
func (s *FakeStateSync) InitContractStorageSync(r state.MPTRoot) error {
panic("TODO")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro
if err != nil {
return fmt.Errorf("failed to get checkpoint metadata: %w", err)
}
root = ckpt.MPTRoot
root = ckpt.IntermediateRoot
} else {
blk, err := bc.dao.GetBlock(bc.GetHeaderHash(p + 1))
if err != nil {
Expand Down
20 changes: 12 additions & 8 deletions pkg/core/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,29 +580,33 @@ func (dao *Simple) GetStateSyncPoint() (uint32, error) {
// StateSyncCheckpoint stores the state of an interrupted contract storage state sync in
// ContractStorageBased mode.
type StateSyncCheckpoint struct {
// MPTRoot is a computed intermediate MPT root at the StateSyncCheckpoint.
MPTRoot util.Uint256
// IntermediateRoot is a computed intermediate root of non-complete MPT
// at the StateSyncCheckpoint.
IntermediateRoot util.Uint256
// TODO: @roman-khimov, technically this and previous commit include a DB schema change since StateSyncCheckpoint is stored in the DB.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a note to release, that's it. State sync should be finished with the old node if it's in process.

// Practically, hardly ever anyone used this feature and StateSyncCheckpoint is removed right after sync.
// So can we avoid DB schema version upgrade?
// Root is the actual state root at the StateSyncPoint.
Root util.Uint256
// Witness is the actual witness of state root at the StateSyncPoint, it
// may be empty if the state object doesn't contain it.
Witness transaction.Witness
// IsMPTSynced indicates whether sync process is completed.
IsMPTSynced bool
// LastStoredKey is the last processed storage key.
LastStoredKey []byte
}

// EncodeBinary encodes StateSyncCheckpoint to binary format.
func (s *StateSyncCheckpoint) EncodeBinary(w *io.BinWriter) {
w.WriteBytes(s.MPTRoot[:])
w.WriteBool(s.IsMPTSynced)
w.WriteBytes(s.IntermediateRoot[:])
w.WriteBytes(s.Root[:])
w.WriteVarBytes(s.LastStoredKey)
s.Witness.EncodeBinary(w)
}

// DecodeBinary decodes StateSyncCheckpoint from binary format.
func (s *StateSyncCheckpoint) DecodeBinary(br *io.BinReader) {
br.ReadBytes(s.MPTRoot[:])
s.IsMPTSynced = br.ReadBool()
br.ReadBytes(s.IntermediateRoot[:])
br.ReadBytes(s.Root[:])
s.LastStoredKey = br.ReadVarBytes()
s.Witness.DecodeBinary(br)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/dao/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,9 @@ func TestPutGetStateSyncCheckPoint(t *testing.T) {

// non-empty store
expected := StateSyncCheckpoint{
MPTRoot: util.Uint256{1, 2, 3},
IsMPTSynced: true,
LastStoredKey: []byte{1, 2, 3},
IntermediateRoot: util.Uint256{1, 2, 3},
Root: util.Uint256{4, 5, 6},
LastStoredKey: []byte{1, 2, 3},
Witness: transaction.Witness{
InvocationScript: []byte{1, 2, 3},
VerificationScript: []byte{1, 2, 3},
Expand Down
96 changes: 63 additions & 33 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/dao"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/stateroot"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
Expand Down Expand Up @@ -116,6 +117,9 @@ type Module struct {
// lastStoredKey is the last processed storage key in case of ContractStorageBased
// state synchronisation.
lastStoredKey []byte
// root is the MPT root (optionally with witness attached) at the syncPoint. It
// may be empty in case if the Module is started on already synchronized state.
root state.MPTRoot

dao *dao.Simple
bc Ledger
Expand Down Expand Up @@ -236,8 +240,41 @@ func (s *Module) Init(currChainHeight uint32) error {
return s.defineSyncStage()
}

func (s *Module) InitContractStorageItemsSync(syncHeight uint32, expectedRoot util.Uint256, witness transaction.Witness) {
// InitContractStorageSync prepares Module for contract storage items
// synchronization, so that it's possible to use the AddContractStorageItems
// callback afterwards.
func (s *Module) InitContractStorageSync(root state.MPTRoot) error {
s.lock.Lock()
defer s.lock.Unlock()

if root.Index != s.syncPoint {
return fmt.Errorf("invalid sync height: expected %d, got %d", s.syncPoint, root.Index)
}

if s.bc.GetConfig().StateRootInHeader {
header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint + 1))
if err != nil {
return fmt.Errorf("failed to get header to check state root: %w", err)
}
if !header.PrevStateRoot.Equals(root.Root) {
return fmt.Errorf("state root mismatch: %s != %s", header.PrevStateRoot.StringLE(), root.Root.StringLE())
}
} else if len(root.Witness) > 0 {
header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint))
if err != nil {
return fmt.Errorf("failed to get header to check state root: %w", err)
}
if !bytes.Equal(header.Script.VerificationScript, root.Witness[0].VerificationScript) {
return fmt.Errorf("state root witness mismatch: %s != %s", hex.EncodeToString(header.Script.VerificationScript), hex.EncodeToString(root.Witness[0].VerificationScript))
}
}

// If s.root is extracted from a checkpoint, check that the provided root matches the stored one.
if !s.root.Root.Equals(util.Uint256{}) && (s.root.Index != root.Index || !s.root.Root.Equals(root.Root)) {
return fmt.Errorf("invalid state root: checkpoint holds %d/%s, got %d/%s", s.root.Index, s.root.Root.StringLE(), root.Index, root.Root.StringLE())
}
s.root = root
return nil
}

// SetOnStageChanged sets callback that is triggered whenever the sync stage changes.
Expand Down Expand Up @@ -330,14 +367,23 @@ func (s *Module) defineSyncStage() error {
return fmt.Errorf("failed to load checkpoint: %w", err)
}
} else {
s.localTrie = mpt.NewTrie(mpt.NewHashNode(ckpt.MPTRoot), mode, s.dao.Store)
s.localTrie = mpt.NewTrie(mpt.NewHashNode(ckpt.IntermediateRoot), mode, s.dao.Store)
s.lastStoredKey = ckpt.LastStoredKey
var w []transaction.Witness
if len(ckpt.Witness.VerificationScript) > 0 {
w = []transaction.Witness{ckpt.Witness}
}
s.root = state.MPTRoot{
Index: s.syncPoint,
Root: ckpt.Root,
Witness: w,
}

if ckpt.IsMPTSynced {
if ckpt.Root.Equals(ckpt.IntermediateRoot) {
s.syncStage |= mptSynced
s.log.Info("MPT and contract storage are in sync",
zap.Uint32("syncPoint", s.syncPoint),
zap.String("stateRoot", ckpt.MPTRoot.StringLE()))
zap.String("stateRoot", ckpt.IntermediateRoot.StringLE()))
}
}
}
Expand Down Expand Up @@ -553,7 +599,7 @@ func (s *Module) AddMPTNodes(nodes [][]byte) error {
}

// AddContractStorageItems adds a batch of key-value pairs for storage-based sync.
func (s *Module) AddContractStorageItems(kvs []storage.KeyValue, syncHeight uint32, expectedRoot util.Uint256, witness transaction.Witness) error {
func (s *Module) AddContractStorageItems(kvs []storage.KeyValue) error {
if s.mode == MPTBased {
panic("contract storage items are not expected in MPT-based mode")
}
Expand All @@ -568,12 +614,9 @@ func (s *Module) AddContractStorageItems(kvs []storage.KeyValue, syncHeight uint
}
}()

if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 || expectedRoot.Equals(s.localTrie.StateRoot()) {
if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 {
return errors.New("contract storage items were not requested")
}
if syncHeight != s.syncPoint {
return fmt.Errorf("invalid sync height: expected %d, got %d", s.syncPoint, syncHeight)
}
if len(kvs) == 0 {
return fmt.Errorf("key-value pairs are empty")
}
Expand All @@ -588,42 +631,29 @@ func (s *Module) AddContractStorageItems(kvs []storage.KeyValue, syncHeight uint
_ = s.dao.Store.PutChangeSet(nil, batch)
mptBatch := mpt.MapToMPTBatch(batch)
if _, err := s.localTrie.PutBatch(mptBatch); err != nil {
return fmt.Errorf("failed to apply MPT batch at height %d: %w", syncHeight, err)
return fmt.Errorf("failed to apply MPT batch at %d: %w", s.syncPoint, err)
}
s.localTrie.Flush(syncHeight)
s.localTrie.Flush(s.syncPoint)
s.lastStoredKey = kvs[len(kvs)-1].Key
computedRoot := s.localTrie.StateRoot()
w := transaction.Witness{}
if len(s.root.Witness) > 0 {
w = s.root.Witness[0]
}
ckpt := dao.StateSyncCheckpoint{
MPTRoot: s.localTrie.StateRoot(),
LastStoredKey: kvs[len(kvs)-1].Key,
IsMPTSynced: computedRoot.Equals(expectedRoot),
Witness: witness,
IntermediateRoot: computedRoot,
Root: s.root.Root,
LastStoredKey: kvs[len(kvs)-1].Key,
Witness: w,
}
s.dao.PutStateSyncCheckpoint(ckpt)
if _, err := s.dao.Store.PersistSync(); err != nil {
return fmt.Errorf("failed to persist checkpoint metadata: %w", err)
}
if !computedRoot.Equals(expectedRoot) {
if !computedRoot.Equals(s.root.Root) {
return nil
}

if s.bc.GetConfig().StateRootInHeader {
header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint + 1))
if err != nil {
return fmt.Errorf("failed to get header to check state root: %w", err)
}
if !header.PrevStateRoot.Equals(expectedRoot) {
return fmt.Errorf("state root mismatch: %s != %s", header.PrevStateRoot.StringLE(), expectedRoot.StringLE())
}
} else if len(witness.VerificationScript) > 0 {
header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint))
if err != nil {
return fmt.Errorf("failed to get header to check state root: %w", err)
}
if !bytes.Equal(header.Script.VerificationScript, witness.VerificationScript) {
return fmt.Errorf("state root witness mismatch: %s != %s", hex.EncodeToString(header.Script.VerificationScript), hex.EncodeToString(witness.VerificationScript))
}
}
s.syncStage |= mptSynced
s.blockHeight = s.getLatestSavedBlock(s.syncPoint)
s.log.Info("MPT and contract storage are in sync",
Expand Down
Loading
Loading