From e53b7711981439133f8fe51cbe25bf439edc33a8 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Fri, 10 Oct 2025 16:42:27 +0530 Subject: [PATCH 01/22] core: implemented witness state caching --- core/blockchain.go | 104 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/core/blockchain.go b/core/blockchain.go index c960067fc9..7e8c1db5eb 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -56,6 +56,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/triedb" "github.com/ethereum/go-ethereum/triedb/hashdb" "github.com/ethereum/go-ethereum/triedb/pathdb" @@ -294,6 +295,10 @@ type BlockChain struct { // future blocks are blocks added for later processing futureBlocks *lru.Cache[common.Hash, *types.Block] + // Span state cache for reducing witness bandwidth + spanStateCache *lru.Cache[string, struct{}] // Cache state nodes within current span + currentSpan uint64 // Track current span for boundary detection + wg sync.WaitGroup quit chan struct{} // shutdown signal, closed in Stop. stopping atomic.Bool // false if chain is running, true when stopped @@ -373,6 +378,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis engine: engine, vmConfig: vmConfig, + // Initialize span state cache with sufficient capacity for one span + // Typical span has ~100K-500K unique state nodes + spanStateCache: lru.NewCache[string, struct{}](1000000), + currentSpan: 0, // Will be set on first block processing + borReceiptsCache: lru.NewCache[common.Hash, *types.Receipt](receiptsCacheLimit), logger: vmConfig.Tracer, checker: checker, @@ -3657,12 +3667,63 @@ func (bc *BlockChain) SubscribeChain2HeadEvent(ch chan<- Chain2HeadEvent) event. return bc.scope.Track(bc.chain2HeadFeed.Subscribe(ch)) } +// mergeSpanCacheIntoWitness merges cached state nodes from the span cache into the witness. +// This allows reduced witnesses to work by adding previously cached state data. +func (bc *BlockChain) mergeSpanCacheIntoWitness(witness *stateless.Witness) int { + if bc.spanStateCache == nil || witness == nil { + return 0 + } + + mergedCount := 0 + keys := bc.spanStateCache.Keys() + + for _, key := range keys { + stateNode := key + // Check if this state node is missing from the witness + if _, exists := witness.State[stateNode]; !exists { + // Add it from cache + witness.State[stateNode] = struct{}{} + mergedCount++ + } + } + + return mergedCount +} + // ProcessBlockWithWitnesses processes a block in stateless mode using the provided witnesses. 
func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *stateless.Witness) (*state.StateDB, error) { if witness == nil { return nil, errors.New("nil witness") } + blockNum := block.Number().Uint64() + + // Span boundary detection and cache management + const spanSize uint64 = 6400 // Bor span length + newSpan := blockNum / spanSize + + if bc.spanStateCache != nil && newSpan != bc.currentSpan { + oldCacheSize := bc.spanStateCache.Len() + bc.spanStateCache.Purge() + bc.currentSpan = newSpan + log.Info("Span boundary: flushed state cache", + "block", blockNum, + "oldSpan", bc.currentSpan-1, + "newSpan", newSpan, + "cachedStates", oldCacheSize) + } + + // Merge cached states into witness for blocks that aren't the first in span + witnessStatesBefore := len(witness.State) + mergedCount := bc.mergeSpanCacheIntoWitness(witness) + if mergedCount > 0 { + log.Debug("Merged cached states into witness", + "block", blockNum, + "witnessStatesBefore", witnessStatesBefore, + "mergedFromCache", mergedCount, + "witnessStatesAfter", len(witness.State)) + } + // Validate witness. if err := stateless.ValidateWitnessPreState(witness, bc); err != nil { log.Error("Witness validation failed during stateless processing", "blockNumber", block.Number(), "blockHash", block.Hash(), "err", err) @@ -3697,6 +3758,49 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta err = fmt.Errorf("stateless self-validation receipt root mismatch: remote %x != local %x", block.ReceiptHash(), crossReceiptRoot) return nil, err } + + // Cache witness states and updated states for subsequent blocks in this span + if bc.spanStateCache != nil { + cachedWitnessStates := 0 + cachedUpdatedStates := 0 + + // Cache all witness state nodes + for stateNode := range witness.State { + if _, exists := bc.spanStateCache.Get(stateNode); !exists { + bc.spanStateCache.Add(stateNode, struct{}{}) + cachedWitnessStates++ + } + } + + // Extract and cache updated states from execution + if statedb != nil { + _, stateUpdate, _ := statedb.CommitAndReturnStateUpdate(blockNum, bc.chainConfig.IsEIP158(block.Number())) + if stateUpdate != nil && stateUpdate.Nodes != nil { + nodes := stateUpdate.Nodes + + // Iterate through all node sets + for owner := range nodes.Sets { + subset := nodes.Sets[owner] + subset.ForEachWithOrder(func(path string, n *trienode.Node) { + if !n.IsDeleted() && n.Blob != nil { + stateNode := string(n.Blob) + if _, exists := bc.spanStateCache.Get(stateNode); !exists { + bc.spanStateCache.Add(stateNode, struct{}{}) + cachedUpdatedStates++ + } + } + }) + } + } + } + + log.Debug("Cached states for span", + "block", blockNum, + "cachedWitnessStates", cachedWitnessStates, + "cachedUpdatedStates", cachedUpdatedStates, + "totalCacheSize", bc.spanStateCache.Len()) + } + return statedb, nil } From 3d6673056aee8482c96de936a0d4c11738073653 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Fri, 10 Oct 2025 17:32:56 +0530 Subject: [PATCH 02/22] core/state: added helper functions --- core/state/statedb.go | 10 +++++++++- core/state/stateupdate.go | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index 77793482d1..359285be1f 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1767,7 +1767,7 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool, noStorag // If trie database is enabled, commit the state update as a new layer if db := s.db.TrieDB(); db != nil { start := time.Now() - if err := 
db.Update(ret.root, ret.originRoot, block, ret.nodes, ret.stateSet()); err != nil { + if err := db.Update(ret.root, ret.originRoot, block, ret.Nodes, ret.stateSet()); err != nil { return nil, err } s.TrieDBCommits += time.Since(start) @@ -1777,6 +1777,14 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool, noStorag return ret, err } +func (s *StateDB) CommitAndReturnStateUpdate(block uint64, deleteEmptyObjects bool) (common.Hash, *stateUpdate, error) { + ret, err := s.commitAndFlush(block, deleteEmptyObjects, true) + if err != nil { + return common.Hash{}, nil, err + } + return ret.root, ret, nil +} + // Commit writes the state mutations into the configured data stores. // // Once the state is committed, tries cached in stateDB (including account diff --git a/core/state/stateupdate.go b/core/state/stateupdate.go index 75c4ca028c..882e8c6a00 100644 --- a/core/state/stateupdate.go +++ b/core/state/stateupdate.go @@ -81,7 +81,7 @@ type stateUpdate struct { rawStorageKey bool codes map[common.Address]contractCode // codes contains the set of dirty codes - nodes *trienode.MergedNodeSet // Aggregated dirty nodes caused by state changes + Nodes *trienode.MergedNodeSet // Aggregated dirty nodes caused by state changes } // empty returns a flag indicating the state transition is empty or not. @@ -170,7 +170,7 @@ func newStateUpdate(rawStorageKey bool, originRoot common.Hash, root common.Hash storagesOrigin: storagesOrigin, rawStorageKey: rawStorageKey, codes: codes, - nodes: nodes, + Nodes: nodes, } } From 62bd494cc4ced860905c7eaad543d0ded305c5de Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Mon, 13 Oct 2025 15:00:18 +0530 Subject: [PATCH 03/22] core: updates in witness state caching --- core/blockchain.go | 68 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 04b554cde8..5e561b4313 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -346,8 +346,8 @@ type BlockChain struct { futureBlocks *lru.Cache[common.Hash, *types.Block] // Span state cache for reducing witness bandwidth - spanStateCache *lru.Cache[string, struct{}] // Cache state nodes within current span - currentSpan uint64 // Track current span for boundary detection + spanStateCache *lru.Cache[string, struct{}] // Cache state nodes within current span + currentcacheSpan uint64 // Track current span for boundary detection wg sync.WaitGroup quit chan struct{} // shutdown signal, closed in Stop. @@ -427,8 +427,8 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine, // Initialize span state cache with sufficient capacity for one span // Typical span has ~100K-500K unique state nodes - spanStateCache: lru.NewCache[string, struct{}](1000000), - currentSpan: 0, // Will be set on first block processing + spanStateCache: lru.NewCache[string, struct{}](1000000), + currentcacheSpan: 0, // Will be set on first block processing borReceiptsCache: lru.NewCache[common.Hash, *types.Receipt](receiptsCacheLimit), borReceiptsRLPCache: lru.NewCache[common.Hash, rlp.RawValue](receiptsCacheLimit), @@ -3919,6 +3919,32 @@ func (bc *BlockChain) mergeSpanCacheIntoWitness(witness *stateless.Witness) int return mergedCount } +const ( + defaultSpanLength = 6400 // Default span length i.e. 
number of bor blocks in a span + zerothSpanEnd = 255 // End block of 0th span +) + +func isSpanEnd(blockNum uint64) bool { + if blockNum > zerothSpanEnd { + return (blockNum-zerothSpanEnd)%defaultSpanLength == 0 + } + return blockNum == zerothSpanEnd +} + +func isSpanStart(blockNum uint64) bool { + if blockNum > zerothSpanEnd { + return (blockNum-zerothSpanEnd-1)%defaultSpanLength == 0 + } + return blockNum == 0 +} + +func getSpanNum(blockNum uint64) uint64 { + if blockNum > zerothSpanEnd { + return (blockNum - zerothSpanEnd) / defaultSpanLength + } + return 0 +} + // ProcessBlockWithWitnesses processes a block in stateless mode using the provided witnesses. func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *stateless.Witness) (*state.StateDB, *ProcessResult, error) { if witness == nil { @@ -3927,30 +3953,32 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta blockNum := block.Number().Uint64() - // Span boundary detection and cache management - const spanSize uint64 = 6400 // Bor span length - newSpan := blockNum / spanSize - - if bc.spanStateCache != nil && newSpan != bc.currentSpan { + // Span boundary detection and cache management using isSpanStart + if bc.spanStateCache != nil && isSpanStart(blockNum) { oldCacheSize := bc.spanStateCache.Len() bc.spanStateCache.Purge() - bc.currentSpan = newSpan + + // Calculate span number for logging + spanNum := getSpanNum(blockNum) + bc.currentcacheSpan = spanNum + log.Info("Span boundary: flushed state cache", "block", blockNum, - "oldSpan", bc.currentSpan-1, - "newSpan", newSpan, + "spanNum", spanNum, "cachedStates", oldCacheSize) } // Merge cached states into witness for blocks that aren't the first in span - witnessStatesBefore := len(witness.State) - mergedCount := bc.mergeSpanCacheIntoWitness(witness) - if mergedCount > 0 { - log.Debug("Merged cached states into witness", - "block", blockNum, - "witnessStatesBefore", witnessStatesBefore, - "mergedFromCache", mergedCount, - "witnessStatesAfter", len(witness.State)) + if !isSpanStart(blockNum) { + witnessStatesBefore := len(witness.State) + mergedCount := bc.mergeSpanCacheIntoWitness(witness) + if mergedCount > 0 { + log.Debug("Merged cached states into witness", + "block", blockNum, + "witnessStatesBefore", witnessStatesBefore, + "mergedFromCache", mergedCount, + "witnessStatesAfter", len(witness.State)) + } } // Validate witness. 
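The span helpers introduced in the patch above encode Bor's span layout: span 0 covers blocks [0, 255], and every later span covers defaultSpanLength = 6400 blocks, so span 1 is [256, 6655], span 2 is [6656, 13055], and so on. The stand-alone sketch below (not part of the patch) mirrors isSpanStart and isSpanEnd to make the boundary arithmetic concrete:

package main

import "fmt"

const (
	defaultSpanLength = 6400 // blocks per span after span 0
	zerothSpanEnd     = 255  // last block of span 0
)

// isSpanStart and isSpanEnd mirror the helpers added in the patch above.
func isSpanStart(blockNum uint64) bool {
	if blockNum > zerothSpanEnd {
		return (blockNum-zerothSpanEnd-1)%defaultSpanLength == 0
	}
	return blockNum == 0
}

func isSpanEnd(blockNum uint64) bool {
	if blockNum > zerothSpanEnd {
		return (blockNum-zerothSpanEnd)%defaultSpanLength == 0
	}
	return blockNum == zerothSpanEnd
}

func main() {
	for _, n := range []uint64{0, 255, 256, 6655, 6656, 13055, 13056} {
		fmt.Printf("block %6d: spanStart=%v spanEnd=%v\n", n, isSpanStart(n), isSpanEnd(n))
	}
	// Prints spanStart=true for blocks 0, 256, 6656, 13056 and
	// spanEnd=true for blocks 255, 6655, 13055.
}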
From d4707dacffc96b3747a7fc496c785c8f2cc9f22b Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Thu, 23 Oct 2025 17:02:06 +0530 Subject: [PATCH 04/22] core: updated the witness state caching mechanism to now use 2 sliding windows --- core/blockchain.go | 173 ++++++++++++++++++++++++--------------------- 1 file changed, 91 insertions(+), 82 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 5e561b4313..3202a5d162 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -345,9 +345,14 @@ type BlockChain struct { // future blocks are blocks added for later processing futureBlocks *lru.Cache[common.Hash, *types.Block] - // Span state cache for reducing witness bandwidth - spanStateCache *lru.Cache[string, struct{}] // Cache state nodes within current span - currentcacheSpan uint64 // Track current span for boundary detection + // Sliding window cache for reducing witness bandwidth + // Uses two maps with overlap for efficient small-window caching + activeCacheMap map[string]struct{} // Currently active cache map + nextCacheMap map[string]struct{} // Pre-warming map for next window + cacheWindowStart uint64 // Start block of current window + cacheWindowSize uint64 // Size of each window (e.g., 20 blocks) + cacheOverlapSize uint64 // Overlap between windows (e.g., 10 blocks) + cacheLock sync.RWMutex // Lock for cache operations wg sync.WaitGroup quit chan struct{} // shutdown signal, closed in Stop. @@ -425,10 +430,12 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine, futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), engine: engine, - // Initialize span state cache with sufficient capacity for one span - // Typical span has ~100K-500K unique state nodes - spanStateCache: lru.NewCache[string, struct{}](1000000), - currentcacheSpan: 0, // Will be set on first block processing + // Initialize sliding window cache for witness bandwidth reduction + activeCacheMap: make(map[string]struct{}), + nextCacheMap: make(map[string]struct{}), + cacheWindowStart: 0, + cacheWindowSize: 20, // 20 blocks per window + cacheOverlapSize: 10, // 10 blocks overlap borReceiptsCache: lru.NewCache[common.Hash, *types.Receipt](receiptsCacheLimit), borReceiptsRLPCache: lru.NewCache[common.Hash, rlp.RawValue](receiptsCacheLimit), @@ -3896,18 +3903,20 @@ func (bc *BlockChain) SubscribeChain2HeadEvent(ch chan<- Chain2HeadEvent) event. return bc.scope.Track(bc.chain2HeadFeed.Subscribe(ch)) } -// mergeSpanCacheIntoWitness merges cached state nodes from the span cache into the witness. +// mergeSpanCacheIntoWitness merges cached state nodes from the active cache map into the witness. // This allows reduced witnesses to work by adding previously cached state data. func (bc *BlockChain) mergeSpanCacheIntoWitness(witness *stateless.Witness) int { - if bc.spanStateCache == nil || witness == nil { + if witness == nil { return 0 } + bc.cacheLock.RLock() + defer bc.cacheLock.RUnlock() + mergedCount := 0 - keys := bc.spanStateCache.Keys() - for _, key := range keys { - stateNode := key + // Merge from active cache map + for stateNode := range bc.activeCacheMap { // Check if this state node is missing from the witness if _, exists := witness.State[stateNode]; !exists { // Add it from cache @@ -3919,32 +3928,6 @@ func (bc *BlockChain) mergeSpanCacheIntoWitness(witness *stateless.Witness) int return mergedCount } -const ( - defaultSpanLength = 6400 // Default span length i.e. 
number of bor blocks in a span - zerothSpanEnd = 255 // End block of 0th span -) - -func isSpanEnd(blockNum uint64) bool { - if blockNum > zerothSpanEnd { - return (blockNum-zerothSpanEnd)%defaultSpanLength == 0 - } - return blockNum == zerothSpanEnd -} - -func isSpanStart(blockNum uint64) bool { - if blockNum > zerothSpanEnd { - return (blockNum-zerothSpanEnd-1)%defaultSpanLength == 0 - } - return blockNum == 0 -} - -func getSpanNum(blockNum uint64) uint64 { - if blockNum > zerothSpanEnd { - return (blockNum - zerothSpanEnd) / defaultSpanLength - } - return 0 -} - // ProcessBlockWithWitnesses processes a block in stateless mode using the provided witnesses. func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *stateless.Witness) (*state.StateDB, *ProcessResult, error) { if witness == nil { @@ -3953,23 +3936,27 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta blockNum := block.Number().Uint64() - // Span boundary detection and cache management using isSpanStart - if bc.spanStateCache != nil && isSpanStart(blockNum) { - oldCacheSize := bc.spanStateCache.Len() - bc.spanStateCache.Purge() + // Check if we need to slide the window + blocksSinceWindowStart := blockNum - bc.cacheWindowStart - // Calculate span number for logging - spanNum := getSpanNum(blockNum) - bc.currentcacheSpan = spanNum + if blocksSinceWindowStart >= bc.cacheWindowSize { + // Time to slide the window: discard active, promote next, create new next + bc.cacheLock.Lock() + oldActiveSize := len(bc.activeCacheMap) + bc.activeCacheMap = bc.nextCacheMap + bc.nextCacheMap = make(map[string]struct{}) + bc.cacheWindowStart = blockNum + bc.cacheLock.Unlock() - log.Info("Span boundary: flushed state cache", + log.Info("Sliding window: switched to next cache map", "block", blockNum, - "spanNum", spanNum, - "cachedStates", oldCacheSize) + "newWindowStart", blockNum, + "discardedSize", oldActiveSize, + "newActiveSize", len(bc.activeCacheMap)) } - // Merge cached states into witness for blocks that aren't the first in span - if !isSpanStart(blockNum) { + // Merge cached states into witness for blocks that aren't at window start + if blockNum != bc.cacheWindowStart { witnessStatesBefore := len(witness.State) mergedCount := bc.mergeSpanCacheIntoWitness(witness) if mergedCount > 0 { @@ -4016,48 +4003,70 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta return nil, nil, err } - // Cache witness states and updated states for subsequent blocks in this span - if bc.spanStateCache != nil { - cachedWitnessStates := 0 - cachedUpdatedStates := 0 + // Cache witness states and updated states using sliding window approach + bc.cacheLock.Lock() + defer bc.cacheLock.Unlock() + + cachedWitnessStates := 0 + cachedUpdatedStates := 0 - // Cache all witness state nodes - for stateNode := range witness.State { - if _, exists := bc.spanStateCache.Get(stateNode); !exists { - bc.spanStateCache.Add(stateNode, struct{}{}) - cachedWitnessStates++ + // Determine if we're in overlap period (should cache to both maps) + blocksSinceWindowStart = blockNum - bc.cacheWindowStart + inOverlapPeriod := blocksSinceWindowStart >= bc.cacheOverlapSize + + // Cache all witness state nodes + for stateNode := range witness.State { + // Add to active map + if _, exists := bc.activeCacheMap[stateNode]; !exists { + bc.activeCacheMap[stateNode] = struct{}{} + cachedWitnessStates++ + } + // Also add to next map if in overlap period + if inOverlapPeriod { + if _, exists := 
bc.nextCacheMap[stateNode]; !exists { + bc.nextCacheMap[stateNode] = struct{}{} } } + } + + // Extract and cache updated states from execution + if statedb != nil { + _, stateUpdate, _ := statedb.CommitAndReturnStateUpdate(blockNum, bc.chainConfig.IsEIP158(block.Number())) + if stateUpdate != nil && stateUpdate.Nodes != nil { + nodes := stateUpdate.Nodes - // Extract and cache updated states from execution - if statedb != nil { - _, stateUpdate, _ := statedb.CommitAndReturnStateUpdate(blockNum, bc.chainConfig.IsEIP158(block.Number())) - if stateUpdate != nil && stateUpdate.Nodes != nil { - nodes := stateUpdate.Nodes - - // Iterate through all node sets - for owner := range nodes.Sets { - subset := nodes.Sets[owner] - subset.ForEachWithOrder(func(path string, n *trienode.Node) { - if !n.IsDeleted() && n.Blob != nil { - stateNode := string(n.Blob) - if _, exists := bc.spanStateCache.Get(stateNode); !exists { - bc.spanStateCache.Add(stateNode, struct{}{}) - cachedUpdatedStates++ + // Iterate through all node sets + for owner := range nodes.Sets { + subset := nodes.Sets[owner] + subset.ForEachWithOrder(func(path string, n *trienode.Node) { + if !n.IsDeleted() && n.Blob != nil { + stateNode := string(n.Blob) + // Add to active map + if _, exists := bc.activeCacheMap[stateNode]; !exists { + bc.activeCacheMap[stateNode] = struct{}{} + cachedUpdatedStates++ + } + // Also add to next map if in overlap period + if inOverlapPeriod { + if _, exists := bc.nextCacheMap[stateNode]; !exists { + bc.nextCacheMap[stateNode] = struct{}{} } } - }) - } + } + }) } } - - log.Debug("Cached states for span", - "block", blockNum, - "cachedWitnessStates", cachedWitnessStates, - "cachedUpdatedStates", cachedUpdatedStates, - "totalCacheSize", bc.spanStateCache.Len()) } + log.Debug("Cached states for sliding window", + "block", blockNum, + "windowStart", bc.cacheWindowStart, + "inOverlap", inOverlapPeriod, + "cachedWitnessStates", cachedWitnessStates, + "cachedUpdatedStates", cachedUpdatedStates, + "activeMapSize", len(bc.activeCacheMap), + "nextMapSize", len(bc.nextCacheMap)) + return statedb, res, nil } From 46139f24be2dfbd3d0722febccaa44c6b8e741af Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Fri, 24 Oct 2025 13:47:13 +0530 Subject: [PATCH 05/22] core: added a way for full nodes to replicate the witness state cache --- core/blockchain.go | 172 ++++++++++++++++++++++++++------------------- 1 file changed, 100 insertions(+), 72 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 3202a5d162..69eeace56e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2207,6 +2207,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. log.Debug("Writing witness", "block", block.NumberU64(), "hash", block.Hash(), "header", statedb.Witness().Header()) rawdb.WriteWitness(blockBatch, block.Hash(), witBuf.Bytes()) + + // Manage sliding window for full nodes (mirrors witness-receiving nodes) + bc.manageSlidingWindow(block.NumberU64()) + + // Update sliding window cache - allows full nodes to send reduced witnesses + bc.updateSlidingWindowCache(block.NumberU64(), statedb.Witness(), statedb) } else { log.Debug("No witness to write", "block", block.NumberU64()) } @@ -3928,15 +3934,9 @@ func (bc *BlockChain) mergeSpanCacheIntoWitness(witness *stateless.Witness) int return mergedCount } -// ProcessBlockWithWitnesses processes a block in stateless mode using the provided witnesses. 
-func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *stateless.Witness) (*state.StateDB, *ProcessResult, error) { - if witness == nil { - return nil, nil, errors.New("nil witness") - } - - blockNum := block.Number().Uint64() - - // Check if we need to slide the window +// manageSlidingWindow handles window sliding logic. +// Returns true if window was slid, false otherwise. +func (bc *BlockChain) manageSlidingWindow(blockNum uint64) bool { blocksSinceWindowStart := blockNum - bc.cacheWindowStart if blocksSinceWindowStart >= bc.cacheWindowSize { @@ -3953,8 +3953,97 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta "newWindowStart", blockNum, "discardedSize", oldActiveSize, "newActiveSize", len(bc.activeCacheMap)) + return true + } + return false +} + +// updateSlidingWindowCache updates the sliding window cache after processing a block. +// This is used by both full nodes and witness-receiving nodes to maintain synchronized caches. +func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, witness *stateless.Witness, statedb *state.StateDB) { + if witness == nil && statedb == nil { + return + } + + bc.cacheLock.Lock() + defer bc.cacheLock.Unlock() + + // Determine if we're in overlap period (should cache to both maps) + blocksSinceWindowStart := blockNum - bc.cacheWindowStart + inOverlapPeriod := blocksSinceWindowStart >= bc.cacheOverlapSize + + cachedToActive := 0 + cachedToNext := 0 + + // Cache witness state nodes if available + if witness != nil { + for stateNode := range witness.State { + // Add to active map + if _, exists := bc.activeCacheMap[stateNode]; !exists { + bc.activeCacheMap[stateNode] = struct{}{} + cachedToActive++ + } + // Also add to next map if in overlap period + if inOverlapPeriod { + if _, exists := bc.nextCacheMap[stateNode]; !exists { + bc.nextCacheMap[stateNode] = struct{}{} + cachedToNext++ + } + } + } + } + + // Extract and cache updated states from execution + if statedb != nil { + _, stateUpdate, _ := statedb.CommitAndReturnStateUpdate(blockNum, true) // Note: EIP158 check done elsewhere + if stateUpdate != nil && stateUpdate.Nodes != nil { + nodes := stateUpdate.Nodes + + // Iterate through all node sets + for owner := range nodes.Sets { + subset := nodes.Sets[owner] + subset.ForEachWithOrder(func(path string, n *trienode.Node) { + if !n.IsDeleted() && n.Blob != nil { + stateNode := string(n.Blob) + // Add to active map + if _, exists := bc.activeCacheMap[stateNode]; !exists { + bc.activeCacheMap[stateNode] = struct{}{} + cachedToActive++ + } + // Also add to next map if in overlap period + if inOverlapPeriod { + if _, exists := bc.nextCacheMap[stateNode]; !exists { + bc.nextCacheMap[stateNode] = struct{}{} + cachedToNext++ + } + } + } + }) + } + } + } + + log.Debug("Updated sliding window cache", + "block", blockNum, + "windowStart", bc.cacheWindowStart, + "inOverlap", inOverlapPeriod, + "cachedToActive", cachedToActive, + "cachedToNext", cachedToNext, + "activeMapSize", len(bc.activeCacheMap), + "nextMapSize", len(bc.nextCacheMap)) +} + +// ProcessBlockWithWitnesses processes a block in stateless mode using the provided witnesses. 
+func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *stateless.Witness) (*state.StateDB, *ProcessResult, error) { + if witness == nil { + return nil, nil, errors.New("nil witness") } + blockNum := block.Number().Uint64() + + // Manage sliding window (slide if needed) + bc.manageSlidingWindow(blockNum) + // Merge cached states into witness for blocks that aren't at window start if blockNum != bc.cacheWindowStart { witnessStatesBefore := len(witness.State) @@ -4003,69 +4092,8 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta return nil, nil, err } - // Cache witness states and updated states using sliding window approach - bc.cacheLock.Lock() - defer bc.cacheLock.Unlock() - - cachedWitnessStates := 0 - cachedUpdatedStates := 0 - - // Determine if we're in overlap period (should cache to both maps) - blocksSinceWindowStart = blockNum - bc.cacheWindowStart - inOverlapPeriod := blocksSinceWindowStart >= bc.cacheOverlapSize - - // Cache all witness state nodes - for stateNode := range witness.State { - // Add to active map - if _, exists := bc.activeCacheMap[stateNode]; !exists { - bc.activeCacheMap[stateNode] = struct{}{} - cachedWitnessStates++ - } - // Also add to next map if in overlap period - if inOverlapPeriod { - if _, exists := bc.nextCacheMap[stateNode]; !exists { - bc.nextCacheMap[stateNode] = struct{}{} - } - } - } - - // Extract and cache updated states from execution - if statedb != nil { - _, stateUpdate, _ := statedb.CommitAndReturnStateUpdate(blockNum, bc.chainConfig.IsEIP158(block.Number())) - if stateUpdate != nil && stateUpdate.Nodes != nil { - nodes := stateUpdate.Nodes - - // Iterate through all node sets - for owner := range nodes.Sets { - subset := nodes.Sets[owner] - subset.ForEachWithOrder(func(path string, n *trienode.Node) { - if !n.IsDeleted() && n.Blob != nil { - stateNode := string(n.Blob) - // Add to active map - if _, exists := bc.activeCacheMap[stateNode]; !exists { - bc.activeCacheMap[stateNode] = struct{}{} - cachedUpdatedStates++ - } - // Also add to next map if in overlap period - if inOverlapPeriod { - if _, exists := bc.nextCacheMap[stateNode]; !exists { - bc.nextCacheMap[stateNode] = struct{}{} - } - } - } - }) - } - } - } - - log.Debug("Cached states for sliding window", - "block", blockNum, - "windowStart", bc.cacheWindowStart, - "inOverlap", inOverlapPeriod, - "cachedWitnessStates", cachedWitnessStates, - "cachedUpdatedStates", cachedUpdatedStates, - "activeMapSize", len(bc.activeCacheMap), - "nextMapSize", len(bc.nextCacheMap)) + // Update sliding window cache with witness and execution data + bc.updateSlidingWindowCache(blockNum, witness, statedb) return statedb, res, nil } From 464072bbac98a2ed694ec88ee25c41c4eac934fd Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Mon, 27 Oct 2025 17:25:10 +0530 Subject: [PATCH 06/22] eth/protocols/wit: added message type to get and send reduced witnesses --- eth/protocols/wit/handler.go | 2 ++ eth/protocols/wit/handlers.go | 39 +++++++++++++++++++++++++++++++++++ eth/protocols/wit/peer.go | 27 ++++++++++++++++++++++++ eth/protocols/wit/protocol.go | 8 ++++++- 4 files changed, 75 insertions(+), 1 deletion(-) diff --git a/eth/protocols/wit/handler.go b/eth/protocols/wit/handler.go index 1121e68835..f44c3e94ba 100644 --- a/eth/protocols/wit/handler.go +++ b/eth/protocols/wit/handler.go @@ -131,6 +131,8 @@ var wit2 = map[uint64]msgHandler{ NewWitnessHashesMsg: handleNewWitnessHashes, GetWitnessMetadataMsg: handleGetWitnessMetadata, WitnessMetadataMsg: 
handleWitnessMetadata, + GetReducedWitnessMsg: handleGetReducedWitness, + ReducedWitnessMsg: handleReducedWitness, } // HandleMessage is invoked whenever an inbound message is received from a diff --git a/eth/protocols/wit/handlers.go b/eth/protocols/wit/handlers.go index b7319d1869..d529129f33 100644 --- a/eth/protocols/wit/handlers.go +++ b/eth/protocols/wit/handlers.go @@ -99,3 +99,42 @@ func handleWitnessMetadata(backend Backend, msg Decoder, peer *Peer) error { log.Debug("Dispatching witness metadata response packet", "peer", peer.ID(), "reqID", packet.RequestId, "count", len(packet.Metadata)) return peer.dispatchResponse(res, nil) } + +// handleGetReducedWitness processes a GetReducedWitnessPacket request from a peer. +// Reuses GetWitnessPacket structure since format is identical. +func handleGetReducedWitness(backend Backend, msg Decoder, peer *Peer) error { + // Decode the GetWitnessPacket request (same structure for reduced witness) + req := new(GetWitnessPacket) + if err := msg.Decode(&req); err != nil { + return fmt.Errorf("failed to decode GetReducedWitnessPacket: %w", err) + } + + // Validate request parameters + if len(req.WitnessPages) == 0 { + return fmt.Errorf("invalid GetReducedWitnessPacket: WitnessPages cannot be empty") + } + + return backend.Handle(peer, req) +} + +// handleReducedWitness processes an incoming reduced witness response from a peer. +// Reuses WitnessPacketRLPPacket structure since format is identical. +func handleReducedWitness(backend Backend, msg Decoder, peer *Peer) error { + // Decode the WitnessPacketRLPPacket response (same structure for reduced witness) + packet := new(WitnessPacketRLPPacket) + if err := msg.Decode(packet); err != nil { + log.Error("Failed to decode reduced witness response packet", "err", err) + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + + // Construct the response object + res := &Response{ + id: packet.RequestId, + code: ReducedWitnessMsg, // Different message code for routing + Res: packet, + } + + // Forward the response to the dispatcher + log.Debug("Dispatching reduced witness response packet", "peer", peer.ID(), "reqID", packet.RequestId, "count", len(packet.WitnessPacketResponse)) + return peer.dispatchResponse(res, nil) +} diff --git a/eth/protocols/wit/peer.go b/eth/protocols/wit/peer.go index 354fd18fbd..a887615208 100644 --- a/eth/protocols/wit/peer.go +++ b/eth/protocols/wit/peer.go @@ -179,6 +179,33 @@ func (p *Peer) RequestWitnessMetadata(hashes []common.Hash, sink chan *Response) return req, nil } +// RequestReducedWitness sends a request to the peer for reduced witnesses (omits cached states). +// Reuses GetWitnessPacket structure since format is identical. +func (p *Peer) RequestReducedWitness(witnessPages []WitnessPageRequest, sink chan *Response) (*Request, error) { + p.lock.Lock() + defer p.lock.Unlock() + + log.Debug("Requesting reduced witness", "peer", p.id, "pages", len(witnessPages)) + id := rand.Uint64() + + req := &Request{ + id: id, + sink: sink, + code: GetReducedWitnessMsg, // Different message code + want: ReducedWitnessMsg, + data: &GetWitnessPacket{ // Same structure as regular witness request + RequestId: id, + GetWitnessRequest: &GetWitnessRequest{ + WitnessPages: witnessPages, + }, + }, + } + if err := p.dispatchRequest(req); err != nil { + return nil, err + } + return req, nil +} + // Close signals the broadcast goroutine to terminate. Only ever call this if // you created the peer yourself via NewPeer. Otherwise let whoever created it // clean it up! 
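For orientation, a caller of the new RequestReducedWitness API added above might look like the hypothetical helper below (not part of the patch). The helper name, the single-page request, and the error wording are illustrative; it assumes the usual imports (fmt, common, and the wit package), and the dispatcher's response bookkeeping (for example completing the response via its Done channel, which the existing witness-fetch path handles) is deliberately omitted:

// Hypothetical helper, not part of the patch: request page 0 of a reduced
// witness from a wit peer and return the raw response packet. Dispatcher
// bookkeeping (e.g. signalling the response's Done channel) is omitted here.
func requestReducedWitnessPage(p *wit.Peer, blockHash common.Hash) (*wit.WitnessPacketRLPPacket, error) {
	sink := make(chan *wit.Response)

	// The request payload is identical to a full witness request; only the
	// message code (GetReducedWitnessMsg) differs.
	if _, err := p.RequestReducedWitness([]wit.WitnessPageRequest{{Hash: blockHash, Page: 0}}, sink); err != nil {
		return nil, err
	}

	res := <-sink
	packet, ok := res.Res.(*wit.WitnessPacketRLPPacket)
	if !ok {
		return nil, fmt.Errorf("unexpected reduced witness response type %T", res.Res)
	}
	return packet, nil
}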
diff --git a/eth/protocols/wit/protocol.go b/eth/protocols/wit/protocol.go index 6b28e54c2d..3f156a3ea4 100644 --- a/eth/protocols/wit/protocol.go +++ b/eth/protocols/wit/protocol.go @@ -23,7 +23,7 @@ var ProtocolVersions = []uint{WIT2, WIT1} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{WIT2: 6, WIT1: 4} +var protocolLengths = map[uint]uint64{WIT2: 8, WIT1: 4} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 16 * 1024 * 1024 @@ -35,6 +35,8 @@ const ( MsgWitness = 0x03 GetWitnessMetadataMsg = 0x04 WitnessMetadataMsg = 0x05 + GetReducedWitnessMsg = 0x06 // Request reduced witness (omits cached states) + ReducedWitnessMsg = 0x07 // Response with reduced witness data ) var ( @@ -50,6 +52,7 @@ type Packet interface { } // GetWitnessRequest represents a list of witnesses query by witness pages. +// Also used for reduced witness requests (same structure). type GetWitnessRequest struct { WitnessPages []WitnessPageRequest // Request by list of witness pages } @@ -60,12 +63,14 @@ type WitnessPageRequest struct { } // GetWitnessPacket represents a witness query with request ID wrapping. +// Also used for reduced witness requests (same structure). type GetWitnessPacket struct { RequestId uint64 *GetWitnessRequest } // WitnessPacketRLPPacket represents a witness response with request ID wrapping. +// Also used for reduced witness responses (same structure). type WitnessPacketRLPPacket struct { RequestId uint64 WitnessPacketResponse @@ -73,6 +78,7 @@ type WitnessPacketRLPPacket struct { // WitnessPacketResponse represents a witness response, to use when we already // have the witness rlp encoded. +// Also used for reduced witness responses (same structure). type WitnessPacketResponse []WitnessPageResponse type WitnessPageResponse struct { From 4d42f38af1c95b8b007c20d9af2301e3f6aad966 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Tue, 28 Oct 2025 14:24:56 +0530 Subject: [PATCH 07/22] eth, core: added logic for reduced witnesses --- core/blockchain.go | 30 +++++++++ eth/handler_wit.go | 117 ++++++++++++++++++++++++++++++++-- eth/protocols/wit/peer.go | 20 +++++- eth/protocols/wit/protocol.go | 3 +- 4 files changed, 161 insertions(+), 9 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 0079079397..d5c62bf56c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -4212,6 +4212,36 @@ func (bc *BlockChain) manageSlidingWindow(blockNum uint64) bool { return false } +// FilterWitnessWithSlidingCache filters a witness by removing state nodes present in the sliding window cache. +// This is used when sending reduced witnesses to peers. +func (bc *BlockChain) FilterWitnessWithSlidingCache(witness *stateless.Witness) (*stateless.Witness, int, int) { + if witness == nil { + return nil, 0, 0 + } + + filtered := witness.Copy() + + bc.cacheLock.RLock() + defer bc.cacheLock.RUnlock() + + originalStates := len(filtered.State) + filteredStates := make(map[string]struct{}) + removed := 0 + + for stateNode := range filtered.State { + // Only include states NOT in active cache + if _, cached := bc.activeCacheMap[stateNode]; !cached { + filteredStates[stateNode] = struct{}{} + } else { + removed++ + } + } + + filtered.State = filteredStates + + return filtered, originalStates, removed +} + // updateSlidingWindowCache updates the sliding window cache after processing a block. 
// This is used by both full nodes and witness-receiving nodes to maintain synchronized caches. func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, witness *stateless.Witness, statedb *state.StateDB) { diff --git a/eth/handler_wit.go b/eth/handler_wit.go index 8792343c3c..4568ae177e 100644 --- a/eth/handler_wit.go +++ b/eth/handler_wit.go @@ -1,6 +1,7 @@ package eth import ( + "bytes" "errors" "fmt" "time" @@ -55,13 +56,27 @@ func (h *witHandler) Handle(peer *wit.Peer, packet wit.Packet) error { case *wit.NewWitnessHashesPacket: return h.handleWitnessHashesAnnounce(peer, packet.Hashes, packet.Numbers) case *wit.GetWitnessPacket: - // Call handleGetWitness which returns the raw RLP data - response, err := h.handleGetWitness(peer, packet) - if err != nil { - return fmt.Errorf("failed to handle GetWitnessPacket: %w", err) + var response wit.WitnessPacketResponse + var err error + + // Check if this is a reduced witness request + if packet.IsReduced { + // Handle reduced witness request with filtering + response, err = h.handleGetReducedWitness(peer, packet) + if err != nil { + return fmt.Errorf("failed to handle GetReducedWitnessPacket: %w", err) + } + // Reply with reduced witness (different message code) + return peer.ReplyReducedWitness(packet.RequestId, &response) + } else { + // Handle regular witness request + response, err = h.handleGetWitness(peer, packet) + if err != nil { + return fmt.Errorf("failed to handle GetWitnessPacket: %w", err) + } + // Reply with regular witness + return peer.ReplyWitness(packet.RequestId, &response) } - // Reply using the retrieved RLP data - return peer.ReplyWitness(packet.RequestId, &response) case *wit.GetWitnessMetadataPacket: // Call handleGetWitnessMetadata which returns only metadata (page count) @@ -182,6 +197,96 @@ func (h *witHandler) handleGetWitness(peer *wit.Peer, req *wit.GetWitnessPacket) return response, nil } +// handleGetReducedWitness retrieves witnesses and filters them using the sliding window cache. +// This reduces bandwidth by omitting state nodes that the receiver should already have cached. 
+func (h *witHandler) handleGetReducedWitness(peer *wit.Peer, req *wit.GetWitnessPacket) (wit.WitnessPacketResponse, error) { + log.Debug("handleGetReducedWitness processing request", "peer", peer.ID(), "reqID", req.RequestId, "witnessPages", len(req.WitnessPages)) + + // First, get the full witness data (same as regular handleGetWitness) + fullResponse, err := h.handleGetWitness(peer, req) + if err != nil { + return nil, err + } + + // Now filter each witness page by removing cached states + var filteredResponse wit.WitnessPacketResponse + + for _, witnessPage := range fullResponse { + if len(witnessPage.Data) == 0 { + // Empty page, just pass through + filteredResponse = append(filteredResponse, witnessPage) + continue + } + + // Decode the witness from the RLP data + witness, err := stateless.GetWitnessFromRlp(witnessPage.Data) + if err != nil { + log.Warn("Failed to decode witness for filtering", "hash", witnessPage.Hash, "err", err) + // If we can't decode, send the full data + filteredResponse = append(filteredResponse, witnessPage) + continue + } + + // Filter witness using sliding window cache + filteredWitness := h.filterWitnessWithCache(witness) + + // Re-encode the filtered witness + var filteredBuf []byte + if filteredWitness != nil { + var buf bytes.Buffer + err = filteredWitness.EncodeRLP(&buf) + if err != nil { + log.Warn("Failed to encode filtered witness", "hash", witnessPage.Hash, "err", err) + // If encoding fails, send the full data + filteredResponse = append(filteredResponse, witnessPage) + continue + } + filteredBuf = buf.Bytes() + } + + // Create filtered page response + filteredPage := wit.WitnessPageResponse{ + Data: filteredBuf, + Hash: witnessPage.Hash, + Page: witnessPage.Page, + TotalPages: witnessPage.TotalPages, + } + filteredResponse = append(filteredResponse, filteredPage) + + originalSize := len(witnessPage.Data) + filteredSize := len(filteredBuf) + reduction := float64(originalSize-filteredSize) * 100.0 / float64(originalSize) + + log.Debug("Filtered witness for reduced transmission", + "hash", witnessPage.Hash, + "page", witnessPage.Page, + "originalSize", originalSize, + "filteredSize", filteredSize, + "reduction%", fmt.Sprintf("%.2f", reduction)) + } + + log.Debug("handleGetReducedWitness returning filtered witnesses", "peer", peer.ID(), "reqID", req.RequestId, "count", len(filteredResponse)) + return filteredResponse, nil +} + +// filterWitnessWithCache filters a witness by removing state nodes present in the sliding window cache. +func (h *witHandler) filterWitnessWithCache(witness *stateless.Witness) *stateless.Witness { + if witness == nil { + return nil + } + + // Use BlockChain's exported method to filter + filtered, originalStates, removed := h.chain.FilterWitnessWithSlidingCache(witness) + + log.Debug("Filtered witness state using cache", + "blockNum", witness.Header().Number, + "originalStates", originalStates, + "filteredStates", len(filtered.State), + "removed", removed) + + return filtered +} + // handleGetWitnessMetadata retrieves only the metadata (page count, size, block number) for the requested witness hashes. // This is efficient for verification purposes where we don't need the actual witness data. 
func (h *witHandler) handleGetWitnessMetadata(peer *wit.Peer, req *wit.GetWitnessMetadataPacket) ([]wit.WitnessMetadataResponse, error) { diff --git a/eth/protocols/wit/peer.go b/eth/protocols/wit/peer.go index a887615208..5ed969fbab 100644 --- a/eth/protocols/wit/peer.go +++ b/eth/protocols/wit/peer.go @@ -180,7 +180,7 @@ func (p *Peer) RequestWitnessMetadata(hashes []common.Hash, sink chan *Response) } // RequestReducedWitness sends a request to the peer for reduced witnesses (omits cached states). -// Reuses GetWitnessPacket structure since format is identical. +// Reuses GetWitnessPacket structure with IsReduced flag set to true. func (p *Peer) RequestReducedWitness(witnessPages []WitnessPageRequest, sink chan *Response) (*Request, error) { p.lock.Lock() defer p.lock.Unlock() @@ -193,10 +193,11 @@ func (p *Peer) RequestReducedWitness(witnessPages []WitnessPageRequest, sink cha sink: sink, code: GetReducedWitnessMsg, // Different message code want: ReducedWitnessMsg, - data: &GetWitnessPacket{ // Same structure as regular witness request + data: &GetWitnessPacket{ RequestId: id, GetWitnessRequest: &GetWitnessRequest{ WitnessPages: witnessPages, + IsReduced: true, // Mark as reduced witness request }, }, } @@ -287,6 +288,21 @@ func (p *Peer) ReplyWitnessMetadata(requestID uint64, metadata []WitnessMetadata }) } +// ReplyReducedWitness sends a reduced witness response to the peer. +// Uses ReducedWitnessMsg code but same packet structure as regular witness. +func (p *Peer) ReplyReducedWitness(requestID uint64, response *WitnessPacketResponse) error { + p.lock.Lock() + defer p.lock.Unlock() + + log.Debug("Replying with reduced witness", "peer", p.id, "reqID", requestID, "pages", len(*response)) + + // Send the response with ReducedWitnessMsg code + return p2p.Send(p.rw, ReducedWitnessMsg, &WitnessPacketRLPPacket{ + RequestId: requestID, + WitnessPacketResponse: *response, + }) +} + // KnownCache is a cache for known witness, identified by the hash of the parent witness block. type KnownCache struct { hashes mapset.Set[common.Hash] diff --git a/eth/protocols/wit/protocol.go b/eth/protocols/wit/protocol.go index 3f156a3ea4..95d74f88e6 100644 --- a/eth/protocols/wit/protocol.go +++ b/eth/protocols/wit/protocol.go @@ -52,9 +52,10 @@ type Packet interface { } // GetWitnessRequest represents a list of witnesses query by witness pages. -// Also used for reduced witness requests (same structure). +// Also used for reduced witness requests (distinguished by IsReduced flag). type GetWitnessRequest struct { WitnessPages []WitnessPageRequest // Request by list of witness pages + IsReduced bool // True if requesting reduced witness (omits cached states) } type WitnessPageRequest struct { From 211e1292a58c99fde44a51529428d1125a93a8b2 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Wed, 29 Oct 2025 10:56:01 +0530 Subject: [PATCH 08/22] core, eth: sending reduced witness --- core/blockchain.go | 12 +++++++++ eth/handler_eth.go | 65 ++++++++++++++++++++++++++++++++++++---------- eth/peer.go | 35 ++++++++++++++++++------- 3 files changed, 89 insertions(+), 23 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index d5c62bf56c..9073655a80 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -4212,6 +4212,18 @@ func (bc *BlockChain) manageSlidingWindow(blockNum uint64) bool { return false } +// GetCacheWindowStart returns the current cache window start block number. 
+func (bc *BlockChain) GetCacheWindowStart() uint64 { + bc.cacheLock.RLock() + defer bc.cacheLock.RUnlock() + return bc.cacheWindowStart +} + +// GetCacheWindowSize returns the cache window size. +func (bc *BlockChain) GetCacheWindowSize() uint64 { + return bc.cacheWindowSize +} + // FilterWitnessWithSlidingCache filters a witness by removing state nodes present in the sliding window cache. // This is used when sending reduced witnesses to peers. func (bc *BlockChain) FilterWitnessWithSlidingCache(witness *stateless.Witness) (*stateless.Witness, int, int) { diff --git a/eth/handler_eth.go b/eth/handler_eth.go index dcaa69de1c..c1710cf285 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -118,26 +118,59 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, } } - var witnessRequester func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) if h.statelessSync.Load() || h.syncWithWitnesses { - // Create a witness requester that uses the wit.Peer's RequestWitness method - witnessRequester = func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) { - // Get the ethPeer from the peerSet - ethPeer := h.peers.getOnePeerWithWitness(hash) - if ethPeer == nil { - return nil, fmt.Errorf("no peer with witness for hash %s is available", hash) + for i := 0; i < len(unknownHashes); i++ { + hash := unknownHashes[i] + number := unknownNumbers[i] + + // Create witness requester closure that captures block number + witnessRequester := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) { + // Get the ethPeer from the peerSet + ethPeer := h.peers.getOnePeerWithWitness(hash) + if ethPeer == nil { + return nil, fmt.Errorf("no peer with witness for hash %s is available", hash) + } + + // Determine if we should request reduced witness based on sliding window + useReduced := h.shouldRequestReducedWitness(number) + + // Request witnesses (reduced or full) using the wit peer with verification + return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, useReduced) } - // Request witnesses using the wit peer with verification - return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount) + h.blockFetcher.Notify(peer.ID(), hash, number, time.Now(), peer.RequestOneHeader, peer.RequestBodies, witnessRequester) + } + } else { + for i := 0; i < len(unknownHashes); i++ { + h.blockFetcher.Notify(peer.ID(), unknownHashes[i], unknownNumbers[i], time.Now(), peer.RequestOneHeader, peer.RequestBodies, nil) } } - for i := 0; i < len(unknownHashes); i++ { - h.blockFetcher.Notify(peer.ID(), unknownHashes[i], unknownNumbers[i], time.Now(), peer.RequestOneHeader, peer.RequestBodies, witnessRequester) + return nil +} + +// shouldRequestReducedWitness determines if we should request a reduced witness based on +// the sliding window cache state. Returns true if witness should be reduced, false for full. 
+func (h *ethHandler) shouldRequestReducedWitness(blockNum uint64) bool { + + // TODO(@pratikspatil024) - handle node out of sync case + + // Get the blockchain's current window start + windowStart := h.chain.GetCacheWindowStart() + windowSize := h.chain.GetCacheWindowSize() + + // If block is at window start, need full witness (cache was just cleared/slid) + if blockNum == windowStart { + return false } - return nil + // If block is within current window but not at start, can use reduced witness + if blockNum > windowStart && blockNum < windowStart+windowSize { + return true + } + + // Block is outside current window, need full witness + return false } // verifyPageCount verifies the witness page count for a given block hash by @@ -192,6 +225,10 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td if h.statelessSync.Load() || h.syncWithWitnesses { log.Debug("Received block broadcast during stateless sync", "blockNumber", block.NumberU64(), "blockHash", block.Hash()) + // Determine if we should request reduced witness for this block + blockNum := block.NumberU64() + useReduced := h.shouldRequestReducedWitness(blockNum) + // Create a witness requester closure *only if* the peer supports the protocol. witnessRequester := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) { // Get the ethPeer from the peerSet @@ -200,8 +237,8 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td return nil, fmt.Errorf("no peer with witness for hash %s is available", hash) } - // Request witnesses using the wit peer with verification - return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount) + // Request witnesses (reduced or full) using the wit peer with verification + return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, useReduced) } // Call the new fetcher method to inject the block diff --git a/eth/peer.go b/eth/peer.go index cf6c2d65a5..e8a0e6ff17 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -100,6 +100,7 @@ type WitnessPeer interface { AsyncSendNewWitness(witness *stateless.Witness) AsyncSendNewWitnessHash(hash common.Hash, number uint64) RequestWitness(witnessPages []wit.WitnessPageRequest, sink chan *wit.Response) (*wit.Request, error) + RequestReducedWitness(witnessPages []wit.WitnessPageRequest, sink chan *wit.Response) (*wit.Request, error) RequestWitnessMetadata(hashes []common.Hash, sink chan *wit.Response) (*wit.Request, error) Close() ID() string @@ -111,6 +112,7 @@ type WitnessPeer interface { KnownWitnessesContains(witness *stateless.Witness) bool KnownWitnessContainsHash(hash common.Hash) bool ReplyWitness(requestID uint64, response *wit.WitnessPacketResponse) error + ReplyReducedWitness(requestID uint64, response *wit.WitnessPacketResponse) error } // witPeer is wrapper around wit.Peer to maintain a few extra metadata and test compatibility. @@ -174,7 +176,7 @@ func (p *ethPeer) SupportsWitness() bool { // RequestWitnesses implements downloader.Peer. // It requests witnesses using the wit protocol for the given block hashes. func (p *ethPeer) RequestWitnesses(hashes []common.Hash, dlResCh chan *eth.Response) (*eth.Request, error) { - return p.RequestWitnessesWithVerification(hashes, dlResCh, nil) + return p.RequestWitnessesWithVerification(hashes, dlResCh, nil, false) // Default to full witness } // RequestWitnessPageCount requests only the page count for a witness using the new metadata protocol. 
@@ -287,7 +289,7 @@ func (p *ethPeer) requestWitnessPageCountLegacy(hash common.Hash) (uint64, error } // RequestWitnessesWithVerification requests witnesses with optional page count verification -func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh chan *eth.Response, verifyPageCount func(common.Hash, uint64, string) bool) (*eth.Request, error) { +func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh chan *eth.Response, verifyPageCount func(common.Hash, uint64, string) bool, useReduced bool) (*eth.Request, error) { if p.witPeer == nil { return nil, errors.New("witness peer not found") } @@ -305,7 +307,7 @@ func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh var buildRequestMu sync.RWMutex // Build all the initial requests synchronously. - buildWitReqErr := p.buildWitnessRequests(hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests) + buildWitReqErr := p.buildWitnessRequests(hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, useReduced) if buildWitReqErr != nil { p.witPeer.Peer.Log().Error("Error in building witness requests", "peer", p.ID(), "err", buildWitReqErr) return nil, buildWitReqErr @@ -339,7 +341,7 @@ func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh reconstructedWitness := make(map[common.Hash]*stateless.Witness) var lastWitRes *wit.Response for witRes := range witReqResCh { - p.receiveWitnessPage(witRes, receivedWitPages, reconstructedWitness, hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, downloadPaused, verifyPageCount) + p.receiveWitnessPage(witRes, receivedWitPages, reconstructedWitness, hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, downloadPaused, verifyPageCount, useReduced) <-witReqSem // Check if the Response is nil before accessing the Done channel. 
@@ -445,6 +447,7 @@ func (p *ethPeer) receiveWitnessPage( failedRequests map[common.Hash]map[uint64]witReqRetryCount, downloadPaused map[common.Hash]bool, verifyPageCount func(common.Hash, uint64, string) bool, + useReduced bool, ) (retrievedError error) { defer func() { // if fails map on retry count and request again @@ -464,7 +467,7 @@ func (p *ethPeer) receiveWitnessPage( // non blocking call to avoid race condition because of semaphore witReqsWg.Add(1) // protecting from not finishing before requests are built go func() { - buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests) + buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useReduced) if buildWitReqErr != nil { p.witPeer.Peer.Log().Error("Error in building witness requests", "peer", p.ID(), "err", buildWitReqErr) } @@ -564,7 +567,7 @@ func (p *ethPeer) receiveWitnessPage( // non blocking call to avoid race condition because of semaphore witReqsWg.Add(1) // protecting from not finishing before requests are built go func() { - buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests) + buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useReduced) if buildWitReqErr != nil { p.witPeer.Peer.Log().Error("Error in building witness requests", "peer", p.ID(), "err", buildWitReqErr) } @@ -603,6 +606,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash, mapsMu *sync.RWMutex, buildRequestMu *sync.RWMutex, failedRequests map[common.Hash]map[uint64]witReqRetryCount, + useReduced bool, ) error { buildRequestMu.Lock() defer buildRequestMu.Unlock() @@ -627,6 +631,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash, witReqSem, mapsMu, witTotalRequest, + useReduced, ); err != nil { return err } @@ -647,6 +652,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash, witReqSem, mapsMu, witTotalRequest, + useReduced, ); err != nil { return err } @@ -668,14 +674,25 @@ func (p *ethPeer) doWitnessRequest( witReqSem chan int, mapsMu *sync.RWMutex, witTotalRequest map[common.Hash]uint64, + useReduced bool, ) error { - p.witPeer.Peer.Log().Debug("RequestWitnesses building a wit request", "peer", p.ID(), "hash", hash, "page", page) + p.witPeer.Peer.Log().Debug("RequestWitnesses building a wit request", "peer", p.ID(), "hash", hash, "page", page, "reduced", useReduced) witReqSem <- 1 witResCh := make(chan *wit.Response) request := []wit.WitnessPageRequest{{Hash: hash, Page: page}} - witReq, err := p.witPeer.Peer.RequestWitness(request, witResCh) + + var witReq *wit.Request + var err error + + // Use reduced or full witness request based on parameter + if useReduced { + witReq, err = p.witPeer.Peer.RequestReducedWitness(request, witResCh) + } else { + witReq, err = p.witPeer.Peer.RequestWitness(request, witResCh) + } + if err != nil { - p.witPeer.Peer.Log().Error("Error in making wit request", "peer", p.ID(), "err", err) + p.witPeer.Peer.Log().Error("Error in making wit request", "peer", p.ID(), "reduced", useReduced, "err", err) return err } go func() { From 3dfe7ad61b89110232d83c2b85ebfd8407364fc7 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Wed, 29 Oct 2025 12:18:35 +0530 Subject: [PATCH 
09/22] eth: updated mock witness peer --- eth/peer_mock.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/eth/peer_mock.go b/eth/peer_mock.go index 825c3187ed..d77592bc98 100644 --- a/eth/peer_mock.go +++ b/eth/peer_mock.go @@ -169,6 +169,20 @@ func (mr *MockWitnessPeerMockRecorder) Log() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Log", reflect.TypeOf((*MockWitnessPeer)(nil).Log)) } +// ReplyReducedWitness mocks base method. +func (m *MockWitnessPeer) ReplyReducedWitness(requestID uint64, response *wit.WitnessPacketResponse) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReplyReducedWitness", requestID, response) + ret0, _ := ret[0].(error) + return ret0 +} + +// ReplyReducedWitness indicates an expected call of ReplyReducedWitness. +func (mr *MockWitnessPeerMockRecorder) ReplyReducedWitness(requestID, response interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplyReducedWitness", reflect.TypeOf((*MockWitnessPeer)(nil).ReplyReducedWitness), requestID, response) +} + // ReplyWitness mocks base method. func (m *MockWitnessPeer) ReplyWitness(requestID uint64, response *wit.WitnessPacketResponse) error { m.ctrl.T.Helper() @@ -183,6 +197,21 @@ func (mr *MockWitnessPeerMockRecorder) ReplyWitness(requestID, response interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplyWitness", reflect.TypeOf((*MockWitnessPeer)(nil).ReplyWitness), requestID, response) } +// RequestReducedWitness mocks base method. +func (m *MockWitnessPeer) RequestReducedWitness(witnessPages []wit.WitnessPageRequest, sink chan *wit.Response) (*wit.Request, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RequestReducedWitness", witnessPages, sink) + ret0, _ := ret[0].(*wit.Request) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RequestReducedWitness indicates an expected call of RequestReducedWitness. +func (mr *MockWitnessPeerMockRecorder) RequestReducedWitness(witnessPages, sink interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestReducedWitness", reflect.TypeOf((*MockWitnessPeer)(nil).RequestReducedWitness), witnessPages, sink) +} + // RequestWitness mocks base method. func (m *MockWitnessPeer) RequestWitness(witnessPages []wit.WitnessPageRequest, sink chan *wit.Response) (*wit.Request, error) { m.ctrl.T.Helper() From 3683987424732426b6caf87910261fada385dad0 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Wed, 29 Oct 2025 20:01:52 +0530 Subject: [PATCH 10/22] fix lint --- eth/handler_eth.go | 1 - 1 file changed, 1 deletion(-) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index c1710cf285..71dfb9370b 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -152,7 +152,6 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, // shouldRequestReducedWitness determines if we should request a reduced witness based on // the sliding window cache state. Returns true if witness should be reduced, false for full. 
func (h *ethHandler) shouldRequestReducedWitness(blockNum uint64) bool { - // TODO(@pratikspatil024) - handle node out of sync case // Get the blockchain's current window start From 737856fb641ee12e1b4b738a1221883387d38f96 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Thu, 30 Oct 2025 16:16:55 +0530 Subject: [PATCH 11/22] updated name reduced -> compact --- eth/handler_eth.go | 24 ++++++++++++------------ eth/handler_wit.go | 24 ++++++++++++------------ eth/peer.go | 34 +++++++++++++++++----------------- eth/peer_mock.go | 24 ++++++++++++------------ eth/protocols/wit/handler.go | 4 ++-- eth/protocols/wit/handlers.go | 22 +++++++++++----------- eth/protocols/wit/peer.go | 26 +++++++++++++------------- eth/protocols/wit/protocol.go | 14 +++++++------- 8 files changed, 86 insertions(+), 86 deletions(-) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 71dfb9370b..7551195bed 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -131,11 +131,11 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, return nil, fmt.Errorf("no peer with witness for hash %s is available", hash) } - // Determine if we should request reduced witness based on sliding window - useReduced := h.shouldRequestReducedWitness(number) + // Determine if we should request compact witness based on sliding window + useCompact := h.shouldRequestCompactWitness(number) - // Request witnesses (reduced or full) using the wit peer with verification - return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, useReduced) + // Request witnesses (compact or full) using the wit peer with verification + return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, useCompact) } h.blockFetcher.Notify(peer.ID(), hash, number, time.Now(), peer.RequestOneHeader, peer.RequestBodies, witnessRequester) @@ -149,9 +149,9 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, return nil } -// shouldRequestReducedWitness determines if we should request a reduced witness based on -// the sliding window cache state. Returns true if witness should be reduced, false for full. -func (h *ethHandler) shouldRequestReducedWitness(blockNum uint64) bool { +// shouldRequestCompactWitness determines if we should request a compact witness based on +// the sliding window cache state. Returns true if witness should be compact, false for full. 
+func (h *ethHandler) shouldRequestCompactWitness(blockNum uint64) bool { // TODO(@pratikspatil024) - handle node out of sync case // Get the blockchain's current window start @@ -163,7 +163,7 @@ func (h *ethHandler) shouldRequestReducedWitness(blockNum uint64) bool { return false } - // If block is within current window but not at start, can use reduced witness + // If block is within current window but not at start, can use compact witness if blockNum > windowStart && blockNum < windowStart+windowSize { return true } @@ -224,9 +224,9 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td if h.statelessSync.Load() || h.syncWithWitnesses { log.Debug("Received block broadcast during stateless sync", "blockNumber", block.NumberU64(), "blockHash", block.Hash()) - // Determine if we should request reduced witness for this block + // Determine if we should request compact witness for this block blockNum := block.NumberU64() - useReduced := h.shouldRequestReducedWitness(blockNum) + useCompact := h.shouldRequestCompactWitness(blockNum) // Create a witness requester closure *only if* the peer supports the protocol. witnessRequester := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) { @@ -236,8 +236,8 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td return nil, fmt.Errorf("no peer with witness for hash %s is available", hash) } - // Request witnesses (reduced or full) using the wit peer with verification - return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, useReduced) + // Request witnesses (compact or full) using the wit peer with verification + return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, useCompact) } // Call the new fetcher method to inject the block diff --git a/eth/handler_wit.go b/eth/handler_wit.go index 4568ae177e..1c9fc02d9a 100644 --- a/eth/handler_wit.go +++ b/eth/handler_wit.go @@ -59,15 +59,15 @@ func (h *witHandler) Handle(peer *wit.Peer, packet wit.Packet) error { var response wit.WitnessPacketResponse var err error - // Check if this is a reduced witness request - if packet.IsReduced { - // Handle reduced witness request with filtering - response, err = h.handleGetReducedWitness(peer, packet) + // Check if this is a compact witness request + if packet.Compact { + // Handle compact witness request with filtering + response, err = h.handleGetCompactWitness(peer, packet) if err != nil { - return fmt.Errorf("failed to handle GetReducedWitnessPacket: %w", err) + return fmt.Errorf("failed to handle GetCompactWitnessPacket: %w", err) } - // Reply with reduced witness (different message code) - return peer.ReplyReducedWitness(packet.RequestId, &response) + // Reply with compact witness (different message code) + return peer.ReplyCompactWitness(packet.RequestId, &response) } else { // Handle regular witness request response, err = h.handleGetWitness(peer, packet) @@ -197,10 +197,10 @@ func (h *witHandler) handleGetWitness(peer *wit.Peer, req *wit.GetWitnessPacket) return response, nil } -// handleGetReducedWitness retrieves witnesses and filters them using the sliding window cache. +// handleGetCompactWitness retrieves witnesses and filters them using the sliding window cache. // This reduces bandwidth by omitting state nodes that the receiver should already have cached. 
-func (h *witHandler) handleGetReducedWitness(peer *wit.Peer, req *wit.GetWitnessPacket) (wit.WitnessPacketResponse, error) { - log.Debug("handleGetReducedWitness processing request", "peer", peer.ID(), "reqID", req.RequestId, "witnessPages", len(req.WitnessPages)) +func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitnessPacket) (wit.WitnessPacketResponse, error) { + log.Debug("handleGetCompactWitness processing request", "peer", peer.ID(), "reqID", req.RequestId, "witnessPages", len(req.WitnessPages)) // First, get the full witness data (same as regular handleGetWitness) fullResponse, err := h.handleGetWitness(peer, req) @@ -257,7 +257,7 @@ func (h *witHandler) handleGetReducedWitness(peer *wit.Peer, req *wit.GetWitness filteredSize := len(filteredBuf) reduction := float64(originalSize-filteredSize) * 100.0 / float64(originalSize) - log.Debug("Filtered witness for reduced transmission", + log.Debug("Filtered witness for compact transmission", "hash", witnessPage.Hash, "page", witnessPage.Page, "originalSize", originalSize, @@ -265,7 +265,7 @@ func (h *witHandler) handleGetReducedWitness(peer *wit.Peer, req *wit.GetWitness "reduction%", fmt.Sprintf("%.2f", reduction)) } - log.Debug("handleGetReducedWitness returning filtered witnesses", "peer", peer.ID(), "reqID", req.RequestId, "count", len(filteredResponse)) + log.Debug("handleGetCompactWitness returning filtered witnesses", "peer", peer.ID(), "reqID", req.RequestId, "count", len(filteredResponse)) return filteredResponse, nil } diff --git a/eth/peer.go b/eth/peer.go index e8a0e6ff17..d0bb1d1fe2 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -100,7 +100,7 @@ type WitnessPeer interface { AsyncSendNewWitness(witness *stateless.Witness) AsyncSendNewWitnessHash(hash common.Hash, number uint64) RequestWitness(witnessPages []wit.WitnessPageRequest, sink chan *wit.Response) (*wit.Request, error) - RequestReducedWitness(witnessPages []wit.WitnessPageRequest, sink chan *wit.Response) (*wit.Request, error) + RequestCompactWitness(witnessPages []wit.WitnessPageRequest, sink chan *wit.Response) (*wit.Request, error) RequestWitnessMetadata(hashes []common.Hash, sink chan *wit.Response) (*wit.Request, error) Close() ID() string @@ -112,7 +112,7 @@ type WitnessPeer interface { KnownWitnessesContains(witness *stateless.Witness) bool KnownWitnessContainsHash(hash common.Hash) bool ReplyWitness(requestID uint64, response *wit.WitnessPacketResponse) error - ReplyReducedWitness(requestID uint64, response *wit.WitnessPacketResponse) error + ReplyCompactWitness(requestID uint64, response *wit.WitnessPacketResponse) error } // witPeer is wrapper around wit.Peer to maintain a few extra metadata and test compatibility. 
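On the serving side, the two new interface methods map onto a filter-then-reply flow. A condensed sketch of what handleGetCompactWitness (earlier in this patch) does behind ReplyCompactWitness, with paging, caching and error handling elided; encodeWitness is a hypothetical helper standing in for the RLP re-encoding step:

    full, err := h.handleGetWitness(peer, req) // same lookup as a full witness request
    if err != nil {
        return err
    }
    for i := range full {
        w, err := stateless.GetWitnessFromRlp(full[i].Data)
        if err != nil {
            continue // keep the page as-is if it cannot be decoded
        }
        w = h.filterWitnessWithCache(w)  // drop nodes already in the sliding-window cache
        full[i].Data = encodeWitness(w)  // hypothetical helper: re-encode the filtered witness
    }
    return peer.ReplyCompactWitness(req.RequestId, &full) // CompactWitnessMsg, not the regular witness code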
@@ -289,7 +289,7 @@ func (p *ethPeer) requestWitnessPageCountLegacy(hash common.Hash) (uint64, error } // RequestWitnessesWithVerification requests witnesses with optional page count verification -func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh chan *eth.Response, verifyPageCount func(common.Hash, uint64, string) bool, useReduced bool) (*eth.Request, error) { +func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh chan *eth.Response, verifyPageCount func(common.Hash, uint64, string) bool, useCompact bool) (*eth.Request, error) { if p.witPeer == nil { return nil, errors.New("witness peer not found") } @@ -307,7 +307,7 @@ func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh var buildRequestMu sync.RWMutex // Build all the initial requests synchronously. - buildWitReqErr := p.buildWitnessRequests(hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, useReduced) + buildWitReqErr := p.buildWitnessRequests(hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, useCompact) if buildWitReqErr != nil { p.witPeer.Peer.Log().Error("Error in building witness requests", "peer", p.ID(), "err", buildWitReqErr) return nil, buildWitReqErr @@ -341,7 +341,7 @@ func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh reconstructedWitness := make(map[common.Hash]*stateless.Witness) var lastWitRes *wit.Response for witRes := range witReqResCh { - p.receiveWitnessPage(witRes, receivedWitPages, reconstructedWitness, hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, downloadPaused, verifyPageCount, useReduced) + p.receiveWitnessPage(witRes, receivedWitPages, reconstructedWitness, hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, downloadPaused, verifyPageCount, useCompact) <-witReqSem // Check if the Response is nil before accessing the Done channel. 
@@ -447,7 +447,7 @@ func (p *ethPeer) receiveWitnessPage( failedRequests map[common.Hash]map[uint64]witReqRetryCount, downloadPaused map[common.Hash]bool, verifyPageCount func(common.Hash, uint64, string) bool, - useReduced bool, + useCompact bool, ) (retrievedError error) { defer func() { // if fails map on retry count and request again @@ -467,7 +467,7 @@ func (p *ethPeer) receiveWitnessPage( // non blocking call to avoid race condition because of semaphore witReqsWg.Add(1) // protecting from not finishing before requests are built go func() { - buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useReduced) + buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useCompact) if buildWitReqErr != nil { p.witPeer.Peer.Log().Error("Error in building witness requests", "peer", p.ID(), "err", buildWitReqErr) } @@ -567,7 +567,7 @@ func (p *ethPeer) receiveWitnessPage( // non blocking call to avoid race condition because of semaphore witReqsWg.Add(1) // protecting from not finishing before requests are built go func() { - buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useReduced) + buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useCompact) if buildWitReqErr != nil { p.witPeer.Peer.Log().Error("Error in building witness requests", "peer", p.ID(), "err", buildWitReqErr) } @@ -606,7 +606,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash, mapsMu *sync.RWMutex, buildRequestMu *sync.RWMutex, failedRequests map[common.Hash]map[uint64]witReqRetryCount, - useReduced bool, + useCompact bool, ) error { buildRequestMu.Lock() defer buildRequestMu.Unlock() @@ -631,7 +631,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash, witReqSem, mapsMu, witTotalRequest, - useReduced, + useCompact, ); err != nil { return err } @@ -652,7 +652,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash, witReqSem, mapsMu, witTotalRequest, - useReduced, + useCompact, ); err != nil { return err } @@ -674,9 +674,9 @@ func (p *ethPeer) doWitnessRequest( witReqSem chan int, mapsMu *sync.RWMutex, witTotalRequest map[common.Hash]uint64, - useReduced bool, + useCompact bool, ) error { - p.witPeer.Peer.Log().Debug("RequestWitnesses building a wit request", "peer", p.ID(), "hash", hash, "page", page, "reduced", useReduced) + p.witPeer.Peer.Log().Debug("RequestWitnesses building a wit request", "peer", p.ID(), "hash", hash, "page", page, "compact", useCompact) witReqSem <- 1 witResCh := make(chan *wit.Response) request := []wit.WitnessPageRequest{{Hash: hash, Page: page}} @@ -684,15 +684,15 @@ func (p *ethPeer) doWitnessRequest( var witReq *wit.Request var err error - // Use reduced or full witness request based on parameter - if useReduced { - witReq, err = p.witPeer.Peer.RequestReducedWitness(request, witResCh) + // Use compact or full witness request based on parameter + if useCompact { + witReq, err = p.witPeer.Peer.RequestCompactWitness(request, witResCh) } else { witReq, err = p.witPeer.Peer.RequestWitness(request, witResCh) } if err != nil { - p.witPeer.Peer.Log().Error("Error in making wit request", "peer", p.ID(), "reduced", useReduced, "err", err) + 
p.witPeer.Peer.Log().Error("Error in making wit request", "peer", p.ID(), "compact", useCompact, "err", err) return err } go func() { diff --git a/eth/peer_mock.go b/eth/peer_mock.go index d77592bc98..ec15292a08 100644 --- a/eth/peer_mock.go +++ b/eth/peer_mock.go @@ -169,18 +169,18 @@ func (mr *MockWitnessPeerMockRecorder) Log() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Log", reflect.TypeOf((*MockWitnessPeer)(nil).Log)) } -// ReplyReducedWitness mocks base method. -func (m *MockWitnessPeer) ReplyReducedWitness(requestID uint64, response *wit.WitnessPacketResponse) error { +// ReplyCompactWitness mocks base method. +func (m *MockWitnessPeer) ReplyCompactWitness(requestID uint64, response *wit.WitnessPacketResponse) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReplyReducedWitness", requestID, response) + ret := m.ctrl.Call(m, "ReplyCompactWitness", requestID, response) ret0, _ := ret[0].(error) return ret0 } -// ReplyReducedWitness indicates an expected call of ReplyReducedWitness. -func (mr *MockWitnessPeerMockRecorder) ReplyReducedWitness(requestID, response interface{}) *gomock.Call { +// ReplyCompactWitness indicates an expected call of ReplyCompactWitness. +func (mr *MockWitnessPeerMockRecorder) ReplyCompactWitness(requestID, response interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplyReducedWitness", reflect.TypeOf((*MockWitnessPeer)(nil).ReplyReducedWitness), requestID, response) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplyCompactWitness", reflect.TypeOf((*MockWitnessPeer)(nil).ReplyCompactWitness), requestID, response) } // ReplyWitness mocks base method. @@ -197,19 +197,19 @@ func (mr *MockWitnessPeerMockRecorder) ReplyWitness(requestID, response interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplyWitness", reflect.TypeOf((*MockWitnessPeer)(nil).ReplyWitness), requestID, response) } -// RequestReducedWitness mocks base method. -func (m *MockWitnessPeer) RequestReducedWitness(witnessPages []wit.WitnessPageRequest, sink chan *wit.Response) (*wit.Request, error) { +// RequestCompactWitness mocks base method. +func (m *MockWitnessPeer) RequestCompactWitness(witnessPages []wit.WitnessPageRequest, sink chan *wit.Response) (*wit.Request, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RequestReducedWitness", witnessPages, sink) + ret := m.ctrl.Call(m, "RequestCompactWitness", witnessPages, sink) ret0, _ := ret[0].(*wit.Request) ret1, _ := ret[1].(error) return ret0, ret1 } -// RequestReducedWitness indicates an expected call of RequestReducedWitness. -func (mr *MockWitnessPeerMockRecorder) RequestReducedWitness(witnessPages, sink interface{}) *gomock.Call { +// RequestCompactWitness indicates an expected call of RequestCompactWitness. +func (mr *MockWitnessPeerMockRecorder) RequestCompactWitness(witnessPages, sink interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestReducedWitness", reflect.TypeOf((*MockWitnessPeer)(nil).RequestReducedWitness), witnessPages, sink) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestCompactWitness", reflect.TypeOf((*MockWitnessPeer)(nil).RequestCompactWitness), witnessPages, sink) } // RequestWitness mocks base method. 
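With the mock regenerated, tests can stub the new compact-witness methods in the usual gomock style. A minimal sketch, assuming the standard NewMockWitnessPeer constructor and EXPECT() accessor that mockgen emits for this interface (imports: testing, gomock, common, and eth/protocols/wit):

    func TestMockCompactWitnessRoundTrip(t *testing.T) {
        ctrl := gomock.NewController(t)
        defer ctrl.Finish()

        peer := NewMockWitnessPeer(ctrl)
        sink := make(chan *wit.Response, 1)
        peer.EXPECT().
            RequestCompactWitness(gomock.Any(), gomock.Any()).
            Return(&wit.Request{}, nil)

        pages := []wit.WitnessPageRequest{{Hash: common.Hash{}, Page: 0}}
        if _, err := peer.RequestCompactWitness(pages, sink); err != nil {
            t.Fatalf("compact witness request: %v", err)
        }
    }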
diff --git a/eth/protocols/wit/handler.go b/eth/protocols/wit/handler.go index f44c3e94ba..e47ff2e6fe 100644 --- a/eth/protocols/wit/handler.go +++ b/eth/protocols/wit/handler.go @@ -131,8 +131,8 @@ var wit2 = map[uint64]msgHandler{ NewWitnessHashesMsg: handleNewWitnessHashes, GetWitnessMetadataMsg: handleGetWitnessMetadata, WitnessMetadataMsg: handleWitnessMetadata, - GetReducedWitnessMsg: handleGetReducedWitness, - ReducedWitnessMsg: handleReducedWitness, + GetCompactWitnessMsg: handleGetCompactWitness, + CompactWitnessMsg: handleCompactWitness, } // HandleMessage is invoked whenever an inbound message is received from a diff --git a/eth/protocols/wit/handlers.go b/eth/protocols/wit/handlers.go index d529129f33..9f71a33305 100644 --- a/eth/protocols/wit/handlers.go +++ b/eth/protocols/wit/handlers.go @@ -100,41 +100,41 @@ func handleWitnessMetadata(backend Backend, msg Decoder, peer *Peer) error { return peer.dispatchResponse(res, nil) } -// handleGetReducedWitness processes a GetReducedWitnessPacket request from a peer. +// handleGetCompactWitness processes a GetCompactWitnessPacket request from a peer. // Reuses GetWitnessPacket structure since format is identical. -func handleGetReducedWitness(backend Backend, msg Decoder, peer *Peer) error { - // Decode the GetWitnessPacket request (same structure for reduced witness) +func handleGetCompactWitness(backend Backend, msg Decoder, peer *Peer) error { + // Decode the GetWitnessPacket request (same structure for compact witness) req := new(GetWitnessPacket) if err := msg.Decode(&req); err != nil { - return fmt.Errorf("failed to decode GetReducedWitnessPacket: %w", err) + return fmt.Errorf("failed to decode GetCompactWitnessPacket: %w", err) } // Validate request parameters if len(req.WitnessPages) == 0 { - return fmt.Errorf("invalid GetReducedWitnessPacket: WitnessPages cannot be empty") + return fmt.Errorf("invalid GetCompactWitnessPacket: WitnessPages cannot be empty") } return backend.Handle(peer, req) } -// handleReducedWitness processes an incoming reduced witness response from a peer. +// handleCompactWitness processes an incoming compact witness response from a peer. // Reuses WitnessPacketRLPPacket structure since format is identical. 
-func handleReducedWitness(backend Backend, msg Decoder, peer *Peer) error { - // Decode the WitnessPacketRLPPacket response (same structure for reduced witness) +func handleCompactWitness(backend Backend, msg Decoder, peer *Peer) error { + // Decode the WitnessPacketRLPPacket response (same structure for compact witness) packet := new(WitnessPacketRLPPacket) if err := msg.Decode(packet); err != nil { - log.Error("Failed to decode reduced witness response packet", "err", err) + log.Error("Failed to decode compact witness response packet", "err", err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } // Construct the response object res := &Response{ id: packet.RequestId, - code: ReducedWitnessMsg, // Different message code for routing + code: CompactWitnessMsg, // Different message code for routing Res: packet, } // Forward the response to the dispatcher - log.Debug("Dispatching reduced witness response packet", "peer", peer.ID(), "reqID", packet.RequestId, "count", len(packet.WitnessPacketResponse)) + log.Debug("Dispatching compact witness response packet", "peer", peer.ID(), "reqID", packet.RequestId, "count", len(packet.WitnessPacketResponse)) return peer.dispatchResponse(res, nil) } diff --git a/eth/protocols/wit/peer.go b/eth/protocols/wit/peer.go index 5ed969fbab..84e7e6b0ae 100644 --- a/eth/protocols/wit/peer.go +++ b/eth/protocols/wit/peer.go @@ -179,25 +179,25 @@ func (p *Peer) RequestWitnessMetadata(hashes []common.Hash, sink chan *Response) return req, nil } -// RequestReducedWitness sends a request to the peer for reduced witnesses (omits cached states). -// Reuses GetWitnessPacket structure with IsReduced flag set to true. -func (p *Peer) RequestReducedWitness(witnessPages []WitnessPageRequest, sink chan *Response) (*Request, error) { +// RequestCompactWitness sends a request to the peer for compact witnesses (omits cached states). +// Reuses GetWitnessPacket structure with Compact flag set to true. +func (p *Peer) RequestCompactWitness(witnessPages []WitnessPageRequest, sink chan *Response) (*Request, error) { p.lock.Lock() defer p.lock.Unlock() - log.Debug("Requesting reduced witness", "peer", p.id, "pages", len(witnessPages)) + log.Debug("Requesting compact witness", "peer", p.id, "pages", len(witnessPages)) id := rand.Uint64() req := &Request{ id: id, sink: sink, - code: GetReducedWitnessMsg, // Different message code - want: ReducedWitnessMsg, + code: GetCompactWitnessMsg, // Different message code + want: CompactWitnessMsg, data: &GetWitnessPacket{ RequestId: id, GetWitnessRequest: &GetWitnessRequest{ WitnessPages: witnessPages, - IsReduced: true, // Mark as reduced witness request + Compact: true, // Mark as compact witness request }, }, } @@ -288,16 +288,16 @@ func (p *Peer) ReplyWitnessMetadata(requestID uint64, metadata []WitnessMetadata }) } -// ReplyReducedWitness sends a reduced witness response to the peer. -// Uses ReducedWitnessMsg code but same packet structure as regular witness. -func (p *Peer) ReplyReducedWitness(requestID uint64, response *WitnessPacketResponse) error { +// ReplyCompactWitness sends a compact witness response to the peer. +// Uses CompactWitnessMsg code but same packet structure as regular witness. 
+func (p *Peer) ReplyCompactWitness(requestID uint64, response *WitnessPacketResponse) error { p.lock.Lock() defer p.lock.Unlock() - log.Debug("Replying with reduced witness", "peer", p.id, "reqID", requestID, "pages", len(*response)) + log.Debug("Replying with compact witness", "peer", p.id, "reqID", requestID, "pages", len(*response)) - // Send the response with ReducedWitnessMsg code - return p2p.Send(p.rw, ReducedWitnessMsg, &WitnessPacketRLPPacket{ + // Send the response with CompactWitnessMsg code + return p2p.Send(p.rw, CompactWitnessMsg, &WitnessPacketRLPPacket{ RequestId: requestID, WitnessPacketResponse: *response, }) diff --git a/eth/protocols/wit/protocol.go b/eth/protocols/wit/protocol.go index 95d74f88e6..f91563988d 100644 --- a/eth/protocols/wit/protocol.go +++ b/eth/protocols/wit/protocol.go @@ -35,8 +35,8 @@ const ( MsgWitness = 0x03 GetWitnessMetadataMsg = 0x04 WitnessMetadataMsg = 0x05 - GetReducedWitnessMsg = 0x06 // Request reduced witness (omits cached states) - ReducedWitnessMsg = 0x07 // Response with reduced witness data + GetCompactWitnessMsg = 0x06 // Request compact witness (omits cached states) + CompactWitnessMsg = 0x07 // Response with compact witness data ) var ( @@ -52,10 +52,10 @@ type Packet interface { } // GetWitnessRequest represents a list of witnesses query by witness pages. -// Also used for reduced witness requests (distinguished by IsReduced flag). +// Also used for compact witness requests (distinguished by Compact flag). type GetWitnessRequest struct { WitnessPages []WitnessPageRequest // Request by list of witness pages - IsReduced bool // True if requesting reduced witness (omits cached states) + Compact bool // True if requesting compact witness (omits cached states) } type WitnessPageRequest struct { @@ -64,14 +64,14 @@ type WitnessPageRequest struct { } // GetWitnessPacket represents a witness query with request ID wrapping. -// Also used for reduced witness requests (same structure). +// Also used for compact witness requests (same structure). type GetWitnessPacket struct { RequestId uint64 *GetWitnessRequest } // WitnessPacketRLPPacket represents a witness response with request ID wrapping. -// Also used for reduced witness responses (same structure). +// Also used for compact witness responses (same structure). type WitnessPacketRLPPacket struct { RequestId uint64 WitnessPacketResponse @@ -79,7 +79,7 @@ type WitnessPacketRLPPacket struct { // WitnessPacketResponse represents a witness response, to use when we already // have the witness rlp encoded. -// Also used for reduced witness responses (same structure). +// Also used for compact witness responses (same structure). 
type WitnessPacketResponse []WitnessPageResponse type WitnessPageResponse struct { From f05239e56c2a775359eb1bd62bba46391fd1c027 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Thu, 30 Oct 2025 16:41:12 +0530 Subject: [PATCH 12/22] core: made compactWitnessCacheWindowSize and compactWitnessCacheOverlapSize as constant --- core/blockchain.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 9073655a80..d0b25753ca 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -144,6 +144,11 @@ const ( maxFutureBlocks = 256 maxTimeFutureBlocks = 30 + // Compact witness cache parameters (consensus-critical) + // These values must be identical across all nodes for compact witness protocol to work + compactWitnessCacheWindowSize = 20 // Blocks per cache window + compactWitnessCacheOverlapSize = 10 // Overlap between consecutive windows + // BlockChainVersion ensures that an incompatible database forces a resync from scratch. // // Changelog: @@ -366,8 +371,6 @@ type BlockChain struct { activeCacheMap map[string]struct{} // Currently active cache map nextCacheMap map[string]struct{} // Pre-warming map for next window cacheWindowStart uint64 // Start block of current window - cacheWindowSize uint64 // Size of each window (e.g., 20 blocks) - cacheOverlapSize uint64 // Overlap between windows (e.g., 10 blocks) cacheLock sync.RWMutex // Lock for cache operations wg sync.WaitGroup @@ -452,8 +455,6 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine, activeCacheMap: make(map[string]struct{}), nextCacheMap: make(map[string]struct{}), cacheWindowStart: 0, - cacheWindowSize: 20, // 20 blocks per window - cacheOverlapSize: 10, // 10 blocks overlap borReceiptsCache: lru.NewCache[common.Hash, *types.Receipt](receiptsCacheLimit), borReceiptsRLPCache: lru.NewCache[common.Hash, rlp.RawValue](receiptsCacheLimit), @@ -4193,7 +4194,7 @@ func (bc *BlockChain) mergeSpanCacheIntoWitness(witness *stateless.Witness) int func (bc *BlockChain) manageSlidingWindow(blockNum uint64) bool { blocksSinceWindowStart := blockNum - bc.cacheWindowStart - if blocksSinceWindowStart >= bc.cacheWindowSize { + if blocksSinceWindowStart >= compactWitnessCacheWindowSize { // Time to slide the window: discard active, promote next, create new next bc.cacheLock.Lock() oldActiveSize := len(bc.activeCacheMap) @@ -4221,7 +4222,7 @@ func (bc *BlockChain) GetCacheWindowStart() uint64 { // GetCacheWindowSize returns the cache window size. func (bc *BlockChain) GetCacheWindowSize() uint64 { - return bc.cacheWindowSize + return compactWitnessCacheWindowSize } // FilterWitnessWithSlidingCache filters a witness by removing state nodes present in the sliding window cache. 
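The two constants introduced in this patch are all the arithmetic needed to place any block inside a window, and the later fixes in this series (calculateCacheWindowStart in patch 17, the window-start and overlap-start handling in patch 20) build on exactly that. A small worked sketch, assuming the 20/10 values above:

    // Sketch: consensus-aligned window arithmetic derived from
    // compactWitnessCacheWindowSize and compactWitnessCacheOverlapSize.
    func needsFullWitness(blockNum uint64) bool {
        const windowSize, overlapSize uint64 = 20, 10
        windowStart := (blockNum / windowSize) * windowSize // 0, 20, 40, ...
        offset := blockNum - windowStart
        // Full witness at the window start (cache just slid) and at the overlap
        // start (next-window cache just began pre-warming); compact otherwise.
        return offset == 0 || offset == overlapSize
    }
    // needsFullWitness(0)  == true    needsFullWitness(20) == true
    // needsFullWitness(10) == true    needsFullWitness(35) == false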
@@ -4266,7 +4267,7 @@ func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, witness *statele // Determine if we're in overlap period (should cache to both maps) blocksSinceWindowStart := blockNum - bc.cacheWindowStart - inOverlapPeriod := blocksSinceWindowStart >= bc.cacheOverlapSize + inOverlapPeriod := blocksSinceWindowStart >= compactWitnessCacheOverlapSize cachedToActive := 0 cachedToNext := 0 From 40a905135b4bdf467d53e9d24c73fa65166b9010 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Thu, 30 Oct 2025 16:50:16 +0530 Subject: [PATCH 13/22] core: added timer metric here to understand the performance impact of constructing the caches --- core/blockchain.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/blockchain.go b/core/blockchain.go index d0b25753ca..69a4c4f2ad 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -128,6 +128,8 @@ var ( blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil) blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil) + compactWitnessCacheUpdateTimer = metrics.NewRegisteredTimer("chain/witness/compactcache/update", nil) + errInsertionInterrupted = errors.New("insertion is interrupted") errChainStopped = errors.New("blockchain is stopped") errInvalidOldChain = errors.New("invalid old chain") @@ -4258,6 +4260,9 @@ func (bc *BlockChain) FilterWitnessWithSlidingCache(witness *stateless.Witness) // updateSlidingWindowCache updates the sliding window cache after processing a block. // This is used by both full nodes and witness-receiving nodes to maintain synchronized caches. func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, witness *stateless.Witness, statedb *state.StateDB) { + start := time.Now() + defer func() { compactWitnessCacheUpdateTimer.UpdateSince(start) }() + if witness == nil && statedb == nil { return } From bee613647511c1778d25960cf4361e1bab510f50 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Thu, 30 Oct 2025 17:16:50 +0530 Subject: [PATCH 14/22] eth: added caching for compact witnesses --- eth/handler.go | 14 ++++++++++ eth/handler_wit.go | 65 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/eth/handler.go b/eth/handler.go index 60ec318276..5d4fa51e85 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/consensus/beacon" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/forkid" @@ -157,6 +158,11 @@ type handler struct { syncWithWitnesses bool syncAndProduceWitnesses bool // Whether to sync blocks and produce witnesses simultaneously + // Compact witness cache: caches filtered witnesses to avoid recomputation for multiple peers + // Cache key: witnessHash + cacheWindowStart (as cache validity depends on sliding window state) + compactWitnessCache *lru.Cache[compactWitnessCacheKey, []byte] + compactWitnessCacheLock sync.RWMutex + // channels for fetcher, syncer, txsyncLoop quitSync chan struct{} @@ -167,6 +173,13 @@ type handler struct { handlerDoneCh chan struct{} } +// compactWitnessCacheKey uniquely identifies a filtered compact witness +// The filtered witness is valid only for a specific cacheWindowStart +type compactWitnessCacheKey struct { + hash common.Hash + windowStart uint64 +} + // newHandler returns a handler for all Ethereum chain management protocol. 
func newHandler(config *handlerConfig) (*handler, error) { // Create the protocol manager with the base fields @@ -192,6 +205,7 @@ func newHandler(config *handlerConfig) (*handler, error) { handlerStartCh: make(chan struct{}), syncWithWitnesses: config.syncWithWitnesses, syncAndProduceWitnesses: config.syncAndProduceWitnesses, + compactWitnessCache: lru.NewCache[compactWitnessCacheKey, []byte](256), // Cache up to 256 filtered witnesses } log.Info("Sync with witnesses", "enabled", config.syncWithWitnesses) diff --git a/eth/handler_wit.go b/eth/handler_wit.go index 1c9fc02d9a..27fdd494e1 100644 --- a/eth/handler_wit.go +++ b/eth/handler_wit.go @@ -12,9 +12,15 @@ import ( "github.com/ethereum/go-ethereum/core/stateless" "github.com/ethereum/go-ethereum/eth/protocols/wit" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/enode" ) +var ( + compactWitnessCacheHitMeter = metrics.NewRegisteredMeter("eth/witness/compactcache/hit", nil) + compactWitnessCacheMissMeter = metrics.NewRegisteredMeter("eth/witness/compactcache/miss", nil) +) + const ( // witnessRequestTimeout defines how long to wait for an in-flight witness computation. witnessRequestTimeout = 5 * time.Second @@ -202,6 +208,9 @@ func (h *witHandler) handleGetWitness(peer *wit.Peer, req *wit.GetWitnessPacket) func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitnessPacket) (wit.WitnessPacketResponse, error) { log.Debug("handleGetCompactWitness processing request", "peer", peer.ID(), "reqID", req.RequestId, "witnessPages", len(req.WitnessPages)) + // Get current cache window start for cache key + cacheWindowStart := h.chain.GetCacheWindowStart() + // First, get the full witness data (same as regular handleGetWitness) fullResponse, err := h.handleGetWitness(peer, req) if err != nil { @@ -218,7 +227,33 @@ func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitness continue } - // Decode the witness from the RLP data + // Check cache first + cacheKey := compactWitnessCacheKey{ + hash: witnessPage.Hash, + windowStart: cacheWindowStart, + } + + (*handler)(h).compactWitnessCacheLock.RLock() + cachedFiltered, cacheHit := (*handler)(h).compactWitnessCache.Get(cacheKey) + (*handler)(h).compactWitnessCacheLock.RUnlock() + + if cacheHit { + compactWitnessCacheHitMeter.Mark(1) + log.Debug("Compact witness cache hit", "hash", witnessPage.Hash, "windowStart", cacheWindowStart) + + filteredPage := wit.WitnessPageResponse{ + Data: cachedFiltered, + Hash: witnessPage.Hash, + Page: witnessPage.Page, + TotalPages: witnessPage.TotalPages, + } + filteredResponse = append(filteredResponse, filteredPage) + continue + } + + compactWitnessCacheMissMeter.Mark(1) + + // Cache miss: decode, filter, and encode witness, err := stateless.GetWitnessFromRlp(witnessPage.Data) if err != nil { log.Warn("Failed to decode witness for filtering", "hash", witnessPage.Hash, "err", err) @@ -244,6 +279,11 @@ func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitness filteredBuf = buf.Bytes() } + // Store in cache + (*handler)(h).compactWitnessCacheLock.Lock() + (*handler)(h).compactWitnessCache.Add(cacheKey, filteredBuf) + (*handler)(h).compactWitnessCacheLock.Unlock() + // Create filtered page response filteredPage := wit.WitnessPageResponse{ Data: filteredBuf, @@ -287,6 +327,29 @@ func (h *witHandler) filterWitnessWithCache(witness *stateless.Witness) *statele return filtered } +// ClearStaleCompactWitnessCache removes cache entries that are no 
longer valid +// due to sliding window movement. Should be called when cache window slides. +func (h *witHandler) ClearStaleCompactWitnessCache(currentWindowStart uint64) { + (*handler)(h).compactWitnessCacheLock.Lock() + defer (*handler)(h).compactWitnessCacheLock.Unlock() + + // Get all keys and remove those with old windowStart + keys := (*handler)(h).compactWitnessCache.Keys() + removed := 0 + for _, key := range keys { + if key.windowStart < currentWindowStart { + (*handler)(h).compactWitnessCache.Remove(key) + removed++ + } + } + + if removed > 0 { + log.Debug("Cleared stale compact witness cache entries", + "removed", removed, + "currentWindowStart", currentWindowStart) + } +} + // handleGetWitnessMetadata retrieves only the metadata (page count, size, block number) for the requested witness hashes. // This is efficient for verification purposes where we don't need the actual witness data. func (h *witHandler) handleGetWitnessMetadata(peer *wit.Peer, req *wit.GetWitnessMetadataPacket) ([]wit.WitnessMetadataResponse, error) { From ee7342f6610eb94cdd70140521a4749508dd267f Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Fri, 31 Oct 2025 10:04:18 +0530 Subject: [PATCH 15/22] core, eth: made parallel stateless import and compact witness (witness state caching) mutually exclusive --- core/blockchain.go | 35 +++++++++++++++++++++-------------- eth/handler_eth.go | 6 ++++++ eth/handler_wit.go | 15 ++++++++++++--- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 69a4c4f2ad..985e558e06 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -4343,19 +4343,23 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta blockNum := block.Number().Uint64() - // Manage sliding window (slide if needed) - bc.manageSlidingWindow(blockNum) - - // Merge cached states into witness for blocks that aren't at window start - if blockNum != bc.cacheWindowStart { - witnessStatesBefore := len(witness.State) - mergedCount := bc.mergeSpanCacheIntoWitness(witness) - if mergedCount > 0 { - log.Debug("Merged cached states into witness", - "block", blockNum, - "witnessStatesBefore", witnessStatesBefore, - "mergedFromCache", mergedCount, - "witnessStatesAfter", len(witness.State)) + // Compact witness cache operations are only compatible with sequential import + // In parallel mode, blocks are processed concurrently which breaks cache assumptions + if !bc.parallelStatelessImportEnabled.Load() { + // Manage sliding window (slide if needed) + bc.manageSlidingWindow(blockNum) + + // Merge cached states into witness for blocks that aren't at window start + if blockNum != bc.cacheWindowStart { + witnessStatesBefore := len(witness.State) + mergedCount := bc.mergeSpanCacheIntoWitness(witness) + if mergedCount > 0 { + log.Debug("Merged cached states into witness", + "block", blockNum, + "witnessStatesBefore", witnessStatesBefore, + "mergedFromCache", mergedCount, + "witnessStatesAfter", len(witness.State)) + } } } @@ -4403,7 +4407,10 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta } // Update sliding window cache with witness and execution data - bc.updateSlidingWindowCache(blockNum, witness, statedb) + // Only in sequential mode - parallel import is incompatible with cache + if !bc.parallelStatelessImportEnabled.Load() { + bc.updateSlidingWindowCache(blockNum, witness, statedb) + } return statedb, res, nil } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 7551195bed..09492772a9 100644 --- 
a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -154,6 +154,12 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, func (h *ethHandler) shouldRequestCompactWitness(blockNum uint64) bool { // TODO(@pratikspatil024) - handle node out of sync case + // Compact witness requires sequential block processing + // If parallel import is enabled, cache won't be maintained, so always request full witness + if h.chain.IsParallelStatelessImportEnabled() { + return false + } + // Get the blockchain's current window start windowStart := h.chain.GetCacheWindowStart() windowSize := h.chain.GetCacheWindowSize() diff --git a/eth/handler_wit.go b/eth/handler_wit.go index 27fdd494e1..92dfacfcba 100644 --- a/eth/handler_wit.go +++ b/eth/handler_wit.go @@ -208,15 +208,24 @@ func (h *witHandler) handleGetWitness(peer *wit.Peer, req *wit.GetWitnessPacket) func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitnessPacket) (wit.WitnessPacketResponse, error) { log.Debug("handleGetCompactWitness processing request", "peer", peer.ID(), "reqID", req.RequestId, "witnessPages", len(req.WitnessPages)) - // Get current cache window start for cache key - cacheWindowStart := h.chain.GetCacheWindowStart() - // First, get the full witness data (same as regular handleGetWitness) fullResponse, err := h.handleGetWitness(peer, req) if err != nil { return nil, err } + // If parallel import is enabled on this node, cache isn't maintained + // Return full witness instead of trying to filter + if h.chain.IsParallelStatelessImportEnabled() { + log.Debug("Parallel import enabled, returning full witness instead of compact", + "peer", peer.ID(), + "reqID", req.RequestId) + return fullResponse, nil + } + + // Get current cache window start for cache key + cacheWindowStart := h.chain.GetCacheWindowStart() + // Now filter each witness page by removing cached states var filteredResponse wit.WitnessPacketResponse From 61b1a40cedcb9e719aece0a5bdb1e82d86130c9a Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Fri, 31 Oct 2025 10:22:41 +0530 Subject: [PATCH 16/22] core: bug fix in witness caching - prevent cache bloat from recaching merged states --- core/blockchain.go | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 985e558e06..b830c6d279 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2250,8 +2250,13 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Manage sliding window for full nodes (mirrors witness-receiving nodes) bc.manageSlidingWindow(block.NumberU64()) - // Update sliding window cache - allows full nodes to send reduced witnesses - bc.updateSlidingWindowCache(block.NumberU64(), statedb.Witness(), statedb) + // Update sliding window cache - allows full nodes to send compact witnesses + // For full nodes, witness is generated during execution (no merging), so pass witness.State directly + var witnessStates map[string]struct{} + if witness := statedb.Witness(); witness != nil { + witnessStates = witness.State + } + bc.updateSlidingWindowCache(block.NumberU64(), witnessStates, statedb) } else { log.Debug("No witness to write", "block", block.NumberU64()) } @@ -4259,11 +4264,13 @@ func (bc *BlockChain) FilterWitnessWithSlidingCache(witness *stateless.Witness) // updateSlidingWindowCache updates the sliding window cache after processing a block. // This is used by both full nodes and witness-receiving nodes to maintain synchronized caches. 
-func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, witness *stateless.Witness, statedb *state.StateDB) { +// originalWitnessStates should contain ONLY the states from the original compact witness (before merging), +// to avoid re-caching already-cached states and causing cache bloat. +func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, originalWitnessStates map[string]struct{}, statedb *state.StateDB) { start := time.Now() defer func() { compactWitnessCacheUpdateTimer.UpdateSince(start) }() - if witness == nil && statedb == nil { + if originalWitnessStates == nil && statedb == nil { return } @@ -4277,9 +4284,10 @@ func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, witness *statele cachedToActive := 0 cachedToNext := 0 - // Cache witness state nodes if available - if witness != nil { - for stateNode := range witness.State { + // Cache ORIGINAL witness state nodes (not merged states) + // This ensures we only cache new states for this block, not re-cache old states + if originalWitnessStates != nil { + for stateNode := range originalWitnessStates { // Add to active map if _, exists := bc.activeCacheMap[stateNode]; !exists { bc.activeCacheMap[stateNode] = struct{}{} @@ -4343,6 +4351,17 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta blockNum := block.Number().Uint64() + // Track original witness states before any merging (for cache update later) + // We only want to cache NEW states from this block, not re-cache merged states + var originalWitnessStates map[string]struct{} + if !bc.parallelStatelessImportEnabled.Load() { + // Copy original witness states before merging + originalWitnessStates = make(map[string]struct{}, len(witness.State)) + for stateNode := range witness.State { + originalWitnessStates[stateNode] = struct{}{} + } + } + // Compact witness cache operations are only compatible with sequential import // In parallel mode, blocks are processed concurrently which breaks cache assumptions if !bc.parallelStatelessImportEnabled.Load() { @@ -4406,10 +4425,11 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta return nil, nil, err } - // Update sliding window cache with witness and execution data + // Update sliding window cache with ORIGINAL witness states and execution data // Only in sequential mode - parallel import is incompatible with cache + // Pass originalWitnessStates (not full witness) to avoid re-caching merged states if !bc.parallelStatelessImportEnabled.Load() { - bc.updateSlidingWindowCache(blockNum, witness, statedb) + bc.updateSlidingWindowCache(blockNum, originalWitnessStates, statedb) } return statedb, res, nil From eddf23582c06031e9472efe8cf965f1cd4c104c5 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Fri, 31 Oct 2025 11:24:14 +0530 Subject: [PATCH 17/22] core, eth: bug fix - setting the correct start block accross the nodes --- core/blockchain.go | 20 +++++++++++++++----- eth/handler_eth.go | 20 +++++++++----------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index b830c6d279..7ae4abbe1c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -456,7 +456,7 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine, // Initialize sliding window cache for witness bandwidth reduction activeCacheMap: make(map[string]struct{}), nextCacheMap: make(map[string]struct{}), - cacheWindowStart: 0, + cacheWindowStart: 0, // Will be set to consensus-aligned value on first block 
processing borReceiptsCache: lru.NewCache[common.Hash, *types.Receipt](receiptsCacheLimit), borReceiptsRLPCache: lru.NewCache[common.Hash, rlp.RawValue](receiptsCacheLimit), @@ -4196,23 +4196,33 @@ func (bc *BlockChain) mergeSpanCacheIntoWitness(witness *stateless.Witness) int return mergedCount } +// calculateCacheWindowStart returns the consensus-aligned window start for a given block number. +// This ensures all nodes agree on window boundaries regardless of when they started syncing. +func calculateCacheWindowStart(blockNum uint64) uint64 { + return (blockNum / compactWitnessCacheWindowSize) * compactWitnessCacheWindowSize +} + // manageSlidingWindow handles window sliding logic. // Returns true if window was slid, false otherwise. func (bc *BlockChain) manageSlidingWindow(blockNum uint64) bool { - blocksSinceWindowStart := blockNum - bc.cacheWindowStart + // Calculate consensus-aligned window start for this block + expectedWindowStart := calculateCacheWindowStart(blockNum) - if blocksSinceWindowStart >= compactWitnessCacheWindowSize { + // Check if we need to slide to a new window + if bc.cacheWindowStart != expectedWindowStart { // Time to slide the window: discard active, promote next, create new next bc.cacheLock.Lock() oldActiveSize := len(bc.activeCacheMap) + oldWindowStart := bc.cacheWindowStart bc.activeCacheMap = bc.nextCacheMap bc.nextCacheMap = make(map[string]struct{}) - bc.cacheWindowStart = blockNum + bc.cacheWindowStart = expectedWindowStart bc.cacheLock.Unlock() log.Info("Sliding window: switched to next cache map", "block", blockNum, - "newWindowStart", blockNum, + "oldWindowStart", oldWindowStart, + "newWindowStart", expectedWindowStart, "discardedSize", oldActiveSize, "newActiveSize", len(bc.activeCacheMap)) return true diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 09492772a9..b30e7b0b54 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -160,22 +160,20 @@ func (h *ethHandler) shouldRequestCompactWitness(blockNum uint64) bool { return false } - // Get the blockchain's current window start - windowStart := h.chain.GetCacheWindowStart() + // Calculate the consensus-aligned window start for the requested block + // This must match the sender's calculation to ensure cache consistency windowSize := h.chain.GetCacheWindowSize() + expectedWindowStart := (blockNum / windowSize) * windowSize - // If block is at window start, need full witness (cache was just cleared/slid) - if blockNum == windowStart { + // If block is at window start, need full witness (this is first block of window) + // The sender will have just cleared/slid their cache at this block + if blockNum == expectedWindowStart { return false } - // If block is within current window but not at start, can use compact witness - if blockNum > windowStart && blockNum < windowStart+windowSize { - return true - } - - // Block is outside current window, need full witness - return false + // Block is within window but not at start, can use compact witness + // The sender should have cached states from earlier blocks in this window + return true } // verifyPageCount verifies the witness page count for a given block hash by From f95b0d659f2f13161ea9f61c2e21ed51995a5c8c Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Mon, 3 Nov 2025 12:51:28 +0530 Subject: [PATCH 18/22] eth: use full witness if peer is wit/1 --- eth/peer.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/eth/peer.go b/eth/peer.go index d0bb1d1fe2..38b5499954 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -676,6 +676,13 @@ func 
(p *ethPeer) doWitnessRequest( witTotalRequest map[common.Hash]uint64, useCompact bool, ) error { + // Compact witness requires WIT2 protocol support + // Fallback to full witness for WIT1 peers for backward compatibility + if useCompact && p.witPeer.Peer.Version() < wit.WIT2 { + p.witPeer.Peer.Log().Debug("Peer doesn't support WIT2, falling back to full witness", "peer", p.ID(), "version", p.witPeer.Peer.Version()) + useCompact = false + } + p.witPeer.Peer.Log().Debug("RequestWitnesses building a wit request", "peer", p.ID(), "hash", hash, "page", page, "compact", useCompact) witReqSem <- 1 witResCh := make(chan *wit.Response) From 6127cb19664b857ca9f956c8deee84343fe75d18 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Mon, 3 Nov 2025 16:40:23 +0530 Subject: [PATCH 19/22] eth: bug fix: fixed compatibility between wit/1 and wit/2 caused by updating GetWitnessRequest --- core/blockchain.go | 6 +++-- eth/handler_wit.go | 41 +++++++++++++++++------------------ eth/peer.go | 2 +- eth/protocols/wit/handlers.go | 16 +++++++++----- eth/protocols/wit/peer.go | 3 +-- eth/protocols/wit/protocol.go | 11 +++++++--- 6 files changed, 45 insertions(+), 34 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 8e37378dbe..a4340bb85c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -4238,6 +4238,7 @@ func (bc *BlockChain) manageSlidingWindow(blockNum uint64) bool { "newActiveSize", len(bc.activeCacheMap)) return true } + return false } @@ -4371,11 +4372,12 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta } blockNum := block.Number().Uint64() + parallelMode := bc.parallelStatelessImportEnabled.Load() // Track original witness states before any merging (for cache update later) // We only want to cache NEW states from this block, not re-cache merged states var originalWitnessStates map[string]struct{} - if !bc.parallelStatelessImportEnabled.Load() { + if !parallelMode { // Copy original witness states before merging originalWitnessStates = make(map[string]struct{}, len(witness.State)) for stateNode := range witness.State { @@ -4385,7 +4387,7 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta // Compact witness cache operations are only compatible with sequential import // In parallel mode, blocks are processed concurrently which breaks cache assumptions - if !bc.parallelStatelessImportEnabled.Load() { + if !parallelMode { // Manage sliding window (slide if needed) bc.manageSlidingWindow(blockNum) diff --git a/eth/handler_wit.go b/eth/handler_wit.go index 92dfacfcba..86e438a82c 100644 --- a/eth/handler_wit.go +++ b/eth/handler_wit.go @@ -62,27 +62,27 @@ func (h *witHandler) Handle(peer *wit.Peer, packet wit.Packet) error { case *wit.NewWitnessHashesPacket: return h.handleWitnessHashesAnnounce(peer, packet.Hashes, packet.Numbers) case *wit.GetWitnessPacket: - var response wit.WitnessPacketResponse - var err error - - // Check if this is a compact witness request - if packet.Compact { - // Handle compact witness request with filtering - response, err = h.handleGetCompactWitness(peer, packet) - if err != nil { - return fmt.Errorf("failed to handle GetCompactWitnessPacket: %w", err) - } - // Reply with compact witness (different message code) - return peer.ReplyCompactWitness(packet.RequestId, &response) - } else { - // Handle regular witness request - response, err = h.handleGetWitness(peer, packet) - if err != nil { - return fmt.Errorf("failed to handle GetWitnessPacket: %w", err) - } - // Reply with regular witness 
- return peer.ReplyWitness(packet.RequestId, &response) + // Handle regular witness request + response, err := h.handleGetWitness(peer, packet) + if err != nil { + return fmt.Errorf("failed to handle GetWitnessPacket: %w", err) + } + // Reply with regular witness + return peer.ReplyWitness(packet.RequestId, &response) + + case *wit.GetCompactWitnessPacket: + // Handle compact witness request with filtering + // Convert to regular GetWitnessPacket for internal handling + regularPacket := &wit.GetWitnessPacket{ + RequestId: packet.RequestId, + GetWitnessRequest: packet.GetWitnessRequest, + } + response, err := h.handleGetCompactWitness(peer, regularPacket) + if err != nil { + return fmt.Errorf("failed to handle GetCompactWitnessPacket: %w", err) } + // Reply with compact witness (different message code) + return peer.ReplyCompactWitness(packet.RequestId, &response) case *wit.GetWitnessMetadataPacket: // Call handleGetWitnessMetadata which returns only metadata (page count) @@ -248,7 +248,6 @@ func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitness if cacheHit { compactWitnessCacheHitMeter.Mark(1) - log.Debug("Compact witness cache hit", "hash", witnessPage.Hash, "windowStart", cacheWindowStart) filteredPage := wit.WitnessPageResponse{ Data: cachedFiltered, diff --git a/eth/peer.go b/eth/peer.go index 38b5499954..06551b2906 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -679,7 +679,7 @@ func (p *ethPeer) doWitnessRequest( // Compact witness requires WIT2 protocol support // Fallback to full witness for WIT1 peers for backward compatibility if useCompact && p.witPeer.Peer.Version() < wit.WIT2 { - p.witPeer.Peer.Log().Debug("Peer doesn't support WIT2, falling back to full witness", "peer", p.ID(), "version", p.witPeer.Peer.Version()) + p.witPeer.Peer.Log().Info("Peer doesn't support WIT2, falling back to full witness", "peer", p.ID(), "version", p.witPeer.Peer.Version()) useCompact = false } diff --git a/eth/protocols/wit/handlers.go b/eth/protocols/wit/handlers.go index 9f71a33305..bebc592cb4 100644 --- a/eth/protocols/wit/handlers.go +++ b/eth/protocols/wit/handlers.go @@ -101,19 +101,25 @@ func handleWitnessMetadata(backend Backend, msg Decoder, peer *Peer) error { } // handleGetCompactWitness processes a GetCompactWitnessPacket request from a peer. -// Reuses GetWitnessPacket structure since format is identical. +// Uses GetCompactWitnessPacket marker type to distinguish from regular witness requests. 
func handleGetCompactWitness(backend Backend, msg Decoder, peer *Peer) error {
-	// Decode the GetWitnessPacket request (same structure for compact witness)
-	req := new(GetWitnessPacket)
-	if err := msg.Decode(&req); err != nil {
+	// Decode the wire-format request (same encoding as GetWitnessPacket)
+	wireReq := new(GetWitnessPacket)
+	if err := msg.Decode(&wireReq); err != nil {
 		return fmt.Errorf("failed to decode GetCompactWitnessPacket: %w", err)
 	}

 	// Validate request parameters
-	if len(req.WitnessPages) == 0 {
+	if len(wireReq.WitnessPages) == 0 {
 		return fmt.Errorf("invalid GetCompactWitnessPacket: WitnessPages cannot be empty")
 	}

+	// Wrap in marker type for backend
+	req := &GetCompactWitnessPacket{
+		RequestId:         wireReq.RequestId,
+		GetWitnessRequest: wireReq.GetWitnessRequest,
+	}
+
 	return backend.Handle(peer, req)
}
diff --git a/eth/protocols/wit/peer.go b/eth/protocols/wit/peer.go
index 84e7e6b0ae..f0255a8f7c 100644
--- a/eth/protocols/wit/peer.go
+++ b/eth/protocols/wit/peer.go
@@ -191,13 +191,12 @@ func (p *Peer) RequestCompactWitness(witnessPages []WitnessPageRequest, sink cha
 req := &Request{
 	id:   id,
 	sink: sink,
-	code: GetCompactWitnessMsg, // Different message code
+	code: GetCompactWitnessMsg, // Message code distinguishes compact from full
 	want: CompactWitnessMsg,
 	data: &GetWitnessPacket{
 		RequestId: id,
 		GetWitnessRequest: &GetWitnessRequest{
 			WitnessPages: witnessPages,
-			Compact:      true, // Mark as compact witness request
 		},
 	},
 }
diff --git a/eth/protocols/wit/protocol.go b/eth/protocols/wit/protocol.go
index f91563988d..0c465988b5 100644
--- a/eth/protocols/wit/protocol.go
+++ b/eth/protocols/wit/protocol.go
@@ -52,10 +52,9 @@ type Packet interface {
}

// GetWitnessRequest represents a list of witnesses query by witness pages.
-// Also used for compact witness requests (distinguished by Compact flag).
+// Used for both full and compact witness requests - message code distinguishes the type.
type GetWitnessRequest struct {
 	WitnessPages []WitnessPageRequest // Request by list of witness pages
-	Compact      bool                 // True if requesting compact witness (omits cached states)
}

type WitnessPageRequest struct {
@@ -64,12 +63,18 @@ type WitnessPageRequest struct {
}

// GetWitnessPacket represents a witness query with request ID wrapping.
-// Also used for compact witness requests (same structure).
type GetWitnessPacket struct {
 	RequestId uint64
 	*GetWitnessRequest
}

+// GetCompactWitnessPacket is a marker type for compact witness requests.
+// It has the same wire format as GetWitnessPacket but allows backend to distinguish.
+type GetCompactWitnessPacket struct {
+	RequestId uint64
+	*GetWitnessRequest
+}
+
// WitnessPacketRLPPacket represents a witness response with request ID wrapping.
// Also used for compact witness responses (same structure).
type WitnessPacketRLPPacket struct {

From 3d2f77e11caec411e091b70df4757cfb36bf0f9c Mon Sep 17 00:00:00 2001
From: Pratik Patil
Date: Tue, 4 Nov 2025 16:01:34 +0530
Subject: [PATCH 20/22] core, eth: bug fix: request full witness at window
 start and overlap start

---
 core/blockchain.go |  5 +++++
 eth/handler_eth.go | 15 ++++++++++-----
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index a4340bb85c..6df095f994 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -4254,6 +4254,11 @@ func (bc *BlockChain) GetCacheWindowSize() uint64 {
 	return compactWitnessCacheWindowSize
}

+// GetCacheOverlapSize returns the cache overlap size.
+func (bc *BlockChain) GetCacheOverlapSize() uint64 {
+	return compactWitnessCacheOverlapSize
+}
+
// FilterWitnessWithSlidingCache filters a witness by removing state nodes present in the sliding window cache.
// This is used when sending reduced witnesses to peers.
func (bc *BlockChain) FilterWitnessWithSlidingCache(witness *stateless.Witness) (*stateless.Witness, int, int) {
diff --git a/eth/handler_eth.go b/eth/handler_eth.go
index b30e7b0b54..19c7605599 100644
--- a/eth/handler_eth.go
+++ b/eth/handler_eth.go
@@ -163,16 +163,21 @@ func (h *ethHandler) shouldRequestCompactWitness(blockNum uint64) bool {
 // Calculate the consensus-aligned window start for the requested block
 // This must match the sender's calculation to ensure cache consistency
 windowSize := h.chain.GetCacheWindowSize()
+	overlapSize := h.chain.GetCacheOverlapSize()
 expectedWindowStart := (blockNum / windowSize) * windowSize

-	// If block is at window start, need full witness (this is first block of window)
-	// The sender will have just cleared/slid their cache at this block
-	if blockNum == expectedWindowStart {
+	// Calculate position within the window
+	blocksSinceWindowStart := blockNum - expectedWindowStart
+
+	// Need full witness at:
+	// 1. Window start (block 0, 20, 40...) - new window begins
+	// 2. Overlap start (block 10, 30, 50...) - pre-warming next window
+	// This ensures nextCacheMap has complete state coverage for the next window
+	if blockNum == expectedWindowStart || blocksSinceWindowStart == overlapSize {
 	return false
 }

-	// Block is within window but not at start, can use compact witness
-	// The sender should have cached states from earlier blocks in this window
+	// Block is within window (not at window start or overlap start), can use compact witness
 return true
}

From 3ddea02539e6a268a7fc2af3df248feac78def1f Mon Sep 17 00:00:00 2001
From: Pratik Patil
Date: Wed, 5 Nov 2025 20:54:02 +0530
Subject: [PATCH 21/22] core, eth: added tests for compact witness

---
 core/blockchain_test.go | 336 ++++++++++++++++++++++++++++++++++++++++
 eth/handler_eth_test.go |  80 ++++++++++
 2 files changed, 416 insertions(+)

diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 0856a91368..31fa42f0f5 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -5892,3 +5892,339 @@ func TestSplitReceiptsAndDeriveFields(t *testing.T) {
 		require.Equal(t, rlp.RawValue(stateSyncEncoded), stateSync, fmt.Sprintf("case: %s, state-sync receipts mismatch, got: %v, expected: %v", test.name, stateSync, stateSyncEncoded))
 	}
}
+
+// TestCalculateCacheWindowStart tests the consensus-aligned window calculation
+func TestCalculateCacheWindowStart(t *testing.T) {
+	tests := []struct {
+		name          string
+		blockNum      uint64
+		expectedStart uint64
+	}{
+		{"Genesis block", 0, 0},
+		{"Within first window", 5, 0},
+		{"At window boundary", 20, 20},
+		{"Just after window boundary", 21, 20},
+		{"Within second window", 35, 20},
+		{"At second window boundary", 40, 40},
+		{"Large block number", 1000, 1000},
+		{"Large block in window", 1015, 1000},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := calculateCacheWindowStart(tt.blockNum)
+			require.Equal(t, tt.expectedStart, result,
+				"calculateCacheWindowStart(%d) = %d, want %d",
+				tt.blockNum, result, tt.expectedStart)
+		})
+	}
+}
+
+// TestManageSlidingWindow tests window sliding logic
+func TestManageSlidingWindow(t *testing.T) {
+	// Create a minimal blockchain instance for testing
+	bc := &BlockChain{
+		activeCacheMap:
make(map[string]struct{}), + nextCacheMap: make(map[string]struct{}), + cacheWindowStart: 0, + } + + // Populate active cache with test data + bc.activeCacheMap["state1"] = struct{}{} + bc.activeCacheMap["state2"] = struct{}{} + + // Populate next cache with test data + bc.nextCacheMap["state3"] = struct{}{} + bc.nextCacheMap["state4"] = struct{}{} + + tests := []struct { + name string + blockNum uint64 + expectSlide bool + expectedWindowStart uint64 + expectedActiveSize int + }{ + { + name: "No slide - within same window", + blockNum: 5, + expectSlide: false, + expectedWindowStart: 0, + expectedActiveSize: 2, + }, + { + name: "Slide at window boundary", + blockNum: 20, + expectSlide: true, + expectedWindowStart: 20, + expectedActiveSize: 2, // next becomes active + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + slid := bc.manageSlidingWindow(tt.blockNum) + require.Equal(t, tt.expectSlide, slid, "Slide result mismatch") + require.Equal(t, tt.expectedWindowStart, bc.cacheWindowStart, "Window start mismatch") + require.Equal(t, tt.expectedActiveSize, len(bc.activeCacheMap), "Active cache size mismatch") + + if tt.expectSlide { + // After slide, next cache should be empty + require.Equal(t, 0, len(bc.nextCacheMap), "Next cache should be empty after slide") + } + }) + } +} + +// TestMergeSpanCacheIntoWitness tests merging cached states into witness +func TestMergeSpanCacheIntoWitness(t *testing.T) { + bc := &BlockChain{ + activeCacheMap: make(map[string]struct{}), + } + + // Populate cache + bc.activeCacheMap["cachedState1"] = struct{}{} + bc.activeCacheMap["cachedState2"] = struct{}{} + + tests := []struct { + name string + witness *stateless.Witness + expectedMerged int + expectedTotal int + }{ + { + name: "Nil witness", + witness: nil, + expectedMerged: 0, + expectedTotal: 0, + }, + { + name: "Empty witness state", + witness: &stateless.Witness{ + State: make(map[string]struct{}), + }, + expectedMerged: 2, + expectedTotal: 2, + }, + { + name: "Witness with one cached state", + witness: &stateless.Witness{ + State: map[string]struct{}{ + "cachedState1": {}, + "newState1": {}, + }, + }, + expectedMerged: 1, // Only cachedState2 is new + expectedTotal: 3, // cachedState1, cachedState2, newState1 + }, + { + name: "Witness with no cached states", + witness: &stateless.Witness{ + State: map[string]struct{}{ + "newState1": {}, + "newState2": {}, + }, + }, + expectedMerged: 2, // Both cached states merged + expectedTotal: 4, // newState1, newState2, cachedState1, cachedState2 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.witness != nil { + originalSize := len(tt.witness.State) + merged := bc.mergeSpanCacheIntoWitness(tt.witness) + require.Equal(t, tt.expectedMerged, merged, "Merged count mismatch") + require.Equal(t, tt.expectedTotal, len(tt.witness.State), "Total witness size mismatch") + require.Equal(t, originalSize+merged, len(tt.witness.State), "Size calculation mismatch") + } else { + merged := bc.mergeSpanCacheIntoWitness(tt.witness) + require.Equal(t, tt.expectedMerged, merged, "Merged count mismatch for nil witness") + } + }) + } +} + +// TestUpdateSlidingWindowCache tests cache updates during overlap period +func TestUpdateSlidingWindowCache(t *testing.T) { + bc := &BlockChain{ + activeCacheMap: make(map[string]struct{}), + nextCacheMap: make(map[string]struct{}), + cacheWindowStart: 0, + } + + tests := []struct { + name string + blockNum uint64 + originalStates map[string]struct{} + inOverlapPeriod bool + 
expectedActiveSize int + expectedNextSize int + }{ + { + name: "Block before overlap - cache to active only", + blockNum: 5, + originalStates: map[string]struct{}{ + "state1": {}, + "state2": {}, + }, + inOverlapPeriod: false, + expectedActiveSize: 2, + expectedNextSize: 0, + }, + { + name: "Block in overlap period - cache to both", + blockNum: 15, // 15 >= 10 (overlap start) + originalStates: map[string]struct{}{ + "state3": {}, + "state4": {}, + }, + inOverlapPeriod: true, + expectedActiveSize: 4, // state1, state2, state3, state4 + expectedNextSize: 2, // state3, state4 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bc.updateSlidingWindowCache(tt.blockNum, tt.originalStates, nil) + require.Equal(t, tt.expectedActiveSize, len(bc.activeCacheMap), + "Active cache size mismatch at block %d", tt.blockNum) + require.Equal(t, tt.expectedNextSize, len(bc.nextCacheMap), + "Next cache size mismatch at block %d", tt.blockNum) + }) + } +} + +// TestFilterWitnessWithSlidingCache tests witness filtering +func TestFilterWitnessWithSlidingCache(t *testing.T) { + bc := &BlockChain{ + activeCacheMap: make(map[string]struct{}), + } + + // Populate cache + bc.activeCacheMap["cachedState1"] = struct{}{} + bc.activeCacheMap["cachedState2"] = struct{}{} + + tests := []struct { + name string + witness *stateless.Witness + expectedRemoved int + expectedFiltered int + }{ + { + name: "Nil witness", + witness: nil, + expectedRemoved: 0, + expectedFiltered: 0, + }, + { + name: "Witness with all cached states", + witness: &stateless.Witness{ + State: map[string]struct{}{ + "cachedState1": {}, + "cachedState2": {}, + }, + }, + expectedRemoved: 2, + expectedFiltered: 0, + }, + { + name: "Witness with no cached states", + witness: &stateless.Witness{ + State: map[string]struct{}{ + "newState1": {}, + "newState2": {}, + }, + }, + expectedRemoved: 0, + expectedFiltered: 2, + }, + { + name: "Witness with mixed states", + witness: &stateless.Witness{ + State: map[string]struct{}{ + "cachedState1": {}, + "newState1": {}, + "cachedState2": {}, + "newState2": {}, + }, + }, + expectedRemoved: 2, + expectedFiltered: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filtered, originalSize, removed := bc.FilterWitnessWithSlidingCache(tt.witness) + + if tt.witness == nil { + require.Nil(t, filtered, "Filtered witness should be nil") + require.Equal(t, 0, originalSize, "Original size should be 0 for nil") + require.Equal(t, 0, removed, "Removed count should be 0 for nil") + } else { + require.Equal(t, tt.expectedRemoved, removed, "Removed count mismatch") + require.Equal(t, tt.expectedFiltered, len(filtered.State), "Filtered size mismatch") + require.Equal(t, len(tt.witness.State), originalSize, "Original size mismatch") + } + }) + } +} + +// TestSlidingWindowEndToEnd tests the complete sliding window lifecycle +func TestSlidingWindowEndToEnd(t *testing.T) { + bc := &BlockChain{ + activeCacheMap: make(map[string]struct{}), + nextCacheMap: make(map[string]struct{}), + cacheWindowStart: 0, + } + + // Simulate processing blocks with sliding window + // Window size: 20, Overlap: 10 + + // Blocks 0-9: Build active cache + for blockNum := uint64(0); blockNum < 10; blockNum++ { + bc.manageSlidingWindow(blockNum) + states := map[string]struct{}{ + fmt.Sprintf("state_%d", blockNum): {}, + } + bc.updateSlidingWindowCache(blockNum, states, nil) + } + + require.Equal(t, 10, len(bc.activeCacheMap), "Active cache should have 10 states after blocks 0-9") + require.Equal(t, 0, 
len(bc.nextCacheMap), "Next cache should be empty before overlap") + + // Blocks 10-19: Overlap period - cache to both + for blockNum := uint64(10); blockNum < 20; blockNum++ { + bc.manageSlidingWindow(blockNum) + states := map[string]struct{}{ + fmt.Sprintf("state_%d", blockNum): {}, + } + bc.updateSlidingWindowCache(blockNum, states, nil) + } + + require.Equal(t, 20, len(bc.activeCacheMap), "Active cache should have 20 states after blocks 0-19") + require.Equal(t, 10, len(bc.nextCacheMap), "Next cache should have 10 states from overlap") + + // Block 20: Window slide + slid := bc.manageSlidingWindow(20) + require.True(t, slid, "Window should slide at block 20") + require.Equal(t, uint64(20), bc.cacheWindowStart, "Window start should be 20") + require.Equal(t, 10, len(bc.activeCacheMap), "Active cache should have 10 states from previous next cache") + require.Equal(t, 0, len(bc.nextCacheMap), "Next cache should be empty after slide") + + // Verify the active cache has states from overlap period (10-19) + for blockNum := uint64(10); blockNum < 20; blockNum++ { + stateKey := fmt.Sprintf("state_%d", blockNum) + _, exists := bc.activeCacheMap[stateKey] + require.True(t, exists, "Active cache should contain %s after slide", stateKey) + } + + // Verify states from before overlap (0-9) are gone + for blockNum := uint64(0); blockNum < 10; blockNum++ { + stateKey := fmt.Sprintf("state_%d", blockNum) + _, exists := bc.activeCacheMap[stateKey] + require.False(t, exists, "Active cache should NOT contain %s after slide", stateKey) + } +} diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 193531ee67..95e4cad360 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" + "github.com/stretchr/testify/require" ) // testEthHandler is a mock event handler to listen for inbound network requests @@ -703,3 +704,82 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) { } } } + +const ( + // Test constants matching core.blockchain.go consensus constants + testCacheWindowSize = 20 // Must match compactWitnessCacheWindowSize + testCacheOverlapSize = 10 // Must match compactWitnessCacheOverlapSize +) + +// mockBlockChainForCompactWitness mocks the minimal BlockChain interface needed for compact witness tests +type mockBlockChainForCompactWitness struct { + parallelImportEnabled bool + cacheWindowStart uint64 +} + +func (m *mockBlockChainForCompactWitness) IsParallelStatelessImportEnabled() bool { + return m.parallelImportEnabled +} + +func (m *mockBlockChainForCompactWitness) GetCacheWindowStart() uint64 { + return m.cacheWindowStart +} + +func (m *mockBlockChainForCompactWitness) GetCacheWindowSize() uint64 { + return testCacheWindowSize +} + +func (m *mockBlockChainForCompactWitness) GetCacheOverlapSize() uint64 { + return testCacheOverlapSize +} + +// TestShouldRequestCompactWitness tests the compact witness request decision logic +// This test validates the critical window and overlap boundaries +func TestShouldRequestCompactWitness(t *testing.T) { + // windowSize=20, overlapSize=10 + // Windows: [0-19], [20-39], [40-59]... + // Overlaps: blocks 10-19, 30-39, 50-59... + // Full witness needed at: 0, 10, 20, 30, 40, 50... 
(window starts and overlap starts) + + tests := []struct { + blockNum uint64 + expectedCompact bool + reason string + }{ + {0, false, "window start"}, + {5, true, "before overlap"}, + {9, true, "just before overlap"}, + {10, false, "overlap start - needs full for nextCache"}, + {11, true, "in overlap, after start"}, + {15, true, "middle of overlap"}, + {19, true, "end of window"}, + {20, false, "window start (slide)"}, + {21, true, "after slide"}, + {30, false, "overlap start in second window"}, + {35, true, "in second window overlap"}, + {40, false, "window start in third window"}, + {50, false, "overlap start in third window"}, + {100, false, "window start"}, + {110, false, "overlap start"}, + {115, true, "in overlap"}, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("block_%d_%s", tt.blockNum, tt.reason), func(t *testing.T) { + // Calculate expected result using the same logic as shouldRequestCompactWitness + windowSize := uint64(testCacheWindowSize) + overlapSize := uint64(testCacheOverlapSize) + expectedWindowStart := (tt.blockNum / windowSize) * windowSize + blocksSinceWindowStart := tt.blockNum - expectedWindowStart + + isWindowStart := tt.blockNum == expectedWindowStart + isOverlapStart := blocksSinceWindowStart == overlapSize + + expectedResult := !isWindowStart && !isOverlapStart + + require.Equal(t, tt.expectedCompact, expectedResult, + "Block %d (%s): logic mismatch - expected compact=%v but logic gives %v", + tt.blockNum, tt.reason, tt.expectedCompact, expectedResult) + }) + } +} From 5f49e5542a580526a4477a255cf7c2263145e992 Mon Sep 17 00:00:00 2001 From: Pratik Patil Date: Thu, 6 Nov 2025 13:06:48 +0530 Subject: [PATCH 22/22] lint fix --- eth/handler_eth_test.go | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 95e4cad360..bbf02eddef 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -711,28 +711,6 @@ const ( testCacheOverlapSize = 10 // Must match compactWitnessCacheOverlapSize ) -// mockBlockChainForCompactWitness mocks the minimal BlockChain interface needed for compact witness tests -type mockBlockChainForCompactWitness struct { - parallelImportEnabled bool - cacheWindowStart uint64 -} - -func (m *mockBlockChainForCompactWitness) IsParallelStatelessImportEnabled() bool { - return m.parallelImportEnabled -} - -func (m *mockBlockChainForCompactWitness) GetCacheWindowStart() uint64 { - return m.cacheWindowStart -} - -func (m *mockBlockChainForCompactWitness) GetCacheWindowSize() uint64 { - return testCacheWindowSize -} - -func (m *mockBlockChainForCompactWitness) GetCacheOverlapSize() uint64 { - return testCacheOverlapSize -} - // TestShouldRequestCompactWitness tests the compact witness request decision logic // This test validates the critical window and overlap boundaries func TestShouldRequestCompactWitness(t *testing.T) {
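For readers tracing the arithmetic exercised by TestShouldRequestCompactWitness above, the following standalone sketch reproduces the same classification outside the handler. The window and overlap values (20 and 10) are taken from the test constants, and the helper name is illustrative rather than part of the patches.

package main

import "fmt"

// needsFullWitness mirrors the tested rule: a full witness is required at a
// window start (blockNum == windowStart) and at an overlap start
// (blockNum - windowStart == overlapSize); every other block in the window
// can be served with a compact witness.
func needsFullWitness(blockNum, windowSize, overlapSize uint64) bool {
	windowStart := (blockNum / windowSize) * windowSize
	return blockNum == windowStart || blockNum-windowStart == overlapSize
}

func main() {
	const windowSize, overlapSize uint64 = 20, 10
	for _, b := range []uint64{0, 5, 10, 15, 20, 30, 35, 40, 50, 115} {
		fmt.Printf("block %3d: full witness = %v\n", b, needsFullWitness(b, windowSize, overlapSize))
	}
}

Running it prints full witness = true exactly at blocks 0, 10, 20, 30, 40 and 50, matching the expectations encoded in the test table.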