Skip to content

Commit 3664400

Browse files
committed
Cooperative Event Scheduling
1 parent c813b04 commit 3664400

File tree

7 files changed

+1341
-2
lines changed

7 files changed

+1341
-2
lines changed

op-node/node/node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func (n *OpNode) init(ctx context.Context, cfg *config.Config) error {
173173

174174
func (n *OpNode) initEventSystem() {
175175
// This executor will be configurable in the future, for parallel event processing
176-
executor := event.NewGlobalSynchronous(n.resourcesCtx).WithMetrics(n.metrics)
176+
executor := event.NewCooperative(n.resourcesCtx).WithMetrics(n.metrics)
177177
sys := event.NewSystem(n.log, executor)
178178
sys.AddTracer(event.NewMetricsTracer(n.metrics))
179179
sys.Register("node", event.DeriverFunc(n.onEvent))

op-service/event/async_fn.go

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package event
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"sync"
8+
"sync/atomic"
9+
)
10+
11+
// Async arities with explicit error type parameter E.
12+
type Async0[E interface{ error }] func(ctx context.Context) E
13+
type Async1[T any, E interface{ error }] func(ctx context.Context) (T, E)
14+
type Async2[T1 any, T2 any, E interface{ error }] func(ctx context.Context) (T1, T2, E)
15+
16+
// asyncInvokeEvent schedules a function by ID via the cooperative loop.
17+
type asyncInvokeEvent struct{ id string }
18+
19+
func (asyncInvokeEvent) String() string { return "async-invoke" }
20+
21+
// taskRunner receives asyncInvokeEvent and runs the associated function.
22+
type taskRunner struct {
23+
exec *CooperativeExec
24+
em Emitter
25+
sys System
26+
mu sync.Mutex
27+
tasks map[string]func(ctx context.Context)
28+
ctr atomic.Uint64
29+
}
30+
31+
func (tr *taskRunner) nextID() string { return strconv.FormatUint(tr.ctr.Add(1), 10) }
32+
33+
func (tr *taskRunner) OnEvent(ctx context.Context, ev Event) bool {
34+
a, ok := ev.(asyncInvokeEvent)
35+
if !ok {
36+
return false
37+
}
38+
tr.mu.Lock()
39+
fn := tr.tasks[a.id]
40+
if fn == nil {
41+
tr.mu.Unlock()
42+
return false
43+
}
44+
delete(tr.tasks, a.id)
45+
tr.mu.Unlock()
46+
// ensure the context is system-aware for ctx-only APIs
47+
ctx = WithSystem(ctx, tr.sys)
48+
fn(ctx)
49+
return true
50+
}
51+
52+
var (
53+
runnersMu sync.Mutex
54+
// runners is keyed by System, then by Priority. This allows us to respect spawn priority
55+
// while keeping the ability to collapse to a single Normal-priority runner.
56+
runners = make(map[*Sys]map[Priority]*taskRunner)
57+
// Toggle to enable/disable respecting SpawnN priority. If false, all spawns use Normal.
58+
respectSpawnPriority = true
59+
)
60+
61+
// SetSpawnPriorityEnabled toggles whether SpawnN respects the provided priority (default: true).
62+
func SetSpawnPriorityEnabled(enabled bool) { respectSpawnPriority = enabled }
63+
64+
func ensureTaskRunner(s System, pr Priority) (*taskRunner, error) {
65+
sys, ok := s.(*Sys)
66+
if !ok {
67+
return nil, fmt.Errorf("unexpected system type")
68+
}
69+
exec, ok := sys.executor.(*CooperativeExec)
70+
if !ok {
71+
return nil, fmt.Errorf("cooperative executor required")
72+
}
73+
// Optionally collapse to Normal priority runner.
74+
effectivePr := pr
75+
if !respectSpawnPriority {
76+
effectivePr = Normal
77+
}
78+
79+
runnersMu.Lock()
80+
defer runnersMu.Unlock()
81+
byPr, ok := runners[sys]
82+
if !ok {
83+
byPr = make(map[Priority]*taskRunner)
84+
runners[sys] = byPr
85+
}
86+
if tr := byPr[effectivePr]; tr != nil {
87+
return tr, nil
88+
}
89+
// Register a new runner with both executor and emitter priorities set.
90+
tr := &taskRunner{exec: exec, sys: sys, tasks: make(map[string]func(context.Context))}
91+
name := "async-runner-" + strconv.Itoa(int(effectivePr))
92+
em := sys.Register(name, tr, WithExecPriority(effectivePr), WithEmitPriority(effectivePr))
93+
tr.em = em
94+
byPr[effectivePr] = tr
95+
return tr, nil
96+
}
97+
98+
// Spawn options to configure priority and name (optional).
99+
type SpawnOption func(*SpawnConfig)
100+
101+
type SpawnConfig struct {
102+
Priority Priority
103+
Name string
104+
}
105+
106+
func defaultSpawnConfig() *SpawnConfig {
107+
return &SpawnConfig{Priority: Normal, Name: ""}
108+
}
109+
110+
func WithSpawnPriority(p Priority) SpawnOption { return func(c *SpawnConfig) { c.Priority = p } }
111+
func WithSpawnName(name string) SpawnOption { return func(c *SpawnConfig) { c.Name = name } }
112+
113+
// Context-based spawn helpers that derive the System from the context.
114+
// Spawn0 schedules f and returns a Promise0.
115+
func Spawn0[E interface{ error }](ctx context.Context, f Async0[E], opts ...SpawnOption) (Promise0[E], error) {
116+
sys, ok := SystemFromContext(ctx)
117+
if !ok {
118+
var p Promise0[E]
119+
return p, fmt.Errorf("no system in context")
120+
}
121+
// implement using taskRunner, resolving the promise on completion
122+
cfg := defaultSpawnConfig()
123+
for _, o := range opts {
124+
o(cfg)
125+
}
126+
tr, _ := ensureTaskRunner(sys, cfg.Priority)
127+
id := tr.nextID()
128+
p, r := NewPromise0[E]()
129+
tr.mu.Lock()
130+
tr.tasks[id] = func(ctx context.Context) {
131+
err := f(ctx)
132+
if any(err) != nil {
133+
r.Reject(err)
134+
return
135+
}
136+
r.Resolve()
137+
}
138+
tr.mu.Unlock()
139+
tr.em.Emit(ctx, asyncInvokeEvent{id: id})
140+
return p, nil
141+
}
142+
143+
// Spawn1 schedules f and returns a Promise1.
144+
func Spawn1[T any, E interface{ error }](ctx context.Context, f Async1[T, E], opts ...SpawnOption) (Promise1[T, E], error) {
145+
sys, ok := SystemFromContext(ctx)
146+
if !ok {
147+
var p Promise1[T, E]
148+
return p, fmt.Errorf("no system in context")
149+
}
150+
cfg := defaultSpawnConfig()
151+
for _, o := range opts {
152+
o(cfg)
153+
}
154+
tr, _ := ensureTaskRunner(sys, cfg.Priority)
155+
id := tr.nextID()
156+
p, r := NewPromise1[T, E]()
157+
tr.mu.Lock()
158+
tr.tasks[id] = func(ctx context.Context) {
159+
t, e := f(ctx)
160+
if any(e) != nil {
161+
r.Reject(e)
162+
return
163+
}
164+
r.Resolve(t)
165+
}
166+
tr.mu.Unlock()
167+
tr.em.Emit(ctx, asyncInvokeEvent{id: id})
168+
return p, nil
169+
}
170+
171+
// Spawn2 schedules f and returns a Promise2.
172+
func Spawn2[T1 any, T2 any, E interface{ error }](ctx context.Context, f Async2[T1, T2, E], opts ...SpawnOption) (Promise2[T1, T2, E], error) {
173+
sys, ok := SystemFromContext(ctx)
174+
if !ok {
175+
var p Promise2[T1, T2, E]
176+
return p, fmt.Errorf("no system in context")
177+
}
178+
cfg := defaultSpawnConfig()
179+
for _, o := range opts {
180+
o(cfg)
181+
}
182+
tr, _ := ensureTaskRunner(sys, cfg.Priority)
183+
id := tr.nextID()
184+
p, r := NewPromise2[T1, T2, E]()
185+
tr.mu.Lock()
186+
tr.tasks[id] = func(ctx context.Context) {
187+
a, b, e := f(ctx)
188+
if any(e) != nil {
189+
r.Reject(e)
190+
return
191+
}
192+
r.Resolve(a, b)
193+
}
194+
tr.mu.Unlock()
195+
tr.em.Emit(ctx, asyncInvokeEvent{id: id})
196+
return p, nil
197+
}

0 commit comments

Comments
 (0)