Skip to content

Commit 2d489b1

Browse files
committed
Cooperative Event Scheduling
* Update op-program to use cooperative (MT) event loop.
1 parent 5844981 commit 2d489b1

File tree

16 files changed

+2145
-53
lines changed

16 files changed

+2145
-53
lines changed

op-e2e/actions/helpers/l2_verifier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,
118118
ctx, cancel := context.WithCancel(context.Background())
119119
t.Cleanup(cancel)
120120

121-
executor := event.NewGlobalSynchronous(ctx)
121+
executor := event.NewCooperative(ctx)
122122
sys := event.NewSystem(log, executor)
123123
t.Cleanup(sys.Stop)
124124
opts := event.WithEmitLimiter(

op-node/node/node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func (n *OpNode) init(ctx context.Context, cfg *config.Config) error {
173173

174174
func (n *OpNode) initEventSystem() {
175175
// This executor will be configurable in the future, for parallel event processing
176-
executor := event.NewGlobalSynchronous(n.resourcesCtx).WithMetrics(n.metrics)
176+
executor := event.NewCooperative(n.resourcesCtx).WithMetrics(n.metrics)
177177
sys := event.NewSystem(n.log, executor)
178178
sys.AddTracer(event.NewMetricsTracer(n.metrics))
179179
sys.Register("node", event.DeriverFunc(n.onEvent))

op-program/client/driver/driver.go

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package driver
22

33
import (
44
"context"
5-
"errors"
65

76
"github.com/ethereum-optimism/optimism/op-service/eth"
87
"github.com/ethereum/go-ethereum/log"
@@ -17,8 +16,6 @@ import (
1716
"github.com/ethereum-optimism/optimism/op-service/event"
1817
)
1918

20-
var errTooManyEvents = errors.New("way too many events queued up, something is wrong")
21-
2219
type EndCondition interface {
2320
Closing() bool
2421
Result() (eth.L2BlockRef, error)
@@ -27,76 +24,75 @@ type EndCondition interface {
2724
type Driver struct {
2825
logger log.Logger
2926

30-
events []event.Event
27+
// Event system components
28+
sys event.System
29+
exec *event.SingleThreadCooperativeExec
30+
driverEmiter event.Emitter
3131

32-
end EndCondition
33-
deriver event.Deriver
32+
end EndCondition
3433
}
3534

3635
func NewDriver(logger log.Logger, cfg *rollup.Config, depSet derive.DependencySet, l1Source derive.L1Fetcher,
3736
l1BlobsSource derive.L1BlobsFetcher, l2Source engine.Engine, targetBlockNum uint64) *Driver {
3837

39-
d := &Driver{
40-
logger: logger,
41-
}
38+
exec := event.NewSingleThreadCooperative(context.Background())
39+
sys := event.NewSystem(logger, exec)
4240

41+
// Create derivation pipeline and register as deriver (emitter auto-attached)
4342
pipeline := derive.NewDerivationPipeline(logger, cfg, depSet, l1Source, l1BlobsSource, altda.Disabled, l2Source, metrics.NoopMetrics, false)
4443
pipelineDeriver := derive.NewPipelineDeriver(context.Background(), pipeline)
45-
pipelineDeriver.AttachEmitter(d)
44+
sys.Register("pipeline", pipelineDeriver)
4645

47-
ec := engine.NewEngineController(context.Background(), l2Source, logger, metrics.NoopMetrics, cfg, &sync.Config{SyncMode: sync.CLSync}, d)
48-
syncCfg := &sync.Config{SyncMode: sync.CLSync}
46+
// Engine controller needs an emitter at construction time
47+
ecEmitter := sys.Register("engine-controller", nil)
48+
ec := engine.NewEngineController(context.Background(), l2Source, logger, metrics.NoopMetrics, cfg, &sync.Config{SyncMode: sync.CLSync}, ecEmitter)
49+
// And also needs to be registered as a deriver to consume events
50+
sys.Register("engine", ec)
4951

52+
// Attributes handler only used as a resetter in this client path
5053
attrHandler := attributes.NewAttributesHandler(logger, cfg, context.Background(), l2Source, ec)
5154
ec.SetAttributesResetter(attrHandler)
5255
ec.SetPipelineResetter(pipelineDeriver)
5356

57+
// Register engine reset deriver
58+
syncCfg := &sync.Config{SyncMode: sync.CLSync}
5459
engResetDeriv := engine.NewEngineResetDeriver(context.Background(), logger, cfg, l1Source, l2Source, syncCfg)
55-
engResetDeriv.AttachEmitter(d)
60+
sys.Register("engine-reset", engResetDeriv)
5661
engResetDeriv.SetEngController(ec)
5762

63+
// Program deriver coordinates high-level flow
5864
prog := &ProgramDeriver{
5965
logger: logger,
60-
Emitter: d,
6166
engineController: ec,
6267
closing: false,
6368
result: eth.L2BlockRef{},
6469
targetBlockNum: targetBlockNum,
6570
}
71+
sys.Register("program", prog)
6672

67-
d.deriver = &event.DeriverMux{
68-
prog,
69-
ec,
70-
pipelineDeriver,
71-
engResetDeriv,
73+
d := &Driver{
74+
logger: logger,
75+
sys: sys,
76+
exec: exec,
77+
driverEmiter: sys.Register("driver", nil),
78+
end: prog,
7279
}
73-
d.end = prog
74-
7580
return d
7681
}
7782

78-
func (d *Driver) Emit(ctx context.Context, ev event.Event) {
79-
if d.end.Closing() {
80-
return
81-
}
82-
d.events = append(d.events, ev)
83-
}
84-
8583
func (d *Driver) RunComplete() (eth.L2BlockRef, error) {
8684
// Initial reset
87-
d.Emit(context.Background(), engine.ResetEngineRequestEvent{})
85+
ctx := event.WithSystem(context.Background(), d.sys)
86+
d.driverEmiter.Emit(ctx, engine.ResetEngineRequestEvent{})
8887

8988
for !d.end.Closing() {
90-
if len(d.events) == 0 {
91-
d.logger.Info("Derivation complete: no further data to process")
92-
return d.end.Result()
93-
}
94-
if len(d.events) > 10000 { // sanity check, in case of bugs. Better than going OOM.
95-
return eth.L2BlockRef{}, errTooManyEvents
89+
// Drain all queued events
90+
_ = d.exec.Drain()
91+
if d.end.Closing() {
92+
break
9693
}
97-
ev := d.events[0]
98-
d.events = d.events[1:]
99-
d.deriver.OnEvent(context.Background(), ev)
94+
// Await more events to avoid busy-spinning
95+
<-d.exec.Await()
10096
}
10197
return d.end.Result()
10298
}

op-program/client/driver/driver_test.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,20 @@ func TestDriver(t *testing.T) {
3333
newTestDriver := func(t *testing.T, onEvent func(d *Driver, end *fakeEnd, ev event.Event)) *Driver {
3434
logger := testlog.Logger(t, log.LevelInfo)
3535
end := &fakeEnd{}
36+
37+
exec := event.NewSingleThreadCooperative(context.Background())
38+
sys := event.NewSystem(logger, exec)
3639
d := &Driver{
37-
logger: logger,
38-
end: end,
40+
logger: logger,
41+
sys: sys,
42+
exec: exec,
43+
driverEmiter: sys.Register("driver", nil),
44+
end: end,
3945
}
40-
d.deriver = event.DeriverFunc(func(ctx context.Context, ev event.Event) bool {
46+
sys.Register("test-deriver", event.DeriverFunc(func(ctx context.Context, ev event.Event) bool {
4147
onEvent(d, end, ev)
4248
return true
43-
})
49+
}))
4450
return d
4551
}
4652

@@ -69,7 +75,7 @@ func TestDriver(t *testing.T) {
6975
return
7076
}
7177
count += 1
72-
d.Emit(context.Background(), TestEvent{})
78+
d.driverEmiter.Emit(context.Background(), TestEvent{})
7379
})
7480
_, err := d.RunComplete()
7581
require.NoError(t, err)
@@ -84,7 +90,7 @@ func TestDriver(t *testing.T) {
8490
return
8591
}
8692
count += 1
87-
d.Emit(context.Background(), TestEvent{})
93+
d.driverEmiter.Emit(context.Background(), TestEvent{})
8894
})
8995
_, err := d.RunComplete()
9096
require.ErrorIs(t, mockErr, err)
@@ -93,8 +99,10 @@ func TestDriver(t *testing.T) {
9399
t.Run("exhaust events", func(t *testing.T) {
94100
count := 0
95101
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
96-
if count < 3 { // stop generating events after a while, without changing end condition
97-
d.Emit(context.Background(), TestEvent{})
102+
if count < 3 { // generate a few events, then stop and close
103+
d.driverEmiter.Emit(context.Background(), TestEvent{})
104+
} else {
105+
end.closing = true
98106
}
99107
count += 1
100108
})
@@ -107,8 +115,10 @@ func TestDriver(t *testing.T) {
107115
count := 0
108116
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
109117
if count < 3 {
110-
d.Emit(context.Background(), TestEvent{})
111-
d.Emit(context.Background(), TestEvent{})
118+
d.driverEmiter.Emit(context.Background(), TestEvent{})
119+
d.driverEmiter.Emit(context.Background(), TestEvent{})
120+
} else {
121+
end.closing = true
112122
}
113123
count += 1
114124
})

op-program/client/driver/program.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ type ProgramDeriver struct {
3434
targetBlockNum uint64
3535
}
3636

37+
// AttachEmitter implements event.AttachEmitter to receive the emitter when
38+
// registered with the event System.
39+
func (d *ProgramDeriver) AttachEmitter(em event.Emitter) {
40+
d.Emitter = em
41+
}
42+
3743
func (d *ProgramDeriver) Closing() bool {
3844
return d.closing
3945
}
@@ -87,9 +93,10 @@ func (d *ProgramDeriver) OnEvent(ctx context.Context, ev event.Event) bool {
8793
d.closing = true
8894
}
8995
case derive.DeriverIdleEvent:
90-
// We don't close the deriver yet, as the engine may still be processing events to reach
91-
// the target. A ForkchoiceUpdateEvent will close the deriver when the target is reached.
96+
// No more L1 data to process. In trace-extension scenarios the target block number
97+
// may not be reached; we should stop and return the best safe head observed so far.
9298
d.logger.Info("Derivation complete: no further L1 data to process")
99+
d.closing = true
93100
case rollup.ResetEvent:
94101
d.closing = true
95102
d.resultError = fmt.Errorf("unexpected reset error: %w", x.Err)

0 commit comments

Comments
 (0)