Skip to content

Commit 91060d2

Browse files
committed
Convert OnBuildStart to StartBuildAsync
1 parent 2d489b1 commit 91060d2

File tree

13 files changed

+217
-204
lines changed

13 files changed

+217
-204
lines changed

op-node/rollup/attributes/attributes.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type EngineController interface {
2727
// RequestForkchoiceUpdate requests a forkchoice update
2828
RequestForkchoiceUpdate(ctx context.Context)
2929
RequestPendingSafeUpdate(ctx context.Context)
30+
StartBuildAsync(ctx context.Context, attrs *derive.AttributesWithParent) event.Promise0[error]
3031
}
3132

3233
type L2 interface {
@@ -184,20 +185,27 @@ func (eq *AttributesHandler) onPendingSafeUpdate(ctx context.Context, x engine.P
184185
} else {
185186
// if there already exists a block we can just consolidate it
186187
if x.PendingSafe.Number < x.Unsafe.Number {
187-
eq.consolidateNextSafeAttributes(eq.attributes, x.PendingSafe)
188+
eq.consolidateNextSafeAttributes(ctx, eq.attributes, x.PendingSafe)
188189
} else {
189190
// append to tip otherwise
190191
eq.sentAttributes = true
191-
eq.emitter.Emit(ctx, engine.BuildStartEvent{Attributes: eq.attributes})
192+
p := eq.engineController.StartBuildAsync(ctx, eq.attributes)
193+
if err := p.Await(ctx); err != nil {
194+
// context cancelled
195+
return
196+
}
197+
if err, _ := p.Result(); err != nil {
198+
eq.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err})
199+
}
192200
}
193201
}
194202
}
195203

196204
// consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain,
197205
// to avoid extra processing or unnecessary unwinding of the chain.
198206
// However, if the attributes do not match, they will be forced to process the attributes.
199-
func (eq *AttributesHandler) consolidateNextSafeAttributes(attributes *derive.AttributesWithParent, onto eth.L2BlockRef) {
200-
ctx, cancel := context.WithTimeout(eq.ctx, time.Second*10)
207+
func (eq *AttributesHandler) consolidateNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent, onto eth.L2BlockRef) {
208+
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
201209
defer cancel()
202210

203211
envelope, err := eq.l2.PayloadByNumber(ctx, attributes.Parent.Number+1)
@@ -220,7 +228,14 @@ func (eq *AttributesHandler) consolidateNextSafeAttributes(attributes *derive.At
220228

221229
eq.sentAttributes = true
222230
// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
223-
eq.emitter.Emit(eq.ctx, engine.BuildStartEvent{Attributes: attributes})
231+
p := eq.engineController.StartBuildAsync(ctx, attributes)
232+
if err := p.Await(ctx); err != nil {
233+
// context cancelled
234+
return
235+
}
236+
if err, _ := p.Result(); err != nil {
237+
eq.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err})
238+
}
224239
return
225240
} else {
226241
ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)

op-node/rollup/attributes/attributes_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88

99
"github.com/holiman/uint256"
10+
"github.com/stretchr/testify/mock"
1011
"github.com/stretchr/testify/require"
1112

1213
"github.com/ethereum/go-ethereum/common"
@@ -266,11 +267,12 @@ func TestAttributesHandler(t *testing.T) {
266267
// The payloadA1 is going to get reorged out in favor of attrA1Alt (turns into payloadA1Alt)
267268
l2.ExpectPayloadByNumber(refA1.Number, payloadA1, nil)
268269
// fail consolidation, perform force reorg
269-
emitter.ExpectOnce(engine.BuildStartEvent{Attributes: attrA1Alt})
270+
engDeriver.On("StartBuildAsync", mock.Anything, attrA1Alt).Return(nil, nil).Once()
270271
ah.OnEvent(context.Background(), engine.PendingSafeUpdateEvent{
271272
PendingSafe: refA0,
272273
Unsafe: refA1,
273274
})
275+
engDeriver.AssertExpectations(t)
274276
l2.AssertExpectations(t)
275277
emitter.AssertExpectations(t)
276278
require.NotNil(t, ah.attributes, "still have attributes, processing still unconfirmed")
@@ -360,7 +362,7 @@ func TestAttributesHandler(t *testing.T) {
360362
require.True(t, attrA1Alt.Concluding, "must be concluding attributes")
361363

362364
// attrA1Alt will fit right on top of A0
363-
emitter.ExpectOnce(engine.BuildStartEvent{Attributes: attrA1Alt})
365+
engDeriver.On("StartBuildAsync", mock.Anything, attrA1Alt).Return(nil, nil).Once()
364366
ah.OnEvent(context.Background(), engine.PendingSafeUpdateEvent{
365367
PendingSafe: refA0,
366368
Unsafe: refA0,

op-node/rollup/attributes/testutils.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package attributes
33
import (
44
"context"
55

6+
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
67
"github.com/ethereum-optimism/optimism/op-service/eth"
8+
"github.com/ethereum-optimism/optimism/op-service/event"
79
"github.com/stretchr/testify/mock"
810
)
911

@@ -28,3 +30,8 @@ func (m *MockEngineController) RequestForkchoiceUpdate(ctx context.Context) {
2830
func (m *MockEngineController) RequestPendingSafeUpdate(ctx context.Context) {
2931
m.Mock.MethodCalled("RequestPendingSafeUpdate", ctx)
3032
}
33+
34+
func (m *MockEngineController) StartBuildAsync(ctx context.Context, attrs *derive.AttributesWithParent) event.Promise0[error] {
35+
m.Mock.MethodCalled("StartBuildAsync", ctx, attrs)
36+
return nil
37+
}

op-node/rollup/clsync/clsync_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
1313
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
1414
"github.com/ethereum-optimism/optimism/op-service/eth"
15+
"github.com/ethereum-optimism/optimism/op-service/event"
1516
"github.com/ethereum-optimism/optimism/op-service/testlog"
1617
"github.com/ethereum-optimism/optimism/op-service/testutils"
1718
"github.com/ethereum/go-ethereum/common"
@@ -31,6 +32,9 @@ func (f *fakeEngController) TryUpdateLocalSafe(ctx context.Context, ref eth.L2Bl
3132
}
3233
func (f *fakeEngController) RequestPendingSafeUpdate(ctx context.Context) {
3334
}
35+
func (f *fakeEngController) StartBuildAsync(ctx context.Context, attrs *derive.AttributesWithParent) event.Promise0[error] {
36+
return nil
37+
}
3438

3539
func TestCLSync_InvalidPayloadDropsHead(t *testing.T) {
3640
logger := testlog.Logger(t, 0)

op-node/rollup/engine/build_start.go

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/ethereum-optimism/optimism/op-node/rollup"
99
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
1010
"github.com/ethereum-optimism/optimism/op-service/eth"
11+
"github.com/ethereum-optimism/optimism/op-service/event"
1112
)
1213

1314
type BuildStartEvent struct {
@@ -18,64 +19,70 @@ func (ev BuildStartEvent) String() string {
1819
return "build-start"
1920
}
2021

21-
func (eq *EngineController) onBuildStart(ctx context.Context, ev BuildStartEvent) {
22+
// startBuild contains the core logic for beginning a block build. It is used by
23+
// StartBuildAsync in both event-system (async) and non-event (sync) contexts.
24+
func (eq *EngineController) startBuild(ctx context.Context, attrs *derive.AttributesWithParent) error {
2225
rpcCtx, cancel := context.WithTimeout(eq.ctx, buildStartTimeout)
2326
defer cancel()
2427

25-
if ev.Attributes.DerivedFrom != (eth.L1BlockRef{}) &&
26-
eq.PendingSafeL2Head().Hash != ev.Attributes.Parent.Hash {
28+
if attrs.DerivedFrom != (eth.L1BlockRef{}) &&
29+
eq.PendingSafeL2Head().Hash != attrs.Parent.Hash {
2730
// Warn about small reorgs, happens when pending safe head is getting rolled back
2831
eq.log.Warn("block-attributes derived from L1 do not build on pending safe head, likely reorg",
29-
"pending_safe", eq.PendingSafeL2Head(), "attributes_parent", ev.Attributes.Parent)
32+
"pending_safe", eq.PendingSafeL2Head(), "attributes_parent", attrs.Parent)
3033
}
3134

3235
fcEvent := ForkchoiceUpdateEvent{
33-
UnsafeL2Head: ev.Attributes.Parent,
36+
UnsafeL2Head: attrs.Parent,
3437
SafeL2Head: eq.safeHead,
3538
FinalizedL2Head: eq.finalizedHead,
3639
}
3740
if fcEvent.UnsafeL2Head.Number < fcEvent.FinalizedL2Head.Number {
3841
err := fmt.Errorf("invalid block-building pre-state, unsafe head %s is behind finalized head %s", fcEvent.UnsafeL2Head, fcEvent.FinalizedL2Head)
3942
eq.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err}) // make the node exit, things are very wrong.
40-
return
43+
return err
4144
}
4245
fc := eth.ForkchoiceState{
4346
HeadBlockHash: fcEvent.UnsafeL2Head.Hash,
4447
SafeBlockHash: fcEvent.SafeL2Head.Hash,
4548
FinalizedBlockHash: fcEvent.FinalizedL2Head.Hash,
4649
}
4750
buildStartTime := time.Now()
48-
id, errTyp, err := startPayload(rpcCtx, eq.engine, fc, ev.Attributes.Attributes)
51+
id, errTyp, err := startPayload(rpcCtx, eq.engine, fc, attrs.Attributes)
4952
if err != nil {
5053
switch errTyp {
5154
case BlockInsertTemporaryErr:
5255
// RPC errors are recoverable, we can retry the buffered payload attributes later.
5356
eq.emitter.Emit(ctx, rollup.EngineTemporaryErrorEvent{
5457
Err: fmt.Errorf("temporarily cannot insert new safe block: %w", err),
5558
})
56-
return
5759
case BlockInsertPrestateErr:
5860
eq.emitter.Emit(ctx, rollup.ResetEvent{
5961
Err: fmt.Errorf("need reset to resolve pre-state problem: %w", err),
6062
})
61-
return
6263
case BlockInsertPayloadErr:
63-
eq.emitter.Emit(ctx, BuildInvalidEvent{Attributes: ev.Attributes, Err: err})
64-
return
64+
eq.emitter.Emit(ctx, BuildInvalidEvent{Attributes: attrs, Err: err})
6565
default:
6666
eq.emitter.Emit(ctx, rollup.CriticalErrorEvent{
6767
Err: fmt.Errorf("unknown error type %d: %w", errTyp, err),
6868
})
69-
return
7069
}
70+
return err
7171
}
7272
eq.emitter.Emit(ctx, fcEvent)
7373

7474
eq.emitter.Emit(ctx, BuildStartedEvent{
75-
Info: eth.PayloadInfo{ID: id, Timestamp: uint64(ev.Attributes.Attributes.Timestamp)},
75+
Info: eth.PayloadInfo{ID: id, Timestamp: uint64(attrs.Attributes.Timestamp)},
7676
BuildStarted: buildStartTime,
77-
Concluding: ev.Attributes.Concluding,
78-
DerivedFrom: ev.Attributes.DerivedFrom,
79-
Parent: ev.Attributes.Parent,
77+
Concluding: attrs.Concluding,
78+
DerivedFrom: attrs.DerivedFrom,
79+
Parent: attrs.Parent,
8080
})
81+
return nil
82+
}
83+
84+
func (eq *EngineController) StartBuildAsync(ctx context.Context, attrs *derive.AttributesWithParent) event.Promise0[error] {
85+
return event.Spawn0(ctx, func(ctx context.Context) error {
86+
return eq.startBuild(ctx, attrs)
87+
}, event.WithSpawnLegacyEvent(BuildStartEvent{Attributes: attrs}))
8188
}

op-node/rollup/engine/engine_controller.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ type EngineController struct {
7979
rollupCfg *rollup.Config
8080
elStart time.Time
8181
clock clock.Clock
82-
8382
// TODO(#16917) Remove Event System Refactor Comments
8483
// Event system fields (moved from EngDeriver)
8584
ctx context.Context
@@ -699,9 +698,13 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
699698
d.PromoteSafe(ctx, x.Ref, x.Source)
700699
}
701700
case InteropInvalidateBlockEvent:
702-
d.emitter.Emit(ctx, BuildStartEvent{Attributes: x.Attributes})
703-
case BuildStartEvent:
704-
d.onBuildStart(ctx, x)
701+
p := d.StartBuildAsync(ctx, x.Attributes)
702+
if err := p.Await(ctx); err != nil {
703+
d.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err})
704+
}
705+
if err, _ := p.Result(); err != nil {
706+
d.emitter.Emit(ctx, rollup.CriticalErrorEvent{Err: err})
707+
}
705708
case BuildStartedEvent:
706709
d.onBuildStarted(ctx, x)
707710
case BuildSealEvent:

op-node/rollup/sequencing/sequencer.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (d *Sequencer) OnEvent(ctx context.Context, ev event.Event) bool {
189189
case engine.PayloadSuccessEvent:
190190
d.onPayloadSuccess(x)
191191
case SequencerActionEvent:
192-
d.onSequencerAction(x)
192+
d.onSequencerAction(ctx, x)
193193
case rollup.EngineTemporaryErrorEvent:
194194
d.onEngineTemporaryError(x)
195195
case rollup.ResetEvent:
@@ -345,7 +345,7 @@ func (d *Sequencer) onPayloadSuccess(x engine.PayloadSuccessEvent) {
345345
d.asyncGossip.Clear()
346346
}
347347

348-
func (d *Sequencer) onSequencerAction(ev SequencerActionEvent) {
348+
func (d *Sequencer) onSequencerAction(ctx context.Context, ev SequencerActionEvent) {
349349
d.log.Debug("Sequencer action")
350350
payload := d.asyncGossip.Get()
351351
if payload != nil {
@@ -387,7 +387,7 @@ func (d *Sequencer) onSequencerAction(ev SequencerActionEvent) {
387387
})
388388
} else if d.latest == (BuildingState{}) {
389389
// If we have not started building anything, start building.
390-
d.startBuildingBlock()
390+
d.startBuildingBlock(ctx)
391391
}
392392
}
393393
}
@@ -483,8 +483,7 @@ func (d *Sequencer) setLatestHead(head eth.L2BlockRef) {
483483
}
484484

485485
// StartBuildingBlock initiates a block building job on top of the given L2 head, safe and finalized blocks, and using the provided l1Origin.
486-
func (d *Sequencer) startBuildingBlock() {
487-
ctx := d.ctx
486+
func (d *Sequencer) startBuildingBlock(ctx context.Context) {
488487
l2Head := d.latestHead
489488

490489
// If we do not have data to know what to build on, then request a forkchoice update
@@ -601,9 +600,17 @@ func (d *Sequencer) startBuildingBlock() {
601600
// If we get a forkchoice update that conflicts, we will have to abort building.
602601
d.latest = BuildingState{Onto: l2Head}
603602

604-
d.emitter.Emit(d.ctx, engine.BuildStartEvent{
605-
Attributes: withParent,
606-
})
603+
p := d.eng.StartBuildAsync(ctx, withParent)
604+
if err := p.Await(ctx); err != nil {
605+
// context cancelled
606+
return
607+
}
608+
if err, _ := p.Result(); err != nil {
609+
d.log.Error("Failed to start building block", "err", err)
610+
d.emitter.Emit(d.ctx, rollup.CriticalErrorEvent{Err: err})
611+
return
612+
}
613+
// Not awaiting the result as we don't need it.
607614
}
608615

609616
func (d *Sequencer) NextAction() (t time.Time, ok bool) {

0 commit comments

Comments
 (0)