Skip to content

Commit 3c99535

Browse files
authored
chore(engine): provide worker (#19588)
The new worker package connects to an instance of a scheduler (#19570) for task assignment and execution. A worker spawns a fixed number of threads, each of which execute one task at a time. Signed-off-by: Robert Fratto <[email protected]>
1 parent 9a68900 commit 3c99535

File tree

16 files changed

+1810
-60
lines changed

16 files changed

+1810
-60
lines changed

pkg/engine/internal/executor/executor.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ type Config struct {
2525
Bucket objstore.Bucket
2626

2727
MergePrefetchCount int
28+
29+
// GetExternalInputs is an optional function called for each node in the
30+
// plan. If GetExternalInputs returns a non-nil slice of Pipelines, they
31+
// will be used as inputs to the pipeline of node.
32+
GetExternalInputs func(ctx context.Context, node physical.Node) []Pipeline
2833
}
2934

3035
func Run(ctx context.Context, cfg Config, plan *physical.Plan, logger log.Logger) Pipeline {
@@ -35,6 +40,7 @@ func Run(ctx context.Context, cfg Config, plan *physical.Plan, logger log.Logger
3540
bucket: cfg.Bucket,
3641
logger: logger,
3742
evaluator: newExpressionEvaluator(),
43+
getExternalInputs: cfg.GetExternalInputs,
3844
}
3945
if plan == nil {
4046
return errorPipeline(ctx, errors.New("plan is nil"))
@@ -55,6 +61,8 @@ type Context struct {
5561
evaluator expressionEvaluator
5662
bucket objstore.Bucket
5763

64+
getExternalInputs func(ctx context.Context, node physical.Node) []Pipeline
65+
5866
mergePrefetchCount int
5967
}
6068

@@ -65,6 +73,10 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
6573
inputs = append(inputs, c.execute(ctx, child))
6674
}
6775

76+
if c.getExternalInputs != nil {
77+
inputs = append(inputs, c.getExternalInputs(ctx, node)...)
78+
}
79+
6880
switch n := node.(type) {
6981
case *physical.DataObjScan:
7082
// DataObjScan reads from object storage to determine the full pipeline to
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
6+
"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
7+
)
8+
9+
// streamNotification is a deferred call to a stream event handler.
10+
type streamNotification struct {
11+
Handler workflow.StreamEventHandler
12+
Stream *workflow.Stream
13+
NewState workflow.StreamState
14+
}
15+
16+
// taskNotification is a deferred call to a task event handler.
17+
type taskNotification struct {
18+
Handler workflow.TaskEventHandler
19+
Task *workflow.Task
20+
NewStatus workflow.TaskStatus
21+
}
22+
23+
// A notifier is responsible for invoking [workflow.StreamEventHandler] and
24+
// [workflow.TaskEventHandler].
25+
//
26+
// Notifier is used to avoid deadlocks so notifications can be held without any
27+
// mutexes held.
28+
type notifier struct {
29+
streamNotifications []streamNotification
30+
taskNotifications []taskNotification
31+
}
32+
33+
// AddStreamEvent buffers a stream event notification.
34+
func (n *notifier) AddStreamEvent(notification streamNotification) {
35+
n.streamNotifications = append(n.streamNotifications, notification)
36+
}
37+
38+
// AddTaskEvent buffers a task event notification.
39+
func (n *notifier) AddTaskEvent(notification taskNotification) {
40+
n.taskNotifications = append(n.taskNotifications, notification)
41+
}
42+
43+
// Notify handles all pending notifications.
44+
func (n *notifier) Notify(ctx context.Context) {
45+
for _, ev := range n.streamNotifications {
46+
ev.Handler(ctx, ev.Stream, ev.NewState)
47+
}
48+
49+
for _, ev := range n.taskNotifications {
50+
ev.Handler(ctx, ev.Task, ev.NewStatus)
51+
}
52+
}

pkg/engine/internal/scheduler/scheduler.go

Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ func nudgeSemaphore(sema chan struct{}) {
184184
}
185185

186186
func (s *Scheduler) handleTaskStatus(ctx context.Context, msg wire.TaskStatusMessage) error {
187+
var n notifier
188+
defer n.Notify(ctx)
189+
187190
s.resourcesMut.Lock()
188191
defer s.resourcesMut.Unlock()
189192

@@ -195,35 +198,41 @@ func (s *Scheduler) handleTaskStatus(ctx context.Context, msg wire.TaskStatusMes
195198
changed, err := task.setState(msg.Status)
196199
if err != nil {
197200
return err
201+
} else if changed {
202+
// Notify the owner about the change.
203+
n.AddTaskEvent(taskNotification{
204+
Handler: task.handler,
205+
Task: task.inner,
206+
NewStatus: msg.Status,
207+
})
198208
}
199209

200210
// If the task's current state is terminal, we can untrack it now. For
201211
// simplicity, we lazily check this even if the state hasn't changed.
202212
if task.status.State.Terminal() {
203-
s.deleteTask(task)
213+
s.deleteTask(ctx, &n, task)
204214
}
205215

206-
if changed {
207-
// Inform the owner about the change.
208-
task.handler(ctx, task.inner, msg.Status)
209-
}
210216
return nil
211217
}
212218

213219
func (s *Scheduler) handleStreamStatus(ctx context.Context, msg wire.StreamStatusMessage) error {
220+
var n notifier
221+
defer n.Notify(ctx)
222+
214223
s.resourcesMut.Lock()
215224
defer s.resourcesMut.Unlock()
216225

217226
stream, found := s.streams[msg.StreamID]
218227
if !found {
219228
return fmt.Errorf("stream %s not found", msg.StreamID)
220229
}
221-
return s.changeStreamState(ctx, stream, msg.State)
230+
return s.changeStreamState(ctx, &n, stream, msg.State)
222231
}
223232

224233
// changeStreamState updates the state of the target stream. changeStreamState
225234
// must be called while the resourcesMut lock is held.
226-
func (s *Scheduler) changeStreamState(ctx context.Context, target *stream, newState workflow.StreamState) error {
235+
func (s *Scheduler) changeStreamState(ctx context.Context, n *notifier, target *stream, newState workflow.StreamState) error {
227236
changed, err := target.setState(newState)
228237
if err != nil {
229238
return err
@@ -242,7 +251,11 @@ func (s *Scheduler) changeStreamState(ctx context.Context, target *stream, newSt
242251
}
243252

244253
// Inform the owner about the change.
245-
target.handler(ctx, target.inner, newState)
254+
n.AddStreamEvent(streamNotification{
255+
Handler: target.handler,
256+
Stream: target.inner,
257+
NewState: newState,
258+
})
246259
return nil
247260
}
248261

@@ -252,6 +265,9 @@ func (s *Scheduler) changeStreamState(ctx context.Context, target *stream, newSt
252265
// If the reason argument is nil, the tasks are cancelled. Otherwise, they are
253266
// marked as failed with the provided reason.
254267
func (s *Scheduler) abortWorkerTasks(ctx context.Context, worker *wire.Peer, reason error) {
268+
var n notifier
269+
defer n.Notify(ctx)
270+
255271
s.resourcesMut.Lock()
256272
defer s.resourcesMut.Unlock()
257273

@@ -268,11 +284,15 @@ func (s *Scheduler) abortWorkerTasks(ctx context.Context, worker *wire.Peer, rea
268284
continue
269285
}
270286

271-
s.deleteTask(task)
287+
s.deleteTask(ctx, &n, task)
272288

273289
// We only need to inform the handler about the change. There's nothing
274290
// to send to the owner of the task since worker has disconnected.
275-
task.handler(ctx, task.inner, newStatus)
291+
n.AddTaskEvent(taskNotification{
292+
Handler: task.handler,
293+
Task: task.inner,
294+
NewStatus: newStatus,
295+
})
276296
}
277297

278298
delete(s.workerTasks, worker)
@@ -290,6 +310,9 @@ func (s *Scheduler) runAssignLoop(ctx context.Context) error {
290310
}
291311

292312
func (s *Scheduler) assignTasks(ctx context.Context) {
313+
var n notifier
314+
defer n.Notify(ctx)
315+
293316
// We need to grab the lock on resources to prevent stream states from being
294317
// modified while we're assigning the task.
295318
//
@@ -313,7 +336,7 @@ func (s *Scheduler) assignTasks(ctx context.Context) {
313336
// We may have a canceled task in our queue; we take this opportunity to
314337
// clean them up.
315338
if state := task.status.State; state.Terminal() {
316-
s.deleteTask(task)
339+
s.deleteTask(ctx, &n, task)
317340
s.taskQueue = s.taskQueue[1:]
318341
continue
319342
}
@@ -484,6 +507,9 @@ func (s *Scheduler) AddStreams(_ context.Context, handler workflow.StreamEventHa
484507
//
485508
// RemoveStreams returns an error if there are active tasks using the streams.
486509
func (s *Scheduler) RemoveStreams(ctx context.Context, streams ...*workflow.Stream) error {
510+
var n notifier
511+
defer n.Notify(ctx)
512+
487513
s.resourcesMut.Lock()
488514
defer s.resourcesMut.Unlock()
489515

@@ -508,7 +534,11 @@ func (s *Scheduler) RemoveStreams(ctx context.Context, streams ...*workflow.Stre
508534

509535
changed, _ := registered.setState(workflow.StreamStateClosed)
510536
if changed {
511-
registered.handler(ctx, streamToRemove, workflow.StreamStateClosed)
537+
n.AddStreamEvent(streamNotification{
538+
Handler: registered.handler,
539+
Stream: streamToRemove,
540+
NewState: workflow.StreamStateClosed,
541+
})
512542
}
513543

514544
delete(s.streams, streamToRemove.ULID)
@@ -577,6 +607,9 @@ func (s *Scheduler) Start(ctx context.Context, handler workflow.TaskEventHandler
577607
// registerTasks registers the provided tasks with the scheduler without
578608
// enqueuing them.
579609
func (s *Scheduler) registerTasks(ctx context.Context, handler workflow.TaskEventHandler, tasks ...*workflow.Task) ([]*task, error) {
610+
var n notifier
611+
defer n.Notify(ctx)
612+
580613
newTasks := make([]*task, 0, len(tasks))
581614

582615
s.resourcesMut.Lock()
@@ -629,7 +662,11 @@ NextTask:
629662
newTasks = append(newTasks, newTask)
630663

631664
// Inform the owner about the state change from Created to Pending.
632-
handler(ctx, taskToStart, newTask.status)
665+
n.AddTaskEvent(taskNotification{
666+
Handler: handler,
667+
Task: taskToStart,
668+
NewStatus: newTask.status,
669+
})
633670
}
634671

635672
if len(errs) == 0 {
@@ -652,6 +689,9 @@ func (s *Scheduler) enqueueTasks(tasks []*task) {
652689
// Cancel requests cancellation of the specified tasks. Cancel returns an error
653690
// if any of the tasks were not found.
654691
func (s *Scheduler) Cancel(ctx context.Context, tasks ...*workflow.Task) error {
692+
var n notifier
693+
defer n.Notify(ctx)
694+
655695
s.resourcesMut.Lock()
656696
defer s.resourcesMut.Unlock()
657697

@@ -665,29 +705,14 @@ func (s *Scheduler) Cancel(ctx context.Context, tasks ...*workflow.Task) error {
665705
}
666706

667707
// Immediately clean up our own resources.
668-
s.deleteTask(registered)
708+
s.deleteTask(ctx, &n, registered)
669709

670710
if changed, _ := registered.setState(workflow.TaskStatus{State: workflow.TaskStateCancelled}); !changed {
671711
// Ignore if the task couldn't move into the canceled state, which
672712
// indicates it's already in a terminal state.
673713
continue
674714
}
675715

676-
// Close all associated sink streams.
677-
for _, sinks := range registered.inner.Sinks {
678-
for _, rawSink := range sinks {
679-
sink, ok := s.streams[rawSink.ULID]
680-
if !ok {
681-
continue
682-
}
683-
684-
// changeStreamState only returns an error for an invalid state
685-
// change, which isn't possible here (it's never invalid to move
686-
// to Closed, only a no-op if it's already Closed).
687-
_ = s.changeStreamState(ctx, sink, workflow.StreamStateClosed)
688-
}
689-
}
690-
691716
// If the task has an owner, we'll inform it that the task has been
692717
// canceled and it can stop processing it.
693718
//
@@ -697,7 +722,11 @@ func (s *Scheduler) Cancel(ctx context.Context, tasks ...*workflow.Task) error {
697722
}
698723

699724
// Inform the owner about the change.
700-
registered.handler(ctx, taskToCancel, registered.status)
725+
n.AddTaskEvent(taskNotification{
726+
Handler: registered.handler,
727+
Task: taskToCancel,
728+
NewStatus: registered.status,
729+
})
701730
}
702731

703732
if len(errs) == 0 {
@@ -706,7 +735,7 @@ func (s *Scheduler) Cancel(ctx context.Context, tasks ...*workflow.Task) error {
706735
return errors.Join(errs...)
707736
}
708737

709-
func (s *Scheduler) deleteTask(t *task) {
738+
func (s *Scheduler) deleteTask(ctx context.Context, n *notifier, t *task) {
710739
delete(s.tasks, t.inner.ULID)
711740

712741
if owner := t.owner; owner != nil {
@@ -715,4 +744,19 @@ func (s *Scheduler) deleteTask(t *task) {
715744
delete(knownTasks, t)
716745
}
717746
}
747+
748+
// Close all associated sink streams.
749+
for _, sinks := range t.inner.Sinks {
750+
for _, rawSink := range sinks {
751+
sink, ok := s.streams[rawSink.ULID]
752+
if !ok {
753+
continue
754+
}
755+
756+
// changeStreamState only returns an error for an invalid state
757+
// change, which isn't possible here (it's never invalid to move
758+
// to Closed, only a no-op if it's already Closed).
759+
_ = s.changeStreamState(ctx, n, sink, workflow.StreamStateClosed)
760+
}
761+
}
718762
}

0 commit comments

Comments
 (0)