Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions op-node/rollup/attributes/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type EngineController interface {
// RequestForkchoiceUpdate requests a forkchoice update
RequestForkchoiceUpdate(ctx context.Context)
RequestPendingSafeUpdate(ctx context.Context)
StartBuildAsync(ctx context.Context, attrs *derive.AttributesWithParent) event.Promise0[error]
}

type L2 interface {
Expand Down Expand Up @@ -184,20 +185,27 @@ func (eq *AttributesHandler) onPendingSafeUpdate(ctx context.Context, x engine.P
} else {
// if there already exists a block we can just consolidate it
if x.PendingSafe.Number < x.Unsafe.Number {
eq.consolidateNextSafeAttributes(eq.attributes, x.PendingSafe)
eq.consolidateNextSafeAttributes(ctx, eq.attributes, x.PendingSafe)
} else {
// append to tip otherwise
eq.sentAttributes = true
eq.emitter.Emit(ctx, engine.BuildStartEvent{Attributes: eq.attributes})
p := eq.engineController.StartBuildAsync(ctx, eq.attributes)
if err := p.Await(ctx); err != nil {
// context cancelled
return
}
if err, _ := p.Result(); err != nil {
eq.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err})
}
}
}
}

// consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain,
// to avoid extra processing or unnecessary unwinding of the chain.
// However, if the attributes do not match, they will be forced to process the attributes.
func (eq *AttributesHandler) consolidateNextSafeAttributes(attributes *derive.AttributesWithParent, onto eth.L2BlockRef) {
ctx, cancel := context.WithTimeout(eq.ctx, time.Second*10)
func (eq *AttributesHandler) consolidateNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent, onto eth.L2BlockRef) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

envelope, err := eq.l2.PayloadByNumber(ctx, attributes.Parent.Number+1)
Expand All @@ -220,7 +228,14 @@ func (eq *AttributesHandler) consolidateNextSafeAttributes(attributes *derive.At

eq.sentAttributes = true
// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
eq.emitter.Emit(eq.ctx, engine.BuildStartEvent{Attributes: attributes})
p := eq.engineController.StartBuildAsync(ctx, attributes)
if err := p.Await(ctx); err != nil {
// context cancelled
return
}
if err, _ := p.Result(); err != nil {
eq.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err})
}
return
} else {
ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
Expand Down
6 changes: 4 additions & 2 deletions op-node/rollup/attributes/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/holiman/uint256"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -266,11 +267,12 @@ func TestAttributesHandler(t *testing.T) {
// The payloadA1 is going to get reorged out in favor of attrA1Alt (turns into payloadA1Alt)
l2.ExpectPayloadByNumber(refA1.Number, payloadA1, nil)
// fail consolidation, perform force reorg
emitter.ExpectOnce(engine.BuildStartEvent{Attributes: attrA1Alt})
engDeriver.On("StartBuildAsync", mock.Anything, attrA1Alt).Return(nil, nil).Once()
ah.OnEvent(context.Background(), engine.PendingSafeUpdateEvent{
PendingSafe: refA0,
Unsafe: refA1,
})
engDeriver.AssertExpectations(t)
l2.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "still have attributes, processing still unconfirmed")
Expand Down Expand Up @@ -360,7 +362,7 @@ func TestAttributesHandler(t *testing.T) {
require.True(t, attrA1Alt.Concluding, "must be concluding attributes")

// attrA1Alt will fit right on top of A0
emitter.ExpectOnce(engine.BuildStartEvent{Attributes: attrA1Alt})
engDeriver.On("StartBuildAsync", mock.Anything, attrA1Alt).Return(nil, nil).Once()
ah.OnEvent(context.Background(), engine.PendingSafeUpdateEvent{
PendingSafe: refA0,
Unsafe: refA0,
Expand Down
7 changes: 7 additions & 0 deletions op-node/rollup/attributes/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package attributes
import (
"context"

"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/event"
"github.com/stretchr/testify/mock"
)

Expand All @@ -28,3 +30,8 @@ func (m *MockEngineController) RequestForkchoiceUpdate(ctx context.Context) {
func (m *MockEngineController) RequestPendingSafeUpdate(ctx context.Context) {
m.Mock.MethodCalled("RequestPendingSafeUpdate", ctx)
}

func (m *MockEngineController) StartBuildAsync(ctx context.Context, attrs *derive.AttributesWithParent) event.Promise0[error] {
m.Mock.MethodCalled("StartBuildAsync", ctx, attrs)
return nil
}
4 changes: 4 additions & 0 deletions op-node/rollup/clsync/clsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/event"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -31,6 +32,9 @@ func (f *fakeEngController) TryUpdateLocalSafe(ctx context.Context, ref eth.L2Bl
}
func (f *fakeEngController) RequestPendingSafeUpdate(ctx context.Context) {
}
func (f *fakeEngController) StartBuildAsync(ctx context.Context, attrs *derive.AttributesWithParent) event.Promise0[error] {
return nil
}

func TestCLSync_InvalidPayloadDropsHead(t *testing.T) {
logger := testlog.Logger(t, 0)
Expand Down
39 changes: 23 additions & 16 deletions op-node/rollup/engine/build_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/event"
)

type BuildStartEvent struct {
Expand All @@ -18,64 +19,70 @@ func (ev BuildStartEvent) String() string {
return "build-start"
}

func (eq *EngineController) onBuildStart(ctx context.Context, ev BuildStartEvent) {
// startBuild contains the core logic for beginning a block build. It is used by
// StartBuildAsync in both event-system (async) and non-event (sync) contexts.
func (eq *EngineController) startBuild(ctx context.Context, attrs *derive.AttributesWithParent) error {
rpcCtx, cancel := context.WithTimeout(eq.ctx, buildStartTimeout)
defer cancel()

if ev.Attributes.DerivedFrom != (eth.L1BlockRef{}) &&
eq.PendingSafeL2Head().Hash != ev.Attributes.Parent.Hash {
if attrs.DerivedFrom != (eth.L1BlockRef{}) &&
eq.PendingSafeL2Head().Hash != attrs.Parent.Hash {
// Warn about small reorgs, happens when pending safe head is getting rolled back
eq.log.Warn("block-attributes derived from L1 do not build on pending safe head, likely reorg",
"pending_safe", eq.PendingSafeL2Head(), "attributes_parent", ev.Attributes.Parent)
"pending_safe", eq.PendingSafeL2Head(), "attributes_parent", attrs.Parent)
}

fcEvent := ForkchoiceUpdateEvent{
UnsafeL2Head: ev.Attributes.Parent,
UnsafeL2Head: attrs.Parent,
SafeL2Head: eq.safeHead,
FinalizedL2Head: eq.finalizedHead,
}
if fcEvent.UnsafeL2Head.Number < fcEvent.FinalizedL2Head.Number {
err := fmt.Errorf("invalid block-building pre-state, unsafe head %s is behind finalized head %s", fcEvent.UnsafeL2Head, fcEvent.FinalizedL2Head)
eq.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err}) // make the node exit, things are very wrong.
return
return err
}
fc := eth.ForkchoiceState{
HeadBlockHash: fcEvent.UnsafeL2Head.Hash,
SafeBlockHash: fcEvent.SafeL2Head.Hash,
FinalizedBlockHash: fcEvent.FinalizedL2Head.Hash,
}
buildStartTime := time.Now()
id, errTyp, err := startPayload(rpcCtx, eq.engine, fc, ev.Attributes.Attributes)
id, errTyp, err := startPayload(rpcCtx, eq.engine, fc, attrs.Attributes)
if err != nil {
switch errTyp {
case BlockInsertTemporaryErr:
// RPC errors are recoverable, we can retry the buffered payload attributes later.
eq.emitter.Emit(ctx, rollup.EngineTemporaryErrorEvent{
Err: fmt.Errorf("temporarily cannot insert new safe block: %w", err),
})
return
case BlockInsertPrestateErr:
eq.emitter.Emit(ctx, rollup.ResetEvent{
Err: fmt.Errorf("need reset to resolve pre-state problem: %w", err),
})
return
case BlockInsertPayloadErr:
eq.emitter.Emit(ctx, BuildInvalidEvent{Attributes: ev.Attributes, Err: err})
return
eq.emitter.Emit(ctx, BuildInvalidEvent{Attributes: attrs, Err: err})
default:
eq.emitter.Emit(ctx, rollup.CriticalErrorEvent{
Err: fmt.Errorf("unknown error type %d: %w", errTyp, err),
})
return
}
return err
}
eq.emitter.Emit(ctx, fcEvent)

eq.emitter.Emit(ctx, BuildStartedEvent{
Info: eth.PayloadInfo{ID: id, Timestamp: uint64(ev.Attributes.Attributes.Timestamp)},
Info: eth.PayloadInfo{ID: id, Timestamp: uint64(attrs.Attributes.Timestamp)},
BuildStarted: buildStartTime,
Concluding: ev.Attributes.Concluding,
DerivedFrom: ev.Attributes.DerivedFrom,
Parent: ev.Attributes.Parent,
Concluding: attrs.Concluding,
DerivedFrom: attrs.DerivedFrom,
Parent: attrs.Parent,
})
return nil
}

func (eq *EngineController) StartBuildAsync(ctx context.Context, attrs *derive.AttributesWithParent) event.Promise0[error] {
return event.Spawn0(ctx, func(ctx context.Context) error {
return eq.startBuild(ctx, attrs)
}, event.WithSpawnLegacyEvent(BuildStartEvent{Attributes: attrs}))
}
11 changes: 7 additions & 4 deletions op-node/rollup/engine/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ type EngineController struct {
rollupCfg *rollup.Config
elStart time.Time
clock clock.Clock

// TODO(#16917) Remove Event System Refactor Comments
// Event system fields (moved from EngDeriver)
ctx context.Context
Expand Down Expand Up @@ -699,9 +698,13 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
d.PromoteSafe(ctx, x.Ref, x.Source)
}
case InteropInvalidateBlockEvent:
d.emitter.Emit(ctx, BuildStartEvent{Attributes: x.Attributes})
case BuildStartEvent:
d.onBuildStart(ctx, x)
p := d.StartBuildAsync(ctx, x.Attributes)
if err := p.Await(ctx); err != nil {
d.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err})
}
if err, _ := p.Result(); err != nil {
d.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err})
}
case BuildStartedEvent:
d.onBuildStarted(ctx, x)
case BuildSealEvent:
Expand Down
23 changes: 15 additions & 8 deletions op-node/rollup/sequencing/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (d *Sequencer) OnEvent(ctx context.Context, ev event.Event) bool {
case engine.PayloadSuccessEvent:
d.onPayloadSuccess(x)
case SequencerActionEvent:
d.onSequencerAction(x)
d.onSequencerAction(ctx, x)
case rollup.EngineTemporaryErrorEvent:
d.onEngineTemporaryError(x)
case rollup.ResetEvent:
Expand Down Expand Up @@ -345,7 +345,7 @@ func (d *Sequencer) onPayloadSuccess(x engine.PayloadSuccessEvent) {
d.asyncGossip.Clear()
}

func (d *Sequencer) onSequencerAction(ev SequencerActionEvent) {
func (d *Sequencer) onSequencerAction(ctx context.Context, ev SequencerActionEvent) {
d.log.Debug("Sequencer action")
payload := d.asyncGossip.Get()
if payload != nil {
Expand Down Expand Up @@ -387,7 +387,7 @@ func (d *Sequencer) onSequencerAction(ev SequencerActionEvent) {
})
} else if d.latest == (BuildingState{}) {
// If we have not started building anything, start building.
d.startBuildingBlock()
d.startBuildingBlock(ctx)
}
}
}
Expand Down Expand Up @@ -483,8 +483,7 @@ func (d *Sequencer) setLatestHead(head eth.L2BlockRef) {
}

// StartBuildingBlock initiates a block building job on top of the given L2 head, safe and finalized blocks, and using the provided l1Origin.
func (d *Sequencer) startBuildingBlock() {
ctx := d.ctx
func (d *Sequencer) startBuildingBlock(ctx context.Context) {
l2Head := d.latestHead

// If we do not have data to know what to build on, then request a forkchoice update
Expand Down Expand Up @@ -601,9 +600,17 @@ func (d *Sequencer) startBuildingBlock() {
// If we get a forkchoice update that conflicts, we will have to abort building.
d.latest = BuildingState{Onto: l2Head}

d.emitter.Emit(d.ctx, engine.BuildStartEvent{
Attributes: withParent,
})
p := d.eng.StartBuildAsync(ctx, withParent)
if err := p.Await(ctx); err != nil {
// context cancelled
return
}
if err, _ := p.Result(); err != nil {
d.log.Error("Failed to start building block", "err", err)
d.emitter.Emit(d.ctx, rollup.CriticalErrorEvent{Err: err})
return
}
// Not awaiting the result as we don't need it.
}

func (d *Sequencer) NextAction() (t time.Time, ok bool) {
Expand Down
Loading