Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 68 additions & 18 deletions polygon/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ const maxBlockBatchDownloadSize = 256
const heimdallSyncRetryIntervalOnTip = 200 * time.Millisecond
const heimdallSyncRetryIntervalOnStartup = 30 * time.Second

// catchUpAgeThreshold is the maximum acceptable tip age before the event loop
// breaks out and re-enters syncToTip for efficient waypoint-based catch-up batching.
const catchUpAgeThreshold = 30 * time.Second

var (
futureMilestoneDelay = 1 * time.Second // amount of time to wait before putting a future milestone back in the event queue
errAlreadyProcessed = errors.New("already processed")
Expand Down Expand Up @@ -143,6 +147,11 @@ type Sync struct {
wiggleCalculator wiggleCalculator
engineAPISwitcher EngineAPISwitcher
blockRequestsCache *lru.ARCCache[common.Hash, struct{}]

// lastTipAge tracks how far behind the chain tip the node is.
// Updated in commitExecution, used to detect when the event loop
// should break out and re-enter syncToTip for catch-up batching.
lastTipAge time.Duration
}

func (s *Sync) commitExecution(ctx context.Context, newTip *types.Header, finalizedHeader *types.Header) error {
Expand All @@ -153,6 +162,7 @@ func (s *Sync) commitExecution(ctx context.Context, newTip *types.Header, finali
blockNum := newTip.Number.Uint64()

age := common.PrettyAge(time.Unix(int64(newTip.Time), 0))
s.lastTipAge = time.Since(time.Unix(int64(newTip.Time), 0))
s.logger.Info(syncLogPrefix("update fork choice"), "block", blockNum, "hash", newTip.Hash(), "age", age)
fcStartTime := time.Now()

Expand Down Expand Up @@ -862,23 +872,51 @@ func (s *Sync) Run(ctx context.Context) error {
}

s.logger.Info(syncLogPrefix("running sync component"))
result, err := s.syncToTip(ctx)
if err != nil {
return err
}

if s.config.PolygonPosSingleSlotFinality {
if result.latestTip.Number.Uint64() >= s.config.PolygonPosSingleSlotFinalityBlockAt {
s.engineAPISwitcher.SetConsuming(true)
// Outer catch-up loop: when the event loop falls behind, break out and
// re-enter syncToTip which uses efficient waypoint-based batching.
for {
result, err := s.syncToTip(ctx)
if err != nil {
return err
}

if s.config.PolygonPosSingleSlotFinality {
if result.latestTip.Number.Uint64() >= s.config.PolygonPosSingleSlotFinalityBlockAt {
s.logger.Info(syncLogPrefix("switching to engine API mode (SSF)"), "tip", result.latestTip.Number.Uint64())
s.engineAPISwitcher.SetConsuming(true)
return nil
}
}

ccBuilder, err := s.initialiseCcb(ctx, result)
if err != nil {
return err
}

needsCatchUp, err := s.runEventLoop(ctx, ccBuilder)
if err != nil {
return err
}
if !needsCatchUp {
return nil
}
}

ccBuilder, err := s.initialiseCcb(ctx, result)
if err != nil {
return err
s.logger.Info(syncLogPrefix("re-entering syncToTip for catch-up"),
"lastTipAge", s.lastTipAge,
"threshold", catchUpAgeThreshold,
)
}
}

// runEventLoop processes tip events (new blocks, milestones, block hashes) one at a time.
// It returns needsCatchUp=true when the node has fallen too far behind and should re-enter
// syncToTip for efficient waypoint-based batch catch-up.
//
// Known limitations:
// - initialCycle is never true during catch-up re-entries (conservative pruning applies)
// - Span rotation every 128 blocks (~256s) adds ~12s overhead, which can trigger catch-up
func (s *Sync) runEventLoop(ctx context.Context, ccBuilder *CanonicalChainBuilder) (needsCatchUp bool, err error) {
inactivityDuration := 30 * time.Second
lastProcessedEventTime := time.Now()
inactivityTicker := time.NewTicker(inactivityDuration)
Expand All @@ -889,36 +927,48 @@ func (s *Sync) Run(ctx context.Context) error {
if s.config.PolygonPosSingleSlotFinality {
block, err := s.execution.CurrentHeader(ctx)
if err != nil {
return err
return false, err
}

if block.Number.Uint64() >= s.config.PolygonPosSingleSlotFinalityBlockAt {
s.engineAPISwitcher.SetConsuming(true)
return nil
return false, nil
}
}

var checkAge bool
switch event.Type {
case EventTypeNewMilestone:
if err = s.applyNewMilestoneOnTip(ctx, event.AsNewMilestone(), ccBuilder); err != nil {
return err
return false, err
}
case EventTypeNewBlock:
if err = s.applyNewBlockOnTip(ctx, event.AsNewBlock(), ccBuilder); err != nil {
return err
return false, err
}
checkAge = true
case EventTypeNewBlockBatch:
if err = s.applyNewBlockBatchOnTip(ctx, event.AsNewBlockBatch(), ccBuilder); err != nil {
return err
return false, err
}
checkAge = true
case EventTypeNewBlockHashes:
if err = s.applyNewBlockHashesOnTip(ctx, event.AsNewBlockHashes(), ccBuilder); err != nil {
return err
return false, err
}
checkAge = true
default:
panic(fmt.Sprintf("unexpected event type: %v", event.Type))
}

// After processing block events (not milestones, which are finality metadata),
// check if we've fallen too far behind and need to switch to batch catch-up mode.
if checkAge && s.lastTipAge > catchUpAgeThreshold {
s.logger.Info(syncLogPrefix("node is behind, switching to catch-up mode"),
"tipAge", s.lastTipAge, "threshold", catchUpAgeThreshold)
return true, nil
}

lastProcessedEventTime = time.Now()
case <-inactivityTicker.C:
if time.Since(lastProcessedEventTime) < inactivityDuration {
Expand All @@ -927,7 +977,7 @@ func (s *Sync) Run(ctx context.Context) error {

s.logger.Info(syncLogPrefix("waiting for chain tip events..."))
case <-ctx.Done():
return ctx.Err()
return false, ctx.Err()
}
}
}
Expand Down
94 changes: 94 additions & 0 deletions polygon/sync/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package sync

import (
"context"
"math/big"
"testing"
"time"

"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/execution/types"
"go.uber.org/mock/gomock"
)

// stubExecutionClient is a minimal no-op implementation of ExecutionClient for unit tests.
type stubExecutionClient struct {
tipHash common.Hash
}

func (s *stubExecutionClient) Prepare(context.Context) error { return nil }
func (s *stubExecutionClient) InsertBlocks(context.Context, []*types.Block) error {
return nil
}
func (s *stubExecutionClient) UpdateForkChoice(_ context.Context, tip *types.Header, _ *types.Header) (common.Hash, error) {
return tip.Hash(), nil
}
func (s *stubExecutionClient) CurrentHeader(context.Context) (*types.Header, error) {
return nil, nil
}
func (s *stubExecutionClient) GetHeader(context.Context, uint64) (*types.Header, error) {
return nil, nil
}
func (s *stubExecutionClient) GetTd(context.Context, uint64, common.Hash) (*big.Int, error) {
return big.NewInt(0), nil
}

func newTestSync(t *testing.T) *Sync {
t.Helper()
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
store.EXPECT().Flush(gomock.Any()).Return(nil).AnyTimes()

return &Sync{
store: store,
execution: &stubExecutionClient{},
logger: log.New(),
}
}

func TestCommitExecutionTracksLastTipAge(t *testing.T) {
s := newTestSync(t)

// Create a header with timestamp 60 seconds in the past.
oldTime := time.Now().Add(-60 * time.Second)
header := &types.Header{
Number: big.NewInt(100),
Time: uint64(oldTime.Unix()),
}

err := s.commitExecution(context.Background(), header, header)
if err != nil {
t.Fatalf("commitExecution failed: %v", err)
}

if s.lastTipAge <= catchUpAgeThreshold {
t.Errorf("expected lastTipAge > %v for a 60s-old block, got %v", catchUpAgeThreshold, s.lastTipAge)
}
}

func TestCommitExecutionRecentBlockHasLowAge(t *testing.T) {
s := newTestSync(t)

// Create a header with timestamp 2 seconds in the past.
recentTime := time.Now().Add(-2 * time.Second)
header := &types.Header{
Number: big.NewInt(200),
Time: uint64(recentTime.Unix()),
}

err := s.commitExecution(context.Background(), header, header)
if err != nil {
t.Fatalf("commitExecution failed: %v", err)
}

if s.lastTipAge > catchUpAgeThreshold {
t.Errorf("expected lastTipAge < %v for a 2s-old block, got %v", catchUpAgeThreshold, s.lastTipAge)
}
}

func TestCatchUpAgeThresholdValue(t *testing.T) {
if catchUpAgeThreshold != 30*time.Second {
t.Errorf("expected catchUpAgeThreshold = 30s, got %v", catchUpAgeThreshold)
}
}