diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index c6bfb330201..8ff692cbaa4 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -49,6 +49,10 @@ const maxBlockBatchDownloadSize = 256 const heimdallSyncRetryIntervalOnTip = 200 * time.Millisecond const heimdallSyncRetryIntervalOnStartup = 30 * time.Second +// catchUpAgeThreshold is the maximum acceptable tip age before the event loop +// breaks out and re-enters syncToTip for efficient waypoint-based catch-up batching. +const catchUpAgeThreshold = 30 * time.Second + var ( futureMilestoneDelay = 1 * time.Second // amount of time to wait before putting a future milestone back in the event queue errAlreadyProcessed = errors.New("already processed") @@ -143,6 +147,11 @@ type Sync struct { wiggleCalculator wiggleCalculator engineAPISwitcher EngineAPISwitcher blockRequestsCache *lru.ARCCache[common.Hash, struct{}] + + // lastTipAge tracks how far behind the chain tip the node is. + // Updated in commitExecution, used to detect when the event loop + // should break out and re-enter syncToTip for catch-up batching. + lastTipAge time.Duration } func (s *Sync) commitExecution(ctx context.Context, newTip *types.Header, finalizedHeader *types.Header) error { @@ -153,6 +162,7 @@ func (s *Sync) commitExecution(ctx context.Context, newTip *types.Header, finali blockNum := newTip.Number.Uint64() age := common.PrettyAge(time.Unix(int64(newTip.Time), 0)) + s.lastTipAge = time.Since(time.Unix(int64(newTip.Time), 0)) s.logger.Info(syncLogPrefix("update fork choice"), "block", blockNum, "hash", newTip.Hash(), "age", age) fcStartTime := time.Now() @@ -862,23 +872,51 @@ func (s *Sync) Run(ctx context.Context) error { } s.logger.Info(syncLogPrefix("running sync component")) - result, err := s.syncToTip(ctx) - if err != nil { - return err - } - if s.config.PolygonPosSingleSlotFinality { - if result.latestTip.Number.Uint64() >= s.config.PolygonPosSingleSlotFinalityBlockAt { - s.engineAPISwitcher.SetConsuming(true) + // Outer catch-up loop: when the event loop falls behind, break out and + // re-enter syncToTip which uses efficient waypoint-based batching. + for { + result, err := s.syncToTip(ctx) + if err != nil { + return err + } + + if s.config.PolygonPosSingleSlotFinality { + if result.latestTip.Number.Uint64() >= s.config.PolygonPosSingleSlotFinalityBlockAt { + s.logger.Info(syncLogPrefix("switching to engine API mode (SSF)"), "tip", result.latestTip.Number.Uint64()) + s.engineAPISwitcher.SetConsuming(true) + return nil + } + } + + ccBuilder, err := s.initialiseCcb(ctx, result) + if err != nil { + return err + } + + needsCatchUp, err := s.runEventLoop(ctx, ccBuilder) + if err != nil { + return err + } + if !needsCatchUp { return nil } - } - ccBuilder, err := s.initialiseCcb(ctx, result) - if err != nil { - return err + s.logger.Info(syncLogPrefix("re-entering syncToTip for catch-up"), + "lastTipAge", s.lastTipAge, + "threshold", catchUpAgeThreshold, + ) } +} +// runEventLoop processes tip events (new blocks, milestones, block hashes) one at a time. +// It returns needsCatchUp=true when the node has fallen too far behind and should re-enter +// syncToTip for efficient waypoint-based batch catch-up. +// +// Known limitations: +// - initialCycle is never true during catch-up re-entries (conservative pruning applies) +// - Span rotation every 128 blocks (~256s) adds ~12s overhead, which can trigger catch-up +func (s *Sync) runEventLoop(ctx context.Context, ccBuilder *CanonicalChainBuilder) (needsCatchUp bool, err error) { inactivityDuration := 30 * time.Second lastProcessedEventTime := time.Now() inactivityTicker := time.NewTicker(inactivityDuration) @@ -889,36 +927,48 @@ func (s *Sync) Run(ctx context.Context) error { if s.config.PolygonPosSingleSlotFinality { block, err := s.execution.CurrentHeader(ctx) if err != nil { - return err + return false, err } if block.Number.Uint64() >= s.config.PolygonPosSingleSlotFinalityBlockAt { s.engineAPISwitcher.SetConsuming(true) - return nil + return false, nil } } + var checkAge bool switch event.Type { case EventTypeNewMilestone: if err = s.applyNewMilestoneOnTip(ctx, event.AsNewMilestone(), ccBuilder); err != nil { - return err + return false, err } case EventTypeNewBlock: if err = s.applyNewBlockOnTip(ctx, event.AsNewBlock(), ccBuilder); err != nil { - return err + return false, err } + checkAge = true case EventTypeNewBlockBatch: if err = s.applyNewBlockBatchOnTip(ctx, event.AsNewBlockBatch(), ccBuilder); err != nil { - return err + return false, err } + checkAge = true case EventTypeNewBlockHashes: if err = s.applyNewBlockHashesOnTip(ctx, event.AsNewBlockHashes(), ccBuilder); err != nil { - return err + return false, err } + checkAge = true default: panic(fmt.Sprintf("unexpected event type: %v", event.Type)) } + // After processing block events (not milestones, which are finality metadata), + // check if we've fallen too far behind and need to switch to batch catch-up mode. + if checkAge && s.lastTipAge > catchUpAgeThreshold { + s.logger.Info(syncLogPrefix("node is behind, switching to catch-up mode"), + "tipAge", s.lastTipAge, "threshold", catchUpAgeThreshold) + return true, nil + } + lastProcessedEventTime = time.Now() case <-inactivityTicker.C: if time.Since(lastProcessedEventTime) < inactivityDuration { @@ -927,7 +977,7 @@ func (s *Sync) Run(ctx context.Context) error { s.logger.Info(syncLogPrefix("waiting for chain tip events...")) case <-ctx.Done(): - return ctx.Err() + return false, ctx.Err() } } } diff --git a/polygon/sync/sync_test.go b/polygon/sync/sync_test.go new file mode 100644 index 00000000000..8a98fe76cc4 --- /dev/null +++ b/polygon/sync/sync_test.go @@ -0,0 +1,94 @@ +package sync + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/execution/types" + "go.uber.org/mock/gomock" +) + +// stubExecutionClient is a minimal no-op implementation of ExecutionClient for unit tests. +type stubExecutionClient struct { + tipHash common.Hash +} + +func (s *stubExecutionClient) Prepare(context.Context) error { return nil } +func (s *stubExecutionClient) InsertBlocks(context.Context, []*types.Block) error { + return nil +} +func (s *stubExecutionClient) UpdateForkChoice(_ context.Context, tip *types.Header, _ *types.Header) (common.Hash, error) { + return tip.Hash(), nil +} +func (s *stubExecutionClient) CurrentHeader(context.Context) (*types.Header, error) { + return nil, nil +} +func (s *stubExecutionClient) GetHeader(context.Context, uint64) (*types.Header, error) { + return nil, nil +} +func (s *stubExecutionClient) GetTd(context.Context, uint64, common.Hash) (*big.Int, error) { + return big.NewInt(0), nil +} + +func newTestSync(t *testing.T) *Sync { + t.Helper() + ctrl := gomock.NewController(t) + store := NewMockStore(ctrl) + store.EXPECT().Flush(gomock.Any()).Return(nil).AnyTimes() + + return &Sync{ + store: store, + execution: &stubExecutionClient{}, + logger: log.New(), + } +} + +func TestCommitExecutionTracksLastTipAge(t *testing.T) { + s := newTestSync(t) + + // Create a header with timestamp 60 seconds in the past. + oldTime := time.Now().Add(-60 * time.Second) + header := &types.Header{ + Number: big.NewInt(100), + Time: uint64(oldTime.Unix()), + } + + err := s.commitExecution(context.Background(), header, header) + if err != nil { + t.Fatalf("commitExecution failed: %v", err) + } + + if s.lastTipAge <= catchUpAgeThreshold { + t.Errorf("expected lastTipAge > %v for a 60s-old block, got %v", catchUpAgeThreshold, s.lastTipAge) + } +} + +func TestCommitExecutionRecentBlockHasLowAge(t *testing.T) { + s := newTestSync(t) + + // Create a header with timestamp 2 seconds in the past. + recentTime := time.Now().Add(-2 * time.Second) + header := &types.Header{ + Number: big.NewInt(200), + Time: uint64(recentTime.Unix()), + } + + err := s.commitExecution(context.Background(), header, header) + if err != nil { + t.Fatalf("commitExecution failed: %v", err) + } + + if s.lastTipAge > catchUpAgeThreshold { + t.Errorf("expected lastTipAge < %v for a 2s-old block, got %v", catchUpAgeThreshold, s.lastTipAge) + } +} + +func TestCatchUpAgeThresholdValue(t *testing.T) { + if catchUpAgeThreshold != 30*time.Second { + t.Errorf("expected catchUpAgeThreshold = 30s, got %v", catchUpAgeThreshold) + } +}