Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion op-e2e/actions/helpers/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

executor := event.NewGlobalSynchronous(ctx)
executor := event.NewCooperative(ctx)
sys := event.NewSystem(log, executor)
t.Cleanup(sys.Stop)
opts := event.WithEmitLimiter(
Expand Down
2 changes: 1 addition & 1 deletion op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (n *OpNode) init(ctx context.Context, cfg *config.Config) error {

func (n *OpNode) initEventSystem() {
// This executor will be configurable in the future, for parallel event processing
executor := event.NewGlobalSynchronous(n.resourcesCtx).WithMetrics(n.metrics)
executor := event.NewCooperative(n.resourcesCtx).WithMetrics(n.metrics)
sys := event.NewSystem(n.log, executor)
sys.AddTracer(event.NewMetricsTracer(n.metrics))
sys.Register("node", event.DeriverFunc(n.onEvent))
Expand Down
77 changes: 39 additions & 38 deletions op-program/client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package driver

import (
"context"
"errors"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -17,8 +16,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/event"
)

var errTooManyEvents = errors.New("way too many events queued up, something is wrong")

type EndCondition interface {
Closing() bool
Result() (eth.L2BlockRef, error)
Expand All @@ -27,76 +24,80 @@ type EndCondition interface {
type Driver struct {
logger log.Logger

events []event.Event
// Event system components
sys event.System
exec *event.SingleThreadCooperativeExec
driverEmiter event.Emitter

end EndCondition
deriver event.Deriver
end EndCondition
}

func NewDriver(logger log.Logger, cfg *rollup.Config, depSet derive.DependencySet, l1Source derive.L1Fetcher,
l1BlobsSource derive.L1BlobsFetcher, l2Source engine.Engine, targetBlockNum uint64) *Driver {

d := &Driver{
logger: logger,
}
exec := event.NewSingleThreadCooperative(context.Background())
sys := event.NewSystem(logger, exec)

// Create derivation pipeline and register as deriver (emitter auto-attached)
pipeline := derive.NewDerivationPipeline(logger, cfg, depSet, l1Source, l1BlobsSource, altda.Disabled, l2Source, metrics.NoopMetrics, false)
pipelineDeriver := derive.NewPipelineDeriver(context.Background(), pipeline)
pipelineDeriver.AttachEmitter(d)
sys.Register("pipeline", pipelineDeriver)

ec := engine.NewEngineController(context.Background(), l2Source, logger, metrics.NoopMetrics, cfg, &sync.Config{SyncMode: sync.CLSync}, d)
syncCfg := &sync.Config{SyncMode: sync.CLSync}
// Engine controller needs an emitter at construction time
ecEmitter := sys.Register("engine-controller", nil)
ec := engine.NewEngineController(context.Background(), l2Source, logger, metrics.NoopMetrics, cfg, &sync.Config{SyncMode: sync.CLSync}, ecEmitter)
// And also needs to be registered as a deriver to consume events
sys.Register("engine", ec)

// Attributes handler only used as a resetter in this client path
attrHandler := attributes.NewAttributesHandler(logger, cfg, context.Background(), l2Source, ec)
ec.SetAttributesResetter(attrHandler)
ec.SetPipelineResetter(pipelineDeriver)

// Register engine reset deriver
syncCfg := &sync.Config{SyncMode: sync.CLSync}
engResetDeriv := engine.NewEngineResetDeriver(context.Background(), logger, cfg, l1Source, l2Source, syncCfg)
engResetDeriv.AttachEmitter(d)
sys.Register("engine-reset", engResetDeriv)
engResetDeriv.SetEngController(ec)

// Program deriver coordinates high-level flow
prog := &ProgramDeriver{
logger: logger,
Emitter: d,
engineController: ec,
closing: false,
result: eth.L2BlockRef{},
targetBlockNum: targetBlockNum,
// In the op-program trace-extension path there is no external sequencer
// injecting more events after the pipeline idles; close on idle.
closeOnIdle: true,
}
sys.Register("program", prog)

d.deriver = &event.DeriverMux{
prog,
ec,
pipelineDeriver,
engResetDeriv,
d := &Driver{
logger: logger,
sys: sys,
exec: exec,
driverEmiter: sys.Register("driver", nil),
end: prog,
}
d.end = prog

return d
}

func (d *Driver) Emit(ctx context.Context, ev event.Event) {
if d.end.Closing() {
return
}
d.events = append(d.events, ev)
}

func (d *Driver) RunComplete() (eth.L2BlockRef, error) {
// Initial reset
d.Emit(context.Background(), engine.ResetEngineRequestEvent{})
ctx := event.WithSystem(context.Background(), d.sys)
d.driverEmiter.Emit(ctx, engine.ResetEngineRequestEvent{})

// Drive the single-thread executor inline until completion.
// We avoid a separate Drive goroutine to ensure run-to-completion without await races.
for !d.end.Closing() {
if len(d.events) == 0 {
d.logger.Info("Derivation complete: no further data to process")
return d.end.Result()
}
if len(d.events) > 10000 { // sanity check, in case of bugs. Better than going OOM.
return eth.L2BlockRef{}, errTooManyEvents
// Drain any queued events synchronously
_ = d.exec.Drain()
if d.end.Closing() {
break
}
ev := d.events[0]
d.events = d.events[1:]
d.deriver.OnEvent(context.Background(), ev)
// Await for more events to avoid busy spinning
<-d.exec.Await()
}
return d.end.Result()
}
30 changes: 20 additions & 10 deletions op-program/client/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,20 @@ func TestDriver(t *testing.T) {
newTestDriver := func(t *testing.T, onEvent func(d *Driver, end *fakeEnd, ev event.Event)) *Driver {
logger := testlog.Logger(t, log.LevelInfo)
end := &fakeEnd{}

exec := event.NewSingleThreadCooperative(context.Background())
sys := event.NewSystem(logger, exec)
d := &Driver{
logger: logger,
end: end,
logger: logger,
sys: sys,
exec: exec,
driverEmiter: sys.Register("driver", nil),
end: end,
}
d.deriver = event.DeriverFunc(func(ctx context.Context, ev event.Event) bool {
sys.Register("test-deriver", event.DeriverFunc(func(ctx context.Context, ev event.Event) bool {
onEvent(d, end, ev)
return true
})
}))
return d
}

Expand Down Expand Up @@ -69,7 +75,7 @@ func TestDriver(t *testing.T) {
return
}
count += 1
d.Emit(context.Background(), TestEvent{})
d.driverEmiter.Emit(context.Background(), TestEvent{})
})
_, err := d.RunComplete()
require.NoError(t, err)
Expand All @@ -84,7 +90,7 @@ func TestDriver(t *testing.T) {
return
}
count += 1
d.Emit(context.Background(), TestEvent{})
d.driverEmiter.Emit(context.Background(), TestEvent{})
})
_, err := d.RunComplete()
require.ErrorIs(t, mockErr, err)
Expand All @@ -93,8 +99,10 @@ func TestDriver(t *testing.T) {
t.Run("exhaust events", func(t *testing.T) {
count := 0
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
if count < 3 { // stop generating events after a while, without changing end condition
d.Emit(context.Background(), TestEvent{})
if count < 3 { // generate a few events, then stop and close
d.driverEmiter.Emit(context.Background(), TestEvent{})
} else {
end.closing = true
}
count += 1
})
Expand All @@ -107,8 +115,10 @@ func TestDriver(t *testing.T) {
count := 0
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
if count < 3 {
d.Emit(context.Background(), TestEvent{})
d.Emit(context.Background(), TestEvent{})
d.driverEmiter.Emit(context.Background(), TestEvent{})
d.driverEmiter.Emit(context.Background(), TestEvent{})
} else {
end.closing = true
}
count += 1
})
Expand Down
17 changes: 14 additions & 3 deletions op-program/client/driver/program.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ type ProgramDeriver struct {
result eth.L2BlockRef
resultError error
targetBlockNum uint64
// If true, close the program when derivation goes idle (trace-extension case).
closeOnIdle bool
}

// AttachEmitter implements event.AttachEmitter to receive the emitter when
// registered with the event System.
func (d *ProgramDeriver) AttachEmitter(em event.Emitter) {
d.Emitter = em
}

func (d *ProgramDeriver) Closing() bool {
Expand Down Expand Up @@ -87,9 +95,12 @@ func (d *ProgramDeriver) OnEvent(ctx context.Context, ev event.Event) bool {
d.closing = true
}
case derive.DeriverIdleEvent:
// We don't close the deriver yet, as the engine may still be processing events to reach
// the target. A ForkchoiceUpdateEvent will close the deriver when the target is reached.
d.logger.Info("Derivation complete: no further L1 data to process")
if d.closeOnIdle {
d.logger.Info("Derivation complete: no further L1 data to process")
d.closing = true
} else {
d.logger.Info("Derivation idle: no further L1 data to process")
}
case rollup.ResetEvent:
d.closing = true
d.resultError = fmt.Errorf("unexpected reset error: %w", x.Err)
Expand Down
Loading