@@ -2,7 +2,6 @@ package driver
22
33import (
44 "context"
5- "errors"
65
76 "github.com/ethereum-optimism/optimism/op-service/eth"
87 "github.com/ethereum/go-ethereum/log"
@@ -17,8 +16,6 @@ import (
1716 "github.com/ethereum-optimism/optimism/op-service/event"
1817)
1918
20- var errTooManyEvents = errors .New ("way too many events queued up, something is wrong" )
21-
2219type EndCondition interface {
2320 Closing () bool
2421 Result () (eth.L2BlockRef , error )
@@ -27,76 +24,74 @@ type EndCondition interface {
2724type Driver struct {
2825 logger log.Logger
2926
30- events []event.Event
27+ // Event system components
28+ sys event.System
29+ exec * event.CooperativeExec
30+ driverEmiter event.Emitter
3131
32- end EndCondition
33- deriver event.Deriver
32+ end EndCondition
3433}
3534
3635func NewDriver (logger log.Logger , cfg * rollup.Config , depSet derive.DependencySet , l1Source derive.L1Fetcher ,
3736 l1BlobsSource derive.L1BlobsFetcher , l2Source engine.Engine , targetBlockNum uint64 ) * Driver {
3837
39- d := & Driver {
40- logger : logger ,
41- }
38+ exec := event .NewCooperative (context .Background ())
39+ sys := event .NewSystem (logger , exec )
4240
41+ // Create derivation pipeline and register as deriver (emitter auto-attached)
4342 pipeline := derive .NewDerivationPipeline (logger , cfg , depSet , l1Source , l1BlobsSource , altda .Disabled , l2Source , metrics .NoopMetrics , false )
4443 pipelineDeriver := derive .NewPipelineDeriver (context .Background (), pipeline )
45- pipelineDeriver . AttachEmitter ( d )
44+ sys . Register ( "pipeline" , pipelineDeriver )
4645
47- ec := engine .NewEngineController (context .Background (), l2Source , logger , metrics .NoopMetrics , cfg , & sync.Config {SyncMode : sync .CLSync }, d )
48- syncCfg := & sync.Config {SyncMode : sync .CLSync }
46+ // Engine controller needs an emitter at construction time
47+ ecEmitter := sys .Register ("engine-controller" , nil )
48+ ec := engine .NewEngineController (context .Background (), l2Source , logger , metrics .NoopMetrics , cfg , & sync.Config {SyncMode : sync .CLSync }, ecEmitter )
49+ // And also needs to be registered as a deriver to consume events
50+ sys .Register ("engine" , ec )
4951
52+ // Attributes handler only used as a resetter in this client path
5053 attrHandler := attributes .NewAttributesHandler (logger , cfg , context .Background (), l2Source , ec )
5154 ec .SetAttributesResetter (attrHandler )
5255 ec .SetPipelineResetter (pipelineDeriver )
5356
57+ // Register engine reset deriver
58+ syncCfg := & sync.Config {SyncMode : sync .CLSync }
5459 engResetDeriv := engine .NewEngineResetDeriver (context .Background (), logger , cfg , l1Source , l2Source , syncCfg )
55- engResetDeriv . AttachEmitter ( d )
60+ sys . Register ( "engine-reset" , engResetDeriv )
5661 engResetDeriv .SetEngController (ec )
5762
63+ // Program deriver coordinates high-level flow
5864 prog := & ProgramDeriver {
5965 logger : logger ,
60- Emitter : d ,
6166 engineController : ec ,
6267 closing : false ,
6368 result : eth.L2BlockRef {},
6469 targetBlockNum : targetBlockNum ,
6570 }
71+ sys .Register ("program" , prog )
6672
67- d .deriver = & event.DeriverMux {
68- prog ,
69- ec ,
70- pipelineDeriver ,
71- engResetDeriv ,
73+ d := & Driver {
74+ logger : logger ,
75+ sys : sys ,
76+ exec : exec ,
77+ driverEmiter : sys .Register ("driver" , nil ),
78+ end : prog ,
7279 }
73- d .end = prog
74-
7580 return d
7681}
7782
78- func (d * Driver ) Emit (ctx context.Context , ev event.Event ) {
79- if d .end .Closing () {
80- return
81- }
82- d .events = append (d .events , ev )
83- }
84-
8583func (d * Driver ) RunComplete () (eth.L2BlockRef , error ) {
8684 // Initial reset
87- d .Emit (context .Background (), engine.ResetEngineRequestEvent {})
85+ d .driverEmiter . Emit (context .Background (), engine.ResetEngineRequestEvent {})
8886
8987 for ! d .end .Closing () {
90- if len (d .events ) == 0 {
91- d .logger .Info ("Derivation complete: no further data to process" )
92- return d .end .Result ()
93- }
94- if len (d .events ) > 10000 { // sanity check, in case of bugs. Better than going OOM.
95- return eth.L2BlockRef {}, errTooManyEvents
88+ // Drain all queued events
89+ _ = d .exec .Drain ()
90+ if d .end .Closing () {
91+ break
9692 }
97- ev := d .events [0 ]
98- d .events = d .events [1 :]
99- d .deriver .OnEvent (context .Background (), ev )
93+ // Await more events to avoid busy-spinning
94+ <- d .exec .Await ()
10095 }
10196 return d .end .Result ()
10297}
0 commit comments