diff --git a/cli/util/upload_state.go b/cli/util/upload_state.go index 852b5bc3ab..4068b66772 100644 --- a/cli/util/upload_state.go +++ b/cli/util/upload_state.go @@ -1,6 +1,7 @@ package util import ( + "encoding/hex" "fmt" "strconv" "time" @@ -138,11 +139,14 @@ loop: attrs = []object.Attribute{ object.NewAttribute(attr, strconv.Itoa(int(height))), object.NewAttribute("Timestamp", strconv.FormatInt(time.Now().Unix(), 10)), - object.NewAttribute("StateRoot", stateRoot.Root.StringLE()), + object.NewAttribute(neofs.DefaultStateRootAttribute, stateRoot.Root.StringLE()), object.NewAttribute("StateSyncInterval", strconv.Itoa(syncInterval)), object.NewAttribute("BlockTime", strconv.FormatUint(h.Timestamp, 10)), } ) + if len(stateRoot.Witness) > 0 { + attrs = append(attrs, object.NewAttribute(neofs.DefaultWitnessAttribute, hex.EncodeToString(stateRoot.Witness[0].Bytes()))) + } hdr.SetContainerID(containerID) hdr.SetOwner(signer.UserID()) hdr.SetAttributes(attrs...) diff --git a/docs/neofs-blockstorage.md b/docs/neofs-blockstorage.md index 3b17d12c3a..3706ce0948 100644 --- a/docs/neofs-blockstorage.md +++ b/docs/neofs-blockstorage.md @@ -37,6 +37,8 @@ in a binary form as a separate object with a unique OID and a set of attributes: - state root hash in the LE form (`StateRoot:58a5157b7e99eeabf631291f1747ec8eb12ab89461cda888492b17a301de81e8`) - millisecond-precision block creation timestamp (`BlockTime:1468595301000`) - second-precision contract state object uploading timestamp (`Timestamp:1742957073`) +- hex-encoded binary representation of state root witness (for validated + stateroots only) 
(`Witness:c60c408b7c6f48320eb201ed20cce53188a4c6eb989cc71fa49991380d1c53f6527f4b1e014c2f77d7893f372a2f8e1fdf49f4513720a25256196d6531b6c18e4f5d440c4096e6bdeed500011a20f784444d757fe2b3a372d2c63369d0c759dfc5e852b2ae63b41d721aae44eb22284db1e16a032107f1c36986c94e41696f2bcd659827210c40f6d84e945e1f632a2e43b730c728c5f18741868c19f41aa46a151f8f94ea5c880d67f681bd14baa45cfcaa8b1d2c7553f493299e34cd941ba3d6e93044d2392493130c210345e2bbda8d3d9e24d1e9ee61df15d4f435f69a44fe012d86e9cf9377baaa42cd0c210353663d8da8d6c344aade0168c1cfb651db859175a60c48c6bd4000c9e682d0f50c210392fbd1d809a3c62f7dcde8f25454a1570830a21e4b014b3f362a79baf413e1150c21039b45040cc529966165ef5dff3d046a4960520ce616ae170e265d669e0e2de7f414419ed0dc3a`) The binary form of the contract state object includes serialized sequence of contract storage item key-value pairs for every deployed contract including diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 6b06646185..0e7bf15937 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -459,7 +459,12 @@ func (s *FakeStateSync) AddMPTNodes(nodes [][]byte) error { } // AddContractStorageItems implements the StateSync interface. -func (s *FakeStateSync) AddContractStorageItems(kv []storage.KeyValue, syncHeight uint32, expectedRoot util.Uint256) error { +func (s *FakeStateSync) AddContractStorageItems(kv []storage.KeyValue) error { + panic("TODO") +} + +// InitContractStorageSync implements the StateSync interface. +func (s *FakeStateSync) InitContractStorageSync(r state.MPTRoot) error { panic("TODO") } @@ -527,3 +532,8 @@ func (s *FakeStateSync) GetStateSyncPoint() uint32 { func (s *FakeStateSync) GetLastStoredKey() []byte { panic("TODO") } + +// VerifyWitness implements the StateSync interface. 
+func (s *FakeStateSync) VerifyWitness(h util.Uint160, c hash.Hashable, w *transaction.Witness, gas int64) (int64, error) { + panic("TODO") +} diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index ebe3141f07..0fe7fd89eb 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -860,7 +860,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro if err != nil { return fmt.Errorf("failed to get checkpoint metadata: %w", err) } - root = ckpt.MPTRoot + root = ckpt.IntermediateRoot } else { blk, err := bc.dao.GetBlock(bc.GetHeaderHash(p + 1)) if err != nil { @@ -868,14 +868,17 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro } root = blk.PrevStateRoot } - bc.stateRoot.JumpToState(&state.MPTRoot{ + err := bc.stateRoot.JumpToState(&state.MPTRoot{ Index: p, Root: root, }) + if err != nil { + return fmt.Errorf("failed to update stateroot module state: %w", err) + } bc.dao.Store.Delete(jumpStageKey) bc.dao.Store.Delete([]byte{byte(storage.SYSStateSyncCheckpoint)}) - _, err := bc.dao.Store.Persist() + _, err = bc.dao.Store.Persist() if err != nil { return fmt.Errorf("failed to persist %d stage of state jump: %w", staleBlocksRemoved, err) } @@ -912,6 +915,7 @@ func (bc *Blockchain) resetRAMState(height uint32, resetHeaders bool) error { if err != nil { return fmt.Errorf("failed to initialize natives cache: %w", err) } + bc.designate.NotifyServices(bc.dao) if err := bc.updateExtensibleWhitelist(height); err != nil { return fmt.Errorf("failed to update extensible whitelist: %w", err) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index ec90dc4a04..992a3ef74a 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -580,26 +580,35 @@ func (dao *Simple) GetStateSyncPoint() (uint32, error) { // StateSyncCheckpoint stores the state of an interrupted contract storage state sync in // ContractStorageBased mode. 
type StateSyncCheckpoint struct { - // MPTRoot is a computed intermediate MPT root at the StateSyncCheckpoint. - MPTRoot util.Uint256 - // IsMPTSynced indicates whether sync process is completed. - IsMPTSynced bool + // IntermediateRoot is a computed intermediate root of non-complete MPT + // at the StateSyncCheckpoint. + IntermediateRoot util.Uint256 + // TODO: @roman-khimov, technically this and previous commit include a DB schema change since StateSyncCheckpoint is stored in the DB. + // Practically, hardly ever anyone used this feature and StateSyncCheckpoint is removed right after sync. + // So can we avoid DB schema version upgrade? + // Root is the actual state root at the StateSyncPoint. + Root util.Uint256 + // Witness is the actual witness of state root at the StateSyncPoint, it + // may be empty if the state object doesn't contain it. + Witness transaction.Witness // LastStoredKey is the last processed storage key. LastStoredKey []byte } // EncodeBinary encodes StateSyncCheckpoint to binary format. func (s *StateSyncCheckpoint) EncodeBinary(w *io.BinWriter) { - w.WriteBytes(s.MPTRoot[:]) - w.WriteBool(s.IsMPTSynced) + w.WriteBytes(s.IntermediateRoot[:]) + w.WriteBytes(s.Root[:]) w.WriteVarBytes(s.LastStoredKey) + s.Witness.EncodeBinary(w) } // DecodeBinary decodes StateSyncCheckpoint from binary format. func (s *StateSyncCheckpoint) DecodeBinary(br *io.BinReader) { - br.ReadBytes(s.MPTRoot[:]) - s.IsMPTSynced = br.ReadBool() + br.ReadBytes(s.IntermediateRoot[:]) + br.ReadBytes(s.Root[:]) s.LastStoredKey = br.ReadVarBytes() + s.Witness.DecodeBinary(br) } // GetStateSyncCheckpoint returns the current StateSyncCheckpoint. 
diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 2d1232ff77..a3af05210a 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -447,9 +447,13 @@ func TestPutGetStateSyncCheckPoint(t *testing.T) { // non-empty store expected := StateSyncCheckpoint{ - MPTRoot: util.Uint256{1, 2, 3}, - IsMPTSynced: true, - LastStoredKey: []byte{1, 2, 3}, + IntermediateRoot: util.Uint256{1, 2, 3}, + Root: util.Uint256{4, 5, 6}, + LastStoredKey: []byte{1, 2, 3}, + Witness: transaction.Witness{ + InvocationScript: []byte{1, 2, 3}, + VerificationScript: []byte{1, 2, 3}, + }, } dao.PutStateSyncCheckpoint(expected) actual, err := dao.GetStateSyncCheckpoint() diff --git a/pkg/core/native/contract.go b/pkg/core/native/contract.go index 252728c4af..0e558453a7 100644 --- a/pkg/core/native/contract.go +++ b/pkg/core/native/contract.go @@ -99,6 +99,7 @@ type ( SetOracleService(o OracleService) SetNotaryService(n NotaryService) SetStateRootService(s StateRootService) + NotifyServices(dao *dao.Simple) } // INotary is an interface required from native Notary contract for diff --git a/pkg/core/native/designate.go b/pkg/core/native/designate.go index ae17c8a35e..d889773df4 100644 --- a/pkg/core/native/designate.go +++ b/pkg/core/native/designate.go @@ -184,9 +184,24 @@ func (s *Designate) OnPersist(ic *interop.Context) error { // PostPersist implements the Contract interface. func (s *Designate) PostPersist(ic *interop.Context) error { - cache := ic.DAO.GetRWCache(s.ID).(*DesignationCache) - if !cache.rolesChangedFlag { - return nil + s.notifyServicesInternal(ic.DAO, false) + return nil +} + +// NotifyServices notifies dependent services about changes in the node roles. +// It does not check whether roles were updated in the last block. It modifies +// Designate cache accordingly. +func (s *Designate) NotifyServices(dao *dao.Simple) { + s.notifyServicesInternal(dao, true) +} + +// notifyServicesInternal is the internal implementation of NotifyServices. 
It +// accepts an additional force flag to force services notification even if no +// roles were updated in the last block. +func (s *Designate) notifyServicesInternal(dao *dao.Simple, force bool) { + cache := dao.GetRWCache(s.ID).(*DesignationCache) + if !cache.rolesChangedFlag && !force { + return } s.notifyRoleChanged(&cache.oracles, noderoles.Oracle) @@ -195,7 +210,6 @@ func (s *Designate) PostPersist(ic *interop.Context) error { s.notifyRoleChanged(&cache.notaries, noderoles.P2PNotary) cache.rolesChangedFlag = false - return nil } // Metadata returns contract metadata. diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index a63073dda3..2d90baad05 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -223,7 +223,7 @@ func (s *Module) CleanStorage() error { } // JumpToState performs jump to the state specified by given stateroot index. -func (s *Module) JumpToState(sr *state.MPTRoot) { +func (s *Module) JumpToState(sr *state.MPTRoot) error { s.addLocalStateRoot(s.Store, sr) data := make([]byte, 4) @@ -234,6 +234,8 @@ func (s *Module) JumpToState(sr *state.MPTRoot) { s.currentLocal.Store(sr.Root) s.localHeight.Store(sr.Index) s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), s.mode, s.Store) + + return nil } // ResetState resets MPT state to the given height. @@ -371,13 +373,15 @@ func (s *Module) VerifyStateRoot(r *state.MPTRoot) error { return s.verifyWitness(r) } -const maxVerificationGAS = 2_00000000 +// MaxVerificationGAS is the maximum amount of GAS that can be spent for stateroot +// witness verification. +const MaxVerificationGAS = 2_00000000 // verifyWitness verifies state root witness. 
func (s *Module) verifyWitness(r *state.MPTRoot) error { s.mtx.Lock() h := s.getKeyCacheForHeight(r.Index).validatorsHash s.mtx.Unlock() - _, err := s.verifier(h, r, &r.Witness[0], maxVerificationGAS) + _, err := s.verifier(h, r, &r.Witness[0], MaxVerificationGAS) return err } diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index 8d068aa796..d2fee38ada 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -21,6 +21,7 @@ package statesync //go:generate stringer -type=StorageSyncMode import ( + "bytes" "encoding/binary" "encoding/hex" "errors" @@ -32,8 +33,11 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/native" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/stateroot" "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/encoding/bigint" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util" @@ -89,16 +93,19 @@ type Ledger interface { GetHeaderHash(uint32) util.Uint256 HeaderHeight() uint32 NativePolicyID() int32 + VerifyWitness(h util.Uint160, c hash.Hashable, w *transaction.Witness, gas int64) (int64, error) } // Module represents state sync module and aimed to gather state-related data to // perform an atomic state jump. type Module struct { - lock sync.RWMutex log *zap.Logger mode StorageSyncMode policyID int32 + // lock protects syncStage and all dependent fields that may be updated concurrently + // via Module callbacks along with the change of the synchronisation stage. + lock sync.RWMutex // syncPoint is the state synchronisation point P we're currently working against. syncPoint uint32 // syncStage is the stage of the sync process. 
@@ -110,6 +117,9 @@ type Module struct { // lastStoredKey is the last processed storage key in case of ContractStorageBased // state synchronisation. lastStoredKey []byte + // root is the MPT root (optionally with witness attached) at the syncPoint. It + // may be empty if the Module is started on an already synchronized state. + root state.MPTRoot dao *dao.Simple bc Ledger @@ -161,11 +171,12 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si // Init initializes state sync module for the current chain's height with given // callback for MPT nodes requests. func (s *Module) Init(currChainHeight uint32) error { - oldStage := s.syncStage s.lock.Lock() + oldStage := s.syncStage defer func() { + newStage := s.syncStage s.lock.Unlock() - if s.syncStage != oldStage { + if newStage != oldStage { s.notifyStageChanged() } }() @@ -229,10 +240,45 @@ func (s *Module) Init(currChainHeight uint32) error { return s.defineSyncStage() } -// SetOnStageChanged sets callback that is triggered whenever the sync stage changes. -func (s *Module) SetOnStageChanged(cb func()) { +// InitContractStorageSync prepares Module for contract storage items +// synchronization, so that it's possible to use the AddContractStorageItems +// callback afterwards. 
+func (s *Module) InitContractStorageSync(root state.MPTRoot) error { s.lock.Lock() defer s.lock.Unlock() + + if root.Index != s.syncPoint { + return fmt.Errorf("invalid sync height: expected %d, got %d", s.syncPoint, root.Index) + } + + if s.bc.GetConfig().StateRootInHeader { + header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint + 1)) + if err != nil { + return fmt.Errorf("failed to get header to check state root: %w", err) + } + if !header.PrevStateRoot.Equals(root.Root) { + return fmt.Errorf("state root mismatch: %s != %s", header.PrevStateRoot.StringLE(), root.Root.StringLE()) + } + } else if len(root.Witness) > 0 { + header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint)) + if err != nil { + return fmt.Errorf("failed to get header to check state root: %w", err) + } + if !bytes.Equal(header.Script.VerificationScript, root.Witness[0].VerificationScript) { + return fmt.Errorf("state root witness mismatch: %s != %s", hex.EncodeToString(header.Script.VerificationScript), hex.EncodeToString(root.Witness[0].VerificationScript)) + } + } + + // If s.root is extracted from a checkpoint, check that the provided root matches the stored one. + if !s.root.Root.Equals(util.Uint256{}) && (s.root.Index != root.Index || !s.root.Root.Equals(root.Root)) { + return fmt.Errorf("invalid state root: checkpoint holds %d/%s, got %d/%s", s.root.Index, s.root.Root.StringLE(), root.Index, root.Root.StringLE()) + } + s.root = root + return nil +} + +// SetOnStageChanged sets callback that is triggered whenever the sync stage changes. 
+func (s *Module) SetOnStageChanged(cb func()) { s.stageChangedCallback = cb } @@ -321,14 +367,23 @@ func (s *Module) defineSyncStage() error { return fmt.Errorf("failed to load checkpoint: %w", err) } } else { - s.localTrie = mpt.NewTrie(mpt.NewHashNode(ckpt.MPTRoot), mode, s.dao.Store) + s.localTrie = mpt.NewTrie(mpt.NewHashNode(ckpt.IntermediateRoot), mode, s.dao.Store) s.lastStoredKey = ckpt.LastStoredKey + var w []transaction.Witness + if len(ckpt.Witness.VerificationScript) > 0 { + w = []transaction.Witness{ckpt.Witness} + } + s.root = state.MPTRoot{ + Index: s.syncPoint, + Root: ckpt.Root, + Witness: w, + } - if ckpt.IsMPTSynced { + if ckpt.Root.Equals(ckpt.IntermediateRoot) { s.syncStage |= mptSynced s.log.Info("MPT and contract storage are in sync", zap.Uint32("syncPoint", s.syncPoint), - zap.String("stateRoot", ckpt.MPTRoot.StringLE())) + zap.String("stateRoot", ckpt.IntermediateRoot.StringLE())) } } } @@ -400,11 +455,12 @@ func (s *Module) getLatestSavedBlock(p uint32) uint32 { // AddHeaders validates and adds specified headers to the chain. func (s *Module) AddHeaders(hdrs ...*block.Header) error { - oldStage := s.syncStage s.lock.Lock() + oldStage := s.syncStage defer func() { + newStage := s.syncStage s.lock.Unlock() - if s.syncStage != oldStage { + if newStage != oldStage { s.notifyStageChanged() } }() @@ -429,14 +485,15 @@ func (s *Module) AddHeaders(hdrs ...*block.Header) error { // AddBlock verifies and saves block skipping executable scripts. 
func (s *Module) AddBlock(block *block.Block) error { - oldStage := s.syncStage s.lock.Lock() + oldStage := s.syncStage defer func() { - if s.syncStage != oldStage { + newStage := s.syncStage + s.lock.Unlock() + if newStage != oldStage { s.notifyStageChanged() } }() - defer s.lock.Unlock() if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced == 0 || s.syncStage&blocksSynced != 0 { return nil @@ -499,14 +556,16 @@ func (s *Module) AddMPTNodes(nodes [][]byte) error { if s.mode == ContractStorageBased { panic("MPT nodes are not expected in storage-based sync mode") } - oldStage := s.syncStage + s.lock.Lock() + oldStage := s.syncStage defer func() { - if s.syncStage != oldStage { + newStage := s.syncStage + s.lock.Unlock() + if newStage != oldStage { s.notifyStageChanged() } }() - defer s.lock.Unlock() if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 { return fmt.Errorf("MPT nodes were not requested: current state sync stage is %d", s.syncStage) @@ -540,25 +599,24 @@ func (s *Module) AddMPTNodes(nodes [][]byte) error { } // AddContractStorageItems adds a batch of key-value pairs for storage-based sync. 
-func (s *Module) AddContractStorageItems(kvs []storage.KeyValue, syncHeight uint32, expectedRoot util.Uint256) error { +func (s *Module) AddContractStorageItems(kvs []storage.KeyValue) error { if s.mode == MPTBased { panic("contract storage items are not expected in MPT-based mode") } - oldStage := s.syncStage + s.lock.Lock() + oldStage := s.syncStage defer func() { - if s.syncStage != oldStage { + newStage := s.syncStage + s.lock.Unlock() + if newStage != oldStage { s.notifyStageChanged() } }() - defer s.lock.Unlock() - if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 || expectedRoot.Equals(s.localTrie.StateRoot()) { + if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 { return errors.New("contract storage items were not requested") } - if syncHeight != s.syncPoint { - return fmt.Errorf("invalid sync height: expected %d, got %d", s.syncPoint, syncHeight) - } if len(kvs) == 0 { return fmt.Errorf("key-value pairs are empty") } @@ -573,33 +631,29 @@ func (s *Module) AddContractStorageItems(kvs []storage.KeyValue, syncHeight uint _ = s.dao.Store.PutChangeSet(nil, batch) mptBatch := mpt.MapToMPTBatch(batch) if _, err := s.localTrie.PutBatch(mptBatch); err != nil { - return fmt.Errorf("failed to apply MPT batch at height %d: %w", syncHeight, err) + return fmt.Errorf("failed to apply MPT batch at %d: %w", s.syncPoint, err) } - s.localTrie.Flush(syncHeight) + s.localTrie.Flush(s.syncPoint) s.lastStoredKey = kvs[len(kvs)-1].Key computedRoot := s.localTrie.StateRoot() + w := transaction.Witness{} + if len(s.root.Witness) > 0 { + w = s.root.Witness[0] + } ckpt := dao.StateSyncCheckpoint{ - MPTRoot: s.localTrie.StateRoot(), - LastStoredKey: kvs[len(kvs)-1].Key, - IsMPTSynced: computedRoot.Equals(expectedRoot), + IntermediateRoot: computedRoot, + Root: s.root.Root, + LastStoredKey: kvs[len(kvs)-1].Key, + Witness: w, } s.dao.PutStateSyncCheckpoint(ckpt) if _, err := s.dao.Store.PersistSync(); err != nil { return fmt.Errorf("failed to persist 
checkpoint metadata: %w", err) } - if !computedRoot.Equals(expectedRoot) { + if !computedRoot.Equals(s.root.Root) { return nil } - if s.bc.GetConfig().StateRootInHeader { - header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint + 1)) - if err != nil { - return fmt.Errorf("failed to get header to check state root: %w", err) - } - if !header.PrevStateRoot.Equals(expectedRoot) { - return fmt.Errorf("state root mismatch: %s != %s", header.PrevStateRoot.StringLE(), expectedRoot.StringLE()) - } - } s.syncStage |= mptSynced s.blockHeight = s.getLatestSavedBlock(s.syncPoint) s.log.Info("MPT and contract storage are in sync", @@ -774,3 +828,8 @@ func (s *Module) GetStateSyncPoint() uint32 { return s.syncPoint } + +// VerifyWitness implements [network.StateSync] interface. +func (s *Module) VerifyWitness(h util.Uint160, c hash.Hashable, w *transaction.Witness, gas int64) (int64, error) { + return s.bc.VerifyWitness(h, c, w, gas) +} diff --git a/pkg/core/statesync/neotest_test.go b/pkg/core/statesync/neotest_test.go index 5d14c4e9bf..fe2f1a372d 100644 --- a/pkg/core/statesync/neotest_test.go +++ b/pkg/core/statesync/neotest_test.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/neotest" "github.com/nspcc-dev/neo-go/pkg/neotest/chain" @@ -108,7 +109,7 @@ func TestStateSyncModule_Init(t *testing.T) { require.False(t, module.NeedStorageData()) }) - check := func(t *testing.T, boltCfg func(c *config.Blockchain), storageEnabled bool) { + check := func(t *testing.T, boltCfg func(c *config.Blockchain), storageItemsSync bool) { bcBolt, validatorsBolt, committeeBolt := chain.NewMultiWithCustomConfig(t, boltCfg) eBolt := neotest.NewExecutor(t, bcBolt, validatorsBolt, committeeBolt) module := bcBolt.GetStateSyncModule() @@ -146,7 +147,7 @@ func 
TestStateSyncModule_Init(t *testing.T) { require.NoError(t, err) var lastKey []byte // add a few MPT nodes or contract storage items to create DB state where some of the elements are missing - if !storageEnabled { + if !storageItemsSync { unknownNodes := module.GetUnknownMPTNodesBatch(2) require.Equal(t, 1, len(unknownNodes)) require.Equal(t, expectedHeader.PrevStateRoot, unknownNodes[0]) @@ -169,13 +170,14 @@ func TestStateSyncModule_Init(t *testing.T) { } } else { // check AddContractStorageItems parameters - require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{}, stateSyncPoint-3, sroot.Root), "invalid sync height:") - require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{}, stateSyncPoint, sroot.Root), "key-value pairs are empty") + require.ErrorContains(t, module.InitContractStorageSync(state.MPTRoot{Index: stateSyncPoint - 3, Root: sroot.Root}), "invalid sync height:") + require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{}), "key-value pairs are empty") + require.NoError(t, module.InitContractStorageSync(state.MPTRoot{Index: stateSyncPoint, Root: sroot.Root})) var batch []storage.KeyValue sm.SeekStates(sroot.Root, nil, func(k, v []byte) bool { batch = append(batch, storage.KeyValue{Key: k, Value: v}) if len(batch) == 2 { - require.NoError(t, module.AddContractStorageItems(batch, stateSyncPoint, sroot.Root)) + require.NoError(t, module.AddContractStorageItems(batch)) lastKey = batch[len(batch)-1].Key return false // stop seeking } @@ -193,7 +195,7 @@ func TestStateSyncModule_Init(t *testing.T) { require.False(t, module.NeedHeaders()) require.True(t, module.NeedStorageData()) require.False(t, module.NeedBlocks()) - if !storageEnabled { + if !storageItemsSync { unknownNodes := module.GetUnknownMPTNodesBatch(100) require.True(t, len(unknownNodes) > 0) require.NotContains(t, unknownNodes, expectedHeader.PrevStateRoot) @@ -222,6 +224,7 @@ func TestStateSyncModule_Init(t *testing.T) { unknownNodes = 
module.GetUnknownMPTNodesBatch(2) require.Equal(t, 0, len(unknownNodes)) } else { + require.NoError(t, module.InitContractStorageSync(state.MPTRoot{Index: stateSyncPoint, Root: sroot.Root})) require.Equal(t, lastKey, module.GetLastStoredKey()) var skip bool sm.SeekStates(sroot.Root, nil, func(k, v []byte) bool { @@ -231,10 +234,10 @@ func TestStateSyncModule_Init(t *testing.T) { } return true // skip this key } - require.NoError(t, module.AddContractStorageItems([]storage.KeyValue{{Key: k, Value: v}}, stateSyncPoint, sroot.Root)) + require.NoError(t, module.AddContractStorageItems([]storage.KeyValue{{Key: k, Value: v}})) return true }) - require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}, stateSyncPoint, sroot.Root), "contract storage items were not requested") + require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}), "contract storage items were not requested") } // check that module is active and storage data is in sync require.True(t, module.IsActive()) @@ -265,11 +268,11 @@ func TestStateSyncModule_Init(t *testing.T) { require.False(t, module.NeedHeaders()) require.False(t, module.NeedStorageData()) require.True(t, module.NeedBlocks()) - if !storageEnabled { + if !storageItemsSync { unknownNodes := module.GetUnknownMPTNodesBatch(2) require.Equal(t, 0, len(unknownNodes)) } else { - require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}, stateSyncPoint, sroot.Root), "contract storage items were not requested") + require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}), "contract storage items were not requested") } require.Equal(t, uint32(stateSyncPoint-stateSyncInterval-1), module.BlockHeight()) @@ -304,11 +307,11 @@ func TestStateSyncModule_Init(t *testing.T) { require.False(t, module.NeedHeaders()) require.False(t, 
module.NeedStorageData()) require.False(t, module.NeedBlocks()) - if !storageEnabled { + if !storageItemsSync { unknownNodes := module.GetUnknownMPTNodesBatch(1) require.True(t, len(unknownNodes) == 0) } else { - require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}, stateSyncPoint, sroot.Root), "contract storage items were not requested") + require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}), "contract storage items were not requested") } require.Equal(t, uint32(0), module.BlockHeight()) // inactive -> 0 require.Equal(t, uint32(stateSyncPoint), bcBolt.BlockHeight()) @@ -319,11 +322,11 @@ func TestStateSyncModule_Init(t *testing.T) { require.False(t, module.IsActive()) require.False(t, module.NeedHeaders()) require.False(t, module.NeedStorageData()) - if !storageEnabled { + if !storageItemsSync { unknownNodes := module.GetUnknownMPTNodesBatch(1) require.True(t, len(unknownNodes) == 0) } else { - require.Error(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}, stateSyncPoint, sroot.Root), "contract storage items were not requested") + require.Error(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}), "contract storage items were not requested") } require.Equal(t, uint32(0), module.BlockHeight()) // inactive -> 0 require.Equal(t, uint32(stateSyncPoint), bcBolt.BlockHeight()) @@ -336,11 +339,11 @@ func TestStateSyncModule_Init(t *testing.T) { require.False(t, module.IsActive()) require.False(t, module.NeedHeaders()) require.False(t, module.NeedStorageData()) - if !storageEnabled { + if !storageItemsSync { unknownNodes := module.GetUnknownMPTNodesBatch(1) require.True(t, len(unknownNodes) == 0) } else { - require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}, stateSyncPoint, sroot.Root), "contract storage items were not 
requested") + require.ErrorContains(t, module.AddContractStorageItems([]storage.KeyValue{{Key: []byte{1}, Value: []byte{1}}}), "contract storage items were not requested") } require.Equal(t, uint32(0), module.BlockHeight()) // inactive -> 0 require.Equal(t, uint32(stateSyncPoint)+1, bcBolt.BlockHeight()) @@ -349,7 +352,6 @@ func TestStateSyncModule_Init(t *testing.T) { t.Run("initialization from headers/blocks/mpt synced stages", func(t *testing.T) { check(t, boltCfg, false) }) - t.Run("initialization from headers/blocks/storage synced stages", func(t *testing.T) { check(t, boltCfgStorage, true) }) @@ -439,12 +441,12 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { }) }) t.Run("error: add ContractStorage items without initialisation", func(t *testing.T) { - require.Error(t, module.AddContractStorageItems([]storage.KeyValue{}, 123, util.Uint256{})) + require.Error(t, module.AddContractStorageItems([]storage.KeyValue{})) }) } else { t.Run("panic: add contract storage items in MPTBased mode", func(t *testing.T) { require.Panics(t, func() { - err := module.AddContractStorageItems([]storage.KeyValue{}, 123, util.Uint256{}) + err := module.AddContractStorageItems([]storage.KeyValue{}) if err != nil { return } @@ -487,18 +489,22 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { sroot, err := bcSpout.GetStateModule().GetStateRoot(uint32(stateSyncPoint)) require.NoError(t, err) + require.NoError(t, module.InitContractStorageSync(state.MPTRoot{ + Index: uint32(stateSyncPoint), + Root: sroot.Root, + })) var batch []storage.KeyValue sm.SeekStates(sroot.Root, nil, func(k, v []byte) bool { batch = append(batch, storage.KeyValue{Key: k, Value: v}) if len(batch) >= 3 { - err = module.AddContractStorageItems(batch, uint32(stateSyncPoint), sroot.Root) + err = module.AddContractStorageItems(batch) require.NoError(t, err) batch = batch[:0] } return true }) if len(batch) > 0 { - err = module.AddContractStorageItems(batch, uint32(stateSyncPoint), sroot.Root) + 
err = module.AddContractStorageItems(batch) require.NoError(t, err) } require.NoError(t, err) @@ -702,12 +708,16 @@ func TestStateSyncModule_SetOnStageChanged(t *testing.T) { sm := bcSpout.GetStateModule() sroot, err := sm.GetStateRoot(syncPoint) require.NoError(t, err) + require.NoError(t, module.InitContractStorageSync(state.MPTRoot{ + Index: syncPoint, + Root: sroot.Root, + })) var all []storage.KeyValue sm.SeekStates(sroot.Root, nil, func(k, v []byte) bool { all = append(all, storage.KeyValue{Key: k, Value: v}) return true }) - require.NoError(t, module.AddContractStorageItems(all, syncPoint, sroot.Root)) + require.NoError(t, module.AddContractStorageItems(all)) } require.Equal(t, 3, calls) diff --git a/pkg/core/transaction/witness.go b/pkg/core/transaction/witness.go index ae4f2446c7..a594b38c69 100644 --- a/pkg/core/transaction/witness.go +++ b/pkg/core/transaction/witness.go @@ -48,3 +48,20 @@ func (w Witness) Copy() Witness { VerificationScript: bytes.Clone(w.VerificationScript), } } + +// Bytes returns the serialized Witness bytes. +func (w Witness) Bytes() []byte { + buf := io.NewBufBinWriter() + w.EncodeBinary(buf.BinWriter) + if buf.Err != nil { + return nil + } + return buf.Bytes() +} + +// FromBytes decodes Witness from bytes. 
+func (w *Witness) FromBytes(b []byte) error { + buf := io.NewBinReaderFromBuf(b) + w.DecodeBinary(buf) + return buf.Err +} diff --git a/pkg/core/transaction/witness_test.go b/pkg/core/transaction/witness_test.go index 1c05beaa82..b239db43ee 100644 --- a/pkg/core/transaction/witness_test.go +++ b/pkg/core/transaction/witness_test.go @@ -35,6 +35,15 @@ func TestWitnessSerDes(t *testing.T) { require.NoError(t, err) require.Error(t, testserdes.DecodeBinary(bin1, exp)) require.Error(t, testserdes.DecodeBinary(bin2, exp)) + + require.NoError(t, testserdes.DecodeBinary(good1.Bytes(), exp)) + require.Equal(t, good1, exp) + require.NoError(t, testserdes.DecodeBinary(good2.Bytes(), exp)) + require.Equal(t, good2, exp) + require.NoError(t, exp.FromBytes(good1.Bytes())) + require.Equal(t, good1, exp) + require.NoError(t, exp.FromBytes(good2.Bytes())) + require.Equal(t, good2, exp) } func TestWitnessCopy(t *testing.T) { diff --git a/pkg/network/server.go b/pkg/network/server.go index 968f9f1e7a..ee6c392d24 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -437,7 +437,7 @@ func (s *Server) stateSyncCallBack() { if needStorage { s.syncHeaderFetcher.Shutdown() if !s.syncStateFetcher.IsShutdown() { - if err := s.syncStateFetcher.Start(); err != nil { + if err := s.syncStateFetcher.Start(s.stateSync.GetStateSyncPoint()); err != nil { s.log.Error("skipping NeoFS Sync StateFetcher", zap.Error(err)) } } @@ -1619,7 +1619,7 @@ func (s *Server) tryInitStateSync() { // Choose the height of the median peer as the current chain's height h = heights[len(heights)/2] } else { - lastStateHeight, err := s.syncStateFetcher.LatestStateObjectHeight() + lastStateHeight, err := s.syncStateFetcher.FindStateObject(0, 0) if err != nil { s.log.Fatal("failed to get the last state index", zap.Uint32("blockHeight", s.chain.BlockHeight()), diff --git a/pkg/network/state_sync.go b/pkg/network/state_sync.go index ad17b96148..fc4b4de9ff 100644 --- a/pkg/network/state_sync.go +++ 
b/pkg/network/state_sync.go @@ -3,7 +3,10 @@ package network import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/util" ) @@ -11,8 +14,9 @@ import ( type StateSync interface { blockHeaderQueuer AddMPTNodes([][]byte) error - AddContractStorageItems(kvs []storage.KeyValue, syncHeight uint32, expectedRoot util.Uint256) error + AddContractStorageItems(kvs []storage.KeyValue) error Init(currChainHeight uint32) error + InitContractStorageSync(r state.MPTRoot) error IsActive() bool IsInitialized() bool GetUnknownMPTNodesBatch(limit int) []util.Uint256 @@ -24,4 +28,5 @@ type StateSync interface { GetStateSyncPoint() uint32 SetOnStageChanged(func()) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error + VerifyWitness(h util.Uint160, c hash.Hashable, w *transaction.Witness, gas int64) (int64, error) } diff --git a/pkg/services/helpers/neofs/blockstorage.go b/pkg/services/helpers/neofs/blockstorage.go index 3460d7d417..40337a0635 100644 --- a/pkg/services/helpers/neofs/blockstorage.go +++ b/pkg/services/helpers/neofs/blockstorage.go @@ -26,6 +26,10 @@ const ( DefaultBlockAttribute = "Block" // DefaultStateAttribute is the default attribute name for state objects. DefaultStateAttribute = "State" + // DefaultWitnessAttribute is the default attribute name for state object witness. + DefaultWitnessAttribute = "Witness" + // DefaultStateRootAttribute is the default attribute name for the stateroot field of a state object. + DefaultStateRootAttribute = "StateRoot" // DefaultKVBatchSize is a number of contract storage key-value objects to // flush to the node's DB in a batch. 
DefaultKVBatchSize = 1000 diff --git a/pkg/services/statefetcher/statefetcher.go b/pkg/services/statefetcher/statefetcher.go index b2676a4e73..5602bcadf7 100644 --- a/pkg/services/statefetcher/statefetcher.go +++ b/pkg/services/statefetcher/statefetcher.go @@ -3,6 +3,7 @@ package statefetcher import ( "bytes" "context" + "encoding/hex" "errors" "fmt" "io" @@ -12,7 +13,11 @@ import ( "sync/atomic" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/stateroot" "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" gio "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/nspcc-dev/neo-go/pkg/util" @@ -27,14 +32,15 @@ import ( type Ledger interface { GetConfig() config.Blockchain HeaderHeight() uint32 - AddContractStorageItems(kv []storage.KeyValue, syncHeight uint32, expectedRoot util.Uint256) error + AddContractStorageItems(kv []storage.KeyValue) error + InitContractStorageSync(r state.MPTRoot) error GetLastStoredKey() []byte + VerifyWitness(h util.Uint160, c hash.Hashable, w *transaction.Witness, gas int64) (int64, error) } // Service fetches contract storage state from NeoFS. type Service struct { neofs.BasicService - containerMagic int isActive atomic.Bool isShutdown atomic.Bool @@ -42,9 +48,11 @@ type Service struct { cfg config.NeoFSStateFetcher stateSyncInterval uint32 - lock sync.RWMutex - lastStateObjectIndex uint32 - lastStateOID oid.ID + // lock protects stateRoot and stateObjectID from concurrent update by + // network server. 
+ lock sync.RWMutex + stateRoot state.MPTRoot + stateObjectID oid.ID chain Ledger log *zap.Logger @@ -113,70 +121,114 @@ func New(chain Ledger, cfg config.NeoFSStateFetcher, stateSyncInterval int, logg s.isActive.CompareAndSwap(true, false) return nil, fmt.Errorf("container magic mismatch: expected %d, got %s", s.chain.GetConfig().Magic, containerMagic) } - s.containerMagic, err = strconv.Atoi(containerMagic) return s, nil } -// LatestStateObjectHeight returns the height of the most recent state object found in the container. -func (s *Service) LatestStateObjectHeight(h ...uint32) (uint32, error) { +// FindStateObject finds and caches the most recent state object within +// [minH, maxH] height range. If max is 0, the latest available object is +// returned. If the cached state object doesn't match the search criteria, a new +// search will be initiated followed by the Service's cache update. The object +// height is returned. +func (s *Service) FindStateObject(minH uint32, maxH uint32) (uint32, error) { + if maxH > 0 && maxH < minH { + // Program bug. 
+ panic(fmt.Errorf("max height must be greater than min height, got %d/%d", minH, maxH)) + } + s.lock.RLock() - if s.lastStateObjectIndex != 0 { - idx := s.lastStateObjectIndex + if s.stateRoot.Index != 0 && s.stateRoot.Index >= minH && (maxH == 0 || s.stateRoot.Index <= maxH) { + h := s.stateRoot.Index s.lock.RUnlock() - return idx, nil + return h, nil } s.lock.RUnlock() - var height uint32 - if len(h) > 0 { - height = h[0] - } + var ( + obj *client.SearchResultItem + r = new(state.MPTRoot) + err error + ) filters := object.NewSearchFilters() - filters.AddFilter(s.cfg.StateAttribute, fmt.Sprintf("%d", height), object.MatchNumGE) + filters.AddFilter(s.cfg.StateAttribute, fmt.Sprintf("%d", minH), object.MatchNumGE) + if maxH > 0 { + filters.AddFilter(s.cfg.StateAttribute, fmt.Sprintf("%d", maxH), object.MatchNumLE) + } ctx, cancel := context.WithTimeout(s.Ctx, s.cfg.Timeout) defer cancel() - results, errs := neofs.ObjectSearch(ctx, s.Pool, s.Account.PrivateKey(), s.ContainerID, filters, []string{s.cfg.StateAttribute}) - - var ( - lastItem *client.SearchResultItem - lastFoundIdx uint64 - ) - + results, errs := neofs.ObjectSearch(ctx, s.Pool, s.Account.PrivateKey(), s.ContainerID, filters, []string{s.cfg.StateAttribute, neofs.DefaultStateRootAttribute, neofs.DefaultWitnessAttribute}) loop: for { select { - case item, ok := <-results: + case res, ok := <-results: if !ok { break loop } - lastItem = &item + obj = &res - case err := <-errs: + case err = <-errs: if err != nil && !neofs.IsContextCanceledErr(err) { s.isActive.CompareAndSwap(true, false) - return 0, fmt.Errorf("failed to search state object at height %d: %w", height, err) + return 0, fmt.Errorf("failed to search state object: %w", err) } break loop } } - lastFoundIdx, err := strconv.ParseUint(lastItem.Attributes[0], 10, 32) - if err != nil || lastFoundIdx == 0 { + if len(obj.Attributes) < 2 { + s.isActive.CompareAndSwap(true, false) + return 0, fmt.Errorf("missing state object attributes: expected at least 2, 
got %d", len(obj.Attributes)) + } + h, err := strconv.ParseUint(obj.Attributes[0], 10, 32) + if err != nil || h == 0 { + s.isActive.CompareAndSwap(true, false) + return 0, fmt.Errorf("invalid state object index: %w", err) + } + r.Index = uint32(h) + + r.Root, err = util.Uint256DecodeStringLE(obj.Attributes[1]) + if err != nil { s.isActive.CompareAndSwap(true, false) - return 0, fmt.Errorf("failed to parse state object index: %w", err) + return 0, fmt.Errorf("failed to decode state root from state object attribute: %w", err) } + if len(obj.Attributes) > 2 { + b, err := hex.DecodeString(obj.Attributes[2]) + if err != nil { + s.isActive.CompareAndSwap(true, false) + return 0, fmt.Errorf("failed to decode state root witness from hex: %w", err) + } + w := new(transaction.Witness) + err = w.FromBytes(b) + if err != nil { + s.isActive.CompareAndSwap(true, false) + return 0, fmt.Errorf("failed to decode state root witness: %w", err) + } + r.Witness = []transaction.Witness{*w} + _, err = s.chain.VerifyWitness(hash.Hash160(w.VerificationScript), r, w, stateroot.MaxVerificationGAS) + if err != nil { + s.isActive.CompareAndSwap(true, false) + return 0, fmt.Errorf("state root witness check failed: %w", err) + } + } + + s.log.Info("new state object found", + zap.Stringer("oid", obj.ID), + zap.Uint32("height", r.Index), + zap.String("root", r.Root.StringLE()), + zap.Bool("witnessed", len(r.Witness) > 0)) + s.lock.Lock() - s.lastStateObjectIndex = uint32(lastFoundIdx) - s.lastStateOID = lastItem.ID - s.lock.Unlock() + defer s.lock.Unlock() + s.stateRoot = *r + s.stateObjectID = obj.ID - return s.lastStateObjectIndex, nil + return s.stateRoot.Index, nil } -// Start begins state fetching. -func (s *Service) Start() error { +// Start starts state fetcher service. If syncPoint is 0, the latest available +// state object will be fetched. 
+func (s *Service) Start(syncPoint uint32) error { if s.IsShutdown() { return errors.New("service is already shut down") } @@ -185,7 +237,7 @@ func (s *Service) Start() error { } s.log.Info("starting NeoFS StateFetcher service") go s.exiter() - go s.run() + go s.run(syncPoint) return nil } @@ -236,62 +288,95 @@ func (s *Service) exiter() { close(s.exiterToShutdown) } -func (s *Service) run() { +func (s *Service) run(syncPoint uint32) { defer close(s.runToExiter) + // Ensure the proper state object found before accessing its OID since the + // node may be recovering after previously interrupted state sync at some + // older state sync point. + _, err := s.FindStateObject(s.chain.HeaderHeight()-1, syncPoint) + if err != nil { + s.log.Error("failed to find the latest state object", + zap.Uint32("minHeight", s.chain.HeaderHeight()-1), + zap.Uint32("maxHeight", syncPoint), + zap.Error(err)) + s.stopService(true) + return + } + var ( - syncHeight uint32 - expectedRoot util.Uint256 + oid oid.ID + r state.MPTRoot ) - - s.lock.RLock() - isZero := s.lastStateOID.IsZero() - s.lock.RUnlock() - if isZero { - _, err := s.LatestStateObjectHeight(s.chain.HeaderHeight() - 1) - if err != nil { - s.log.Error("failed to get state object", zap.Error(err)) - s.stopService(true) - return - } - } s.lock.RLock() - oidStr := s.lastStateOID.String() + r = s.stateRoot + oid = s.stateObjectID s.lock.RUnlock() - reader, err := s.objectGet(s.Ctx, oidStr) + + reader, err := s.objectGet(s.Ctx, oid) if err != nil { - s.log.Error("failed to get state object", zap.Error(err), zap.String("oid", s.lastStateOID.String())) + s.log.Error("failed to get state object", + zap.Uint32("height", r.Index), + zap.Stringer("oid", oid), + zap.Error(err)) s.stopService(true) return } defer func() { if err = reader.Close(); err != nil { - s.log.Warn("failed to close reader", zap.Error(err)) + s.log.Warn("failed to close state object reader", zap.Error(err)) } }() + + br := gio.NewBinReaderFromIO(reader) + version := 
br.ReadB() + if version != 0 || br.Err != nil { + s.log.Error("invalid state object version", + zap.Uint32("expected", 0), + zap.Uint8("actual", version), + zap.Error(br.Err)) + return + } + magic := br.ReadU32LE() + if magic != uint32(s.chain.GetConfig().Magic) || br.Err != nil { + s.log.Error("invalid state object magic", + zap.Uint32("expected", uint32(s.chain.GetConfig().Magic)), + zap.Uint32("actual", magic), + zap.Error(br.Err)) + return + } + h := br.ReadU32LE() + if h != r.Index || br.Err != nil { + s.log.Error("invalid state object height", + zap.Uint32("expected", r.Index), + zap.Uint32("actual", h), + zap.Error(br.Err)) + return + } + root := util.Uint256{} + br.ReadBytes(root[:]) + if !root.Equals(r.Root) || br.Err != nil { + s.log.Error("invalid state object root hash", + zap.String("expected", r.Root.StringLE()), + zap.String("actual", root.StringLE()), + zap.Error(br.Err)) + return + } + + s.log.Info("initializing contract storage sync", + zap.Uint32("height", s.stateRoot.Index), + zap.String("root", s.stateRoot.Root.StringLE()), + zap.Bool("witnessed", len(r.Witness) > 0)) + err = s.chain.InitContractStorageSync(r) + if err != nil { + s.log.Error("failed to initialize contract storage sync", zap.Error(err)) + return + } + batches := make(chan []storage.KeyValue, 2) go func() { defer close(batches) - br := gio.NewBinReaderFromIO(reader) - version := br.ReadB() - if version != 0 || br.Err != nil { - s.log.Error("invalid state object version", zap.Uint8("version", version), zap.Error(br.Err)) - return - } - magic := br.ReadU32LE() - if magic != uint32(s.containerMagic) || br.Err != nil { - s.log.Error("invalid state object magic", zap.Uint32("magic", magic)) - return - } - syncHeight = br.ReadU32LE() - br.ReadBytes(expectedRoot[:]) - if br.Err != nil { - s.log.Error("failed to read state root", zap.Error(br.Err)) - return - } - s.log.Info("contract storage state object found", zap.String("root", expectedRoot.StringLE()), zap.Uint32("height", 
syncHeight)) - var ( lastKey = s.chain.GetLastStoredKey() skip = len(lastKey) > 0 @@ -306,6 +391,10 @@ func (s *Service) run() { } key := br.ReadVarBytes() + // TODO: @roman-khimov, we don't include len(KVs) in the state object, + // so the only indicator of state object end is EOF. I suggest + // to change the format of state object files and include + // len(KVs) to the header. Reuploading is required. if errors.Is(br.Err, io.EOF) { // Flush remainder. if len(batch) > 0 { @@ -352,7 +441,7 @@ func (s *Service) run() { if len(batch) == 0 { continue } - if err = s.chain.AddContractStorageItems(batch, syncHeight, expectedRoot); err != nil { + if err = s.chain.AddContractStorageItems(batch); err != nil { s.log.Error("failed to add storage batch", zap.Error(err)) s.stopService(true) return @@ -361,7 +450,7 @@ func (s *Service) run() { } } -func (s *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { +func (s *Service) objectGet(ctx context.Context, oid oid.ID) (io.ReadCloser, error) { u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, s.cfg.ContainerID, oid)) if err != nil { return nil, err diff --git a/pkg/services/statefetcher/statefetcher_test.go b/pkg/services/statefetcher/statefetcher_test.go index 51e3ee0350..cca6bbc422 100644 --- a/pkg/services/statefetcher/statefetcher_test.go +++ b/pkg/services/statefetcher/statefetcher_test.go @@ -4,7 +4,10 @@ import ( "testing" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -21,10 +24,18 @@ func (m *mockLedger) GetConfig() config.Blockchain { return config.Blockchain{} func (m *mockLedger) GetLastStoredKey() []byte { return m.lastStoredKey } -func (m *mockLedger) AddContractStorageItems(kvs 
[]storage.KeyValue, syncHeight uint32, expectedRoot util.Uint256) error { +func (m *mockLedger) AddContractStorageItems(kvs []storage.KeyValue) error { return nil } +func (m *mockLedger) InitContractStorageSync(r state.MPTRoot) error { + return nil +} + +func (m *mockLedger) VerifyWitness(h util.Uint160, c hash.Hashable, w *transaction.Witness, gas int64) (int64, error) { + return 0, nil +} + func TestServiceConstructor(t *testing.T) { logger := zap.NewNop() ledger := &mockLedger{height: 100}