Skip to content

Commit

Permalink
Less logs, verification fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowosie committed Sep 30, 2024
1 parent 61a20c7 commit c384cda
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 66 deletions.
17 changes: 1 addition & 16 deletions p2p/snap_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ func (b *snapServer) GetClassRange(request *spec.ClassRangeRequest) (iter.Seq[pr
}

stateRoot := p2p2core.AdaptHash(request.Root)
startAddr := p2p2core.AdaptHash(request.Start)
b.log.Debugw("GetClassRange", "start", startAddr, "chunks", request.ChunksPerProof)

return func(yield yieldFunc) {
s, err := b.blockchain.GetStateForStateRoot(stateRoot)
Expand Down Expand Up @@ -171,7 +169,7 @@ func (b *snapServer) GetClassRange(request *spec.ClassRangeRequest) (iter.Seq[pr
}

yield(finMsg)
b.log.Infow("GetClassRange iteration completed")
b.log.Infow("class range iteration completed")
}, nil
}

Expand All @@ -180,8 +178,6 @@ func (b *snapServer) GetContractRange(request *spec.ContractRangeRequest) (iter.
Responses: &spec.ContractRangeResponse_Fin{},
}
stateRoot := p2p2core.AdaptHash(request.StateRoot)
startAddr := p2p2core.AdaptAddress(request.Start)
b.log.Debugw("GetContractRange", "root", stateRoot, "start", startAddr, "chunks", request.ChunksPerProof)

return func(yield yieldFunc) {
s, err := b.blockchain.GetStateForStateRoot(stateRoot)
Expand Down Expand Up @@ -256,12 +252,6 @@ func (b *snapServer) GetContractRange(request *spec.ContractRangeRequest) (iter.
},
}

var first, last *felt.Felt
if len(states) > 0 {
first = p2p2core.AdaptAddress(states[0].Address)
last = p2p2core.AdaptAddress(states[len(states)-1].Address)
}
b.log.Infow("sending contract range response", "len(states)", len(states), "first", first, "last", last)
if !yield(cntrMsg) {
// we should not send `FinMsg` when the client explicitly asks to stop
return
Expand All @@ -282,10 +272,6 @@ func (b *snapServer) GetStorageRange(request *spec.ContractStorageRequest) (iter
var finMsg proto.Message = &spec.ContractStorageResponse{
Responses: &spec.ContractStorageResponse_Fin{},
}
startKey := p2p2core.AdaptAddress(request.Query[0].Address)
last := len(request.Query) - 1
endKey := p2p2core.AdaptAddress(request.Query[last].Address)
b.log.Debugw("GetStorageRange", "query[0]", startKey, "query[", last, "]", endKey)

return func(yield yieldFunc) {
stateRoot := p2p2core.AdaptHash(request.StateRoot)
Expand Down Expand Up @@ -349,7 +335,6 @@ func (b *snapServer) GetClasses(request *spec.ClassHashesRequest) (iter.Seq[prot
var finMsg proto.Message = &spec.ClassesResponse{
ClassMessage: &spec.ClassesResponse_Fin{},
}
b.log.Debugw("GetClasses", "len(hashes)", len(request.ClassHashes))

return func(yield yieldFunc) {
felts := make([]*felt.Felt, len(request.ClassHashes))
Expand Down
18 changes: 18 additions & 0 deletions p2p/snap_server_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package p2p

import (
"context"
"fmt"
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/crypto"
Expand Down Expand Up @@ -752,6 +753,23 @@ func TestGetContractStorageRoot(t *testing.T) {
}
}

func TestReadAndVerifySnapshot(t *testing.T) {
var d db.DB
t.Skip("DB snapshot is needed for this test")
d, _ = pebble.NewWithOptions("/Users/pnowosie/juno/snapshots/node1", 128000000, 128, false)
defer func() { _ = d.Close() }()
bc := blockchain.New(d, &utils.Sepolia)

logger, _ := utils.NewZapLogger(utils.DEBUG, false)
syncer := SnapSyncer{
log: logger,
blockchain: bc,
}

err := syncer.PhraseVerify(context.Background())
assert.NoError(t, err)
}

func TestPercentageCalculation(t *testing.T) {
tests := []struct {
actual *felt.Felt
Expand Down
88 changes: 38 additions & 50 deletions p2p/snap_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ func (s *SnapSyncer) Run(ctx context.Context) error {
if err != nil {
return err
}

s.log.Infow("phase 1 completed")

if err = s.PhraseVerify(ctx); err != nil {
return err
}
s.log.Infow("trie roots verification completed")

s.log.Infow("delegating to standard synchronizer")

Expand Down Expand Up @@ -286,7 +287,31 @@ func (s *SnapSyncer) runPhase1(ctx context.Context) error {
}

func (s *SnapSyncer) PhraseVerify(ctx context.Context) error {
// 1. Get the correct tries roots (again)
// 1. Get the actual class & contract trie roots
st, closer, err := s.blockchain.(*blockchain.Blockchain).HeadStateFreakingState()
defer func() { _ = closer() }()
if err != nil {
s.log.Errorw("error getting state for state root", "err", err)
return err
}
contractRoot, classRoot, err := st.StateAndClassRoot()
if err != nil {
s.log.Errorw("error getting contract and class root", "err", err)
return err
}

// 2. Verify the global state root
err = VerifyGlobalStateRoot(s.currentGlobalStateRoot, classRoot, contractRoot)
if err == nil {
// all good no need for additional verification
return nil
}

if err != nil {
s.log.Errorw("global state root verification failure", "err", err)
}

// 3. Get the correct tries roots from the client
iter, err := s.client.RequestContractRange(ctx, &spec.ContractRangeRequest{
StateRoot: core2p2p.AdaptHash(s.currentGlobalStateRoot),
Start: core2p2p.AdaptAddress(&felt.Zero),
Expand All @@ -297,49 +322,27 @@ func (s *SnapSyncer) PhraseVerify(ctx context.Context) error {
return err
}

var classRoot, contractRoot *felt.Felt
var classR, contractR *felt.Felt
iter(func(response *spec.ContractRangeResponse) bool {
if _, ok := response.GetResponses().(*spec.ContractRangeResponse_Range); ok {
classRoot = p2p2core.AdaptHash(response.ClassesRoot)
contractRoot = p2p2core.AdaptHash(response.ContractsRoot)
classR = p2p2core.AdaptHash(response.ClassesRoot)
contractR = p2p2core.AdaptHash(response.ContractsRoot)
} else {
s.log.Errorw("unexpected response", "response", response)
}

return false
})
if classRoot == nil || contractRoot == nil {
if classR == nil || contractR == nil {
s.log.Errorw("cannot obtain the trie roots from client response")
return errors.New("cannot obtain the trie roots")
}

// 2. Verify the global state root
if err = VerifyGlobalStateRoot(s.currentGlobalStateRoot, classRoot, contractRoot); err != nil {
s.log.Errorw("global state root verification failure", "err", err)
return err
}

// 3. Verify the class & contract trie roots
st, err := s.blockchain.(*blockchain.Blockchain).GetStateForStateRoot(s.currentGlobalStateRoot)
if err != nil {
s.log.Errorw("error getting state for state root", "err", err)
return err
}
ctrtRoot, clsRoot, err := st.StateAndClassRoot()
if err != nil {
s.log.Errorw("error getting contract and class root", "err", err)
return err
}
// 4. Log which one is incorrect
s.log.Infow("Contract trie root", "expected", contractR, "actual", contractRoot)
s.log.Infow("Class trie root", "expected", classR, "actual", classRoot)

if !classRoot.Equal(clsRoot) {
s.log.Errorw("class root mismatch", "got", clsRoot, "expected", classRoot)
}

if !contractRoot.Equal(ctrtRoot) {
s.log.Errorw("contract root mismatch", "got", ctrtRoot, "expected", contractRoot)
}

return nil
return errors.New("trie roots verification failed")
}

func (s *SnapSyncer) getNextStartingBlock(ctx context.Context) (*core.Block, error) {
Expand Down Expand Up @@ -440,7 +443,6 @@ func (s *SnapSyncer) runClassRangeWorker(ctx context.Context) error {
case *spec.ClassRangeResponse_Classes:
classes = v.Classes.Classes
case *spec.ClassRangeResponse_Fin:
s.log.Infow("[finMsg] class range completed")
break ResponseIter
default:
s.log.Warnw("Unexpected class range message", "GetResponses", v)
Expand Down Expand Up @@ -491,16 +493,15 @@ func (s *SnapSyncer) runClassRangeWorker(ctx context.Context) error {

err = egrp.Wait()
if err != nil {
s.log.Infow("class range adaptation failure", "err", err)
s.log.Errorw("class range adaptation failure", "err", err)
return err
}
s.log.Infow("class range adaptation completed", "classes", len(classes))

proofs := P2pProofToTrieProofs(response.RangeProof)
hasNext, err := VerifyTrie(classRoot, paths, values, proofs, core.GlobalTrieHeight, crypto.Poseidon)
if err != nil {
// TODO: Ban peer
s.log.Infow("trie verification failed", "err", err)
s.log.Errorw("trie verification failed", "err", err)
return err
}

Expand All @@ -518,7 +519,6 @@ func (s *SnapSyncer) runClassRangeWorker(ctx context.Context) error {
panic(err)
}
totalAdded += len(classes)
s.log.Infow("class range added classes into state", "classes", len(classes), "total", totalAdded)

if !hasNext {
s.log.Infow("class range completed", "totalClass", totalAdded)
Expand Down Expand Up @@ -579,7 +579,6 @@ func (s *SnapSyncer) runFetchClassWorker(ctx context.Context, workerIdx int) err
atomic.AddInt32(&s.classFetchJobCount, -1)
}
}
s.log.Infow("class fetch job completes batch", "asked keys", len(keyBatches), "worker", workerIdx, "pending", s.classFetchJobCount)

var hashes []*spec.Hash
for _, key := range keyBatches {
Expand Down Expand Up @@ -607,7 +606,6 @@ func (s *SnapSyncer) runFetchClassWorker(ctx context.Context, workerIdx int) err
case *spec.ClassesResponse_Class:
classes = append(classes, v.Class)
case *spec.ClassesResponse_Fin:
s.log.Infow("[FinMsg] class batch completed", "classes", len(classes), "worker", workerIdx)
break ResponseIter
default:
s.log.Warnw("Unexpected ClassMessage from getClasses", "v", v)
Expand Down Expand Up @@ -649,7 +647,6 @@ func (s *SnapSyncer) runFetchClassWorker(ctx context.Context, workerIdx int) err
s.log.Errorw("error storing class", "err", err)
return err
}
s.log.Infow("class fetch job added classes into state", "classes", len(newClasses), "worker", workerIdx)
} else {
s.log.Errorw("Unable to fetch any class from peer")
// TODO: Penalise peer?
Expand All @@ -663,7 +660,6 @@ func (s *SnapSyncer) runFetchClassWorker(ctx context.Context, workerIdx int) err
}

keyBatches = newBatch
s.log.Infow("class fetch job completed batch", "processed", len(processedClasses), "unprocessed", len(newBatch), "worker", workerIdx)
}
}

Expand Down Expand Up @@ -701,7 +697,6 @@ func (s *SnapSyncer) runContractRangeWorker(ctx context.Context) error {
case *spec.ContractRangeResponse_Range:
crange = v.Range
case *spec.ContractRangeResponse_Fin:
s.log.Infow("[finMsg] contract range completed", "totalAdded", totalAdded)
break ResponseIter
default:
s.log.Warnw("Unexpected contract range message", "GetResponses", v)
Expand Down Expand Up @@ -742,10 +737,9 @@ func (s *SnapSyncer) runContractRangeWorker(ctx context.Context) error {
hasNext, err := VerifyTrie(contractRoot, paths, values, proofs, core.GlobalTrieHeight, crypto.Pedersen)
if err != nil {
// The peer should get penalised in this case
s.log.Infow("trie verification failed", "err", err)
s.log.Errorw("trie verification failed", "err", err)
return err
}
s.log.Infow("contract range adaptation completed", "hasNext", hasNext, "states", len(paths), "totalAdded", totalAdded)

classes := []*felt.Felt{}
nonces := []*felt.Felt{}
Expand All @@ -761,7 +755,6 @@ func (s *SnapSyncer) runContractRangeWorker(ctx context.Context) error {
panic(err)
}
totalAdded += len(paths)
s.log.Infow("contract range added contracts into state", "contracts", len(paths), "totalAdded", totalAdded)

// We don't actually store it directly here... only put it as part of job.
// Can't remember why. Could be because it would be some wasted work.
Expand Down Expand Up @@ -888,9 +881,6 @@ func (s *SnapSyncer) runStorageRangeWorker(ctx context.Context, workerIdx int) e
case *spec.ContractStorageResponse_Storage:
csto = v.Storage
case *spec.ContractStorageResponse_Fin:
s.log.Infow("[FinMsg] storage range completed",
"totalPath", totalPath, "worker", workerIdx, "jobs", processedJobs.jobIdx,
"pending", s.storageRangeJobCount, "contract", processedJobs.jobAddr)
break ResponseIter
default:
s.log.Warnw("Unexpected storage range message", "GetResponses", v)
Expand Down Expand Up @@ -1072,13 +1062,11 @@ func (s *SnapSyncer) runStorageRefreshWorker(ctx context.Context) error {
case *spec.ContractRangeResponse_Range:
crange = v.Range
case *spec.ContractRangeResponse_Fin:
s.log.Infow("[finMsg] contract range [storage refresh] completed")
break ResponseIter
default:
s.log.Warnw("Unexpected contract range message [storage refresh]", "GetResponses", v)
continue
}
s.log.Infow("storage refresh worker received response", "states", len(crange.State))

if crange == nil || crange.State == nil {
s.log.Errorw("contract range [storage refresh] respond with nil state")
Expand Down

0 comments on commit c384cda

Please sign in to comment.