From 664881bf5edaa3cd84f243a4522d99c3308e8d38 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Fri, 10 Oct 2025 15:56:52 -0400 Subject: [PATCH 1/5] chore(engine): expose physical node in tree representation The scheduler prototype was relying on the tree ID matching the node ID for injecting additional information into the tree (streams used by that node). This commit introduces a workaround by allocating creators of a tree node to inject arbitrary values as context. Signed-off-by: Robert Fratto --- pkg/engine/internal/planner/physical/printer.go | 2 ++ pkg/engine/internal/util/tree/tree.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/pkg/engine/internal/planner/physical/printer.go b/pkg/engine/internal/planner/physical/printer.go index 198f712f66577..c31c96c9e2296 100644 --- a/pkg/engine/internal/planner/physical/printer.go +++ b/pkg/engine/internal/planner/physical/printer.go @@ -27,6 +27,8 @@ func toTree(p *Plan, n Node) *tree.Node { func toTreeNode(n Node) *tree.Node { treeNode := tree.NewNode(n.Type().String(), "") + treeNode.Context = n + switch node := n.(type) { case *DataObjScan: treeNode.Properties = []tree.Property{ diff --git a/pkg/engine/internal/util/tree/tree.go b/pkg/engine/internal/util/tree/tree.go index 18c7a8418f0b4..b776313488c14 100644 --- a/pkg/engine/internal/util/tree/tree.go +++ b/pkg/engine/internal/util/tree/tree.go @@ -43,6 +43,8 @@ type Node struct { // for comments are tree-style properties of a node, such as expressions of a // physical plan node. Comments []*Node + // Context is an optional value to associate with the node. + Context any } // NewNode creates a new node with the given name, unique identifier and From 5dad4f9444dafa1f704012435db9a84f47665a6b Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Fri, 10 Oct 2025 15:57:58 -0400 Subject: [PATCH 2/5] chore(engine): allow manipulation of physical plan DAGs As the scheduler prototype will manipulate physical plans and split them into smaller fragments, utility functions are needed to create physical plans from DAGs and return the existing DAG for modification. Signed-off-by: Robert Fratto --- pkg/engine/internal/planner/physical/plan.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/engine/internal/planner/physical/plan.go b/pkg/engine/internal/planner/physical/plan.go index 2c6e4993f69f4..c9dd8ab385060 100644 --- a/pkg/engine/internal/planner/physical/plan.go +++ b/pkg/engine/internal/planner/physical/plan.go @@ -126,6 +126,15 @@ type Plan struct { graph dag.Graph[Node] } +// FromGraph constructs a Plan from a given DAG. +func FromGraph(graph dag.Graph[Node]) *Plan { + return &Plan{graph: graph} +} + +// Graph returns the underlying graph of the plan. Modifications to the returned +// graph will affect the Plan. +func (p *Plan) Graph() *dag.Graph[Node] { return &p.graph } + // Len returns the number of nodes in the graph. func (p *Plan) Len() int { return p.graph.Len() } From adf942320282b23e2742f60127d130448ff3a007 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Fri, 17 Oct 2025 11:35:48 -0400 Subject: [PATCH 3/5] chore(engine): introduce concept of shardable nodes A shardable node is a physical plan node that supports being split into multiple smaller nodes. Currently, ScanSet is the only shardable node, where each shard is a one of its targets (resulting in a DataObjScan node). Shardable nodes will be used in task planning to create tasks downstream of Parallelize. 
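As a rough illustration of how task planning might consume this interface (a hedged sketch; the expandShards helper below is hypothetical and not part of this change, and imports of the physical package are elided), a shardable leaf can be expanded into its individual shards by ranging over Shards():

    // expandShards returns the shards of n when n supports sharding;
    // otherwise it returns n itself as a single-element slice.
    func expandShards(n physical.Node) []physical.Node {
        shardable, ok := n.(physical.ShardableNode)
        if !ok {
            return []physical.Node{n}
        }

        var shards []physical.Node
        for shard := range shardable.Shards() {
            shards = append(shards, shard)
        }
        return shards
    }

For a ScanSet with three data object targets, this would yield three DataObjScan nodes, each carrying a clone of the set's projections and predicates.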
Signed-off-by: Robert Fratto --- pkg/engine/internal/planner/physical/plan.go | 15 +++++++++++ .../internal/planner/physical/scanset.go | 26 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/pkg/engine/internal/planner/physical/plan.go b/pkg/engine/internal/planner/physical/plan.go index c9dd8ab385060..441d3f780203b 100644 --- a/pkg/engine/internal/planner/physical/plan.go +++ b/pkg/engine/internal/planner/physical/plan.go @@ -1,6 +1,8 @@ package physical import ( + "iter" + "github.com/grafana/loki/v3/pkg/engine/internal/util/dag" ) @@ -78,6 +80,19 @@ type Node interface { isNode() } +// ShardableNode is a Node that can be split into multiple smaller partitions. +type ShardableNode interface { + Node + + // Shards produces a sequence of nodes that represent a fragment of the + // original node. Returned nodes do not need to be the same type as the + // original node. + // + // Implementations must produce unique values of Node in each call to + // Shards. + Shards() iter.Seq[Node] +} + var _ Node = (*DataObjScan)(nil) var _ Node = (*Projection)(nil) var _ Node = (*Limit)(nil) diff --git a/pkg/engine/internal/planner/physical/scanset.go b/pkg/engine/internal/planner/physical/scanset.go index 93e53d4e9b7fd..52a95efde9d92 100644 --- a/pkg/engine/internal/planner/physical/scanset.go +++ b/pkg/engine/internal/planner/physical/scanset.go @@ -2,6 +2,7 @@ package physical import ( "fmt" + "iter" ) // ScanTarget represents a target of a [ScanSet]. @@ -87,3 +88,28 @@ func (s *ScanSet) Type() NodeType { func (s *ScanSet) Accept(v Visitor) error { return v.VisitScanSet(s) } + +// Shards returns an iterator over the shards of the scan. Each emitted shard +// will be a clone. Projections and predicates on the ScanSet are cloned and +// applied to each shard. +// +// Shards panics if one of the targets is invalid. +func (s *ScanSet) Shards() iter.Seq[Node] { + return func(yield func(Node) bool) { + for _, target := range s.Targets { + switch target.Type { + case ScanTypeDataObject: + node := target.DataObject.Clone().(*DataObjScan) + node.Projections = cloneExpressions(s.Projections) + node.Predicates = cloneExpressions(s.Predicates) + + if !yield(node) { + return + } + + default: + panic(fmt.Sprintf("invalid scan type %s", target.Type)) + } + } + } +} From 382c2c129b88a88bc65721e8f2a31279f9dc759c Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Fri, 17 Oct 2025 11:35:57 -0400 Subject: [PATCH 4/5] chore(engine): add workflow package The new workflow package introduces an abstraction over physical plans: * A physical plan is split into several parallelizable units called "tasks," split on pipeline breakers and generated shards downstream of Parallelize nodes. * Each task sends or receives data along streams. * The "workflow" is the graph of tasks to execute for a given query. A workflow listens for task results from the root task. Other engines typically call these units "fragments," with the collection of fragments constructing the "distributed query plan." We don't use the standard term here since our usage is stateful. Workflows respond to tasks changing state, and will eventually be responsible for deciding when a task should be enqueued for running at all. The workflow.Runner interface is introduced to represent the mechanism running tasks. A basic implementation is used for testing, but in production, workflow.Runner will be implemented by the scheduler. 
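As a rough end-to-end sketch of the intended call pattern (hedged; runQuery is a hypothetical caller, the Runner implementation and *physical.Plan are assumed to exist, and imports are elided), a caller builds a workflow, runs it, and drains the results pipeline:

    // runQuery is a hypothetical caller showing the intended flow:
    // build a workflow, run it, and drain the results pipeline.
    func runQuery(ctx context.Context, logger log.Logger, runner workflow.Runner, plan *physical.Plan) error {
        wf, err := workflow.New(logger, runner, plan)
        if err != nil {
            return err
        }

        pipeline, err := wf.Run(ctx)
        if err != nil {
            return err
        }
        // Closing the pipeline asks the runner to cancel outstanding
        // tasks and to remove the workflow's streams.
        defer pipeline.Close()

        for {
            rec, err := pipeline.Read(ctx)
            if err != nil {
                // End of stream or cancellation; the exact sentinel
                // depends on the executor pipeline implementation.
                return nil
            }
            // ... consume the arrow.Record ...
            rec.Release()
        }
    }

Closing the returned pipeline is what ultimately cancels remaining tasks and removes the workflow's streams from the runner, so callers should always defer Close even on early exit.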
Signed-off-by: Robert Fratto --- go.mod | 2 +- pkg/engine/internal/workflow/runner.go | 144 +++++++ pkg/engine/internal/workflow/task.go | 36 ++ pkg/engine/internal/workflow/workflow.go | 242 +++++++++++ .../internal/workflow/workflow_planner.go | 408 ++++++++++++++++++ .../workflow/workflow_planner_test.go | 285 ++++++++++++ .../internal/workflow/workflow_print.go | 101 +++++ pkg/engine/internal/workflow/workflow_test.go | 281 ++++++++++++ 8 files changed, 1498 insertions(+), 1 deletion(-) create mode 100644 pkg/engine/internal/workflow/runner.go create mode 100644 pkg/engine/internal/workflow/task.go create mode 100644 pkg/engine/internal/workflow/workflow.go create mode 100644 pkg/engine/internal/workflow/workflow_planner.go create mode 100644 pkg/engine/internal/workflow/workflow_planner_test.go create mode 100644 pkg/engine/internal/workflow/workflow_print.go create mode 100644 pkg/engine/internal/workflow/workflow_test.go diff --git a/go.mod b/go.mod index a5835d3a78c0a..0f06d5cc1713d 100644 --- a/go.mod +++ b/go.mod @@ -139,6 +139,7 @@ require ( github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db github.com/ncw/swift/v2 v2.0.4 + github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 github.com/pressly/goose/v3 v3.26.0 github.com/prometheus/alertmanager v0.28.1 @@ -241,7 +242,6 @@ require ( github.com/muesli/termenv v0.16.0 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/ncw/swift v1.0.53 // indirect - github.com/oklog/ulid/v2 v2.1.1 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.129.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.129.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.129.0 // indirect diff --git a/pkg/engine/internal/workflow/runner.go b/pkg/engine/internal/workflow/runner.go new file mode 100644 index 0000000000000..bfcbf57f7cdb7 --- /dev/null +++ b/pkg/engine/internal/workflow/runner.go @@ -0,0 +1,144 @@ +package workflow + +import ( + "context" + "fmt" + + "github.com/grafana/loki/v3/pkg/engine/internal/executor" +) + +// A Runner can asynchronously execute a workflow. +type Runner interface { + // AddStreams registers a list of Streams that can be used by Tasks. + // AddStreams returns an error if any of the streams (by ID) are already + // registered. + // + // The provided handler will be called whenever any of the provided streams + // change state. + AddStreams(ctx context.Context, handler StreamEventHandler, streams ...*Stream) error + + // RemoveStreams removes a list of Streams that can be used by Tasks. The + // associated [StreamEventHandler] will no longer be called for removed + // streams. + // + // RemoveStreams returns an error if there are active tasks using the + // streams. + RemoveStreams(ctx context.Context, streams ...*Stream) error + + // Listen binds the caller as the receiver of the specified stream. + // Listening on a stream prevents tasks from reading from it. + Listen(ctx context.Context, stream *Stream) (executor.Pipeline, error) + + // Start begins executing the provided tasks in the background. Start + // returns an error if any of the Tasks references an unregistered Stream, + // or if any of the tasks are a reader of a stream that's already bound. + // + // The provided handler will be called whenever any of the provided tasks + // change state. 
+ // + // Implementations must track executed tasks until the tasks enter a + // terminal state. + // + // The provided context is used for the lifetime of the tasks. Cancelling + // the context will cancel all tasks, and close associated streams for + // sending. + Start(ctx context.Context, handler TaskEventHandler, tasks ...*Task) error + + // Cancel requests cancelation of the specified tasks. Cancel returns an + // error if any of the tasks were not found. + Cancel(ctx context.Context, tasks ...*Task) error +} + +// StreamEventHandler is a function that handles events for changed streams. +type StreamEventHandler func(ctx context.Context, s *Stream, newState StreamState) + +// StreamState represents the state of a stream. It is sent as an event by a +// [Runner] whenever a stream associated with a task changes its state. +// +// The zero value of StreamState is an inactive stream. +type StreamState int + +const ( + // StreamStateIdle represents a stream that is waiting for both the sender + // and receiver to be available. + StreamStateIdle StreamState = iota + + // StreamStateOpen represents a stream that is open and transmitting data. + StreamStateOpen + + // StreamStateBlocked represents a stream that is blocked (by backpressure) + // on sending data. + StreamStateBlocked + + // StreamStateClosed represents a stream that is closed and no longer + // transmitting data. + StreamStateClosed +) + +var streamStates = [...]string{ + "Idle", + "Open", + "Blocked", + "Closed", +} + +// String returns a string representation of the StreamState. +func (s StreamState) String() string { + if s >= 0 && int(s) < len(streamStates) { + return streamStates[s] + } + return fmt.Sprintf("StreamState(%d)", s) +} + +// TaskEventHandler is a function that handles events for changed tasks. +type TaskEventHandler func(ctx context.Context, t *Task, newState TaskState) + +// TaskState represents the state of a Task. It is sent as an event by a +// [Runner] whenever a Task associated with a task changes its state. +type TaskState int + +const ( + // TaskStateCreated represents the initial state for a Task, where it has + // been created but not given to a [Runner]. + TaskStateCreated TaskState = iota + + // TaskStatePending represents a Task that is pending execution by a + // [Runner]. + TaskStatePending + + // TaskStateRunning represents a Task that is currently being executed by a + // [Runner]. + TaskStateRunning + + // TaskStateCompleted represents a Task that has completed successfully. + TaskStateCompleted + + // TaskStateCancelled represents a Task that has been cancelled, either by a + // [Runner] or the owning [Workflow]. + TaskStateCancelled + + // TaskStateFailed represents a Task that has failed during execution. + TaskStateFailed +) + +var TaskStates = [...]string{ + "Created", + "Pending", + "Running", + "Completed", + "Cancelled", + "Failed", +} + +// Terminal returns true if the TaskState is terminal. +func (s TaskState) Terminal() bool { + return s == TaskStateCompleted || s == TaskStateCancelled || s == TaskStateFailed +} + +// String returns a string representation of the TaskState. 
+func (s TaskState) String() string { + if s >= 0 && int(s) < len(TaskStates) { + return TaskStates[s] + } + return fmt.Sprintf("TaskState(%d)", s) +} diff --git a/pkg/engine/internal/workflow/task.go b/pkg/engine/internal/workflow/task.go new file mode 100644 index 0000000000000..f6df959a058b0 --- /dev/null +++ b/pkg/engine/internal/workflow/task.go @@ -0,0 +1,36 @@ +package workflow + +import ( + "github.com/oklog/ulid/v2" + + "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" +) + +// A Task is a single unit of work within a workflow. Each Task is a partition +// of a local physical plan. +type Task struct { + // ULID is a unique identifier of the Task. + ULID ulid.ULID + + // Fragment is the local physical plan that this Task represents. + Fragment *physical.Plan + + // Sources defines which Streams physical nodes read from. Sources are only + // defined for nodes in the Fragment which read data across task boundaries. + Sources map[physical.Node][]*Stream + + // Sinks defines which Streams physical nodes write to. Sinks are only + // defined for nodes in the Fragment which write data across task boundaries. + Sinks map[physical.Node][]*Stream +} + +// ID returns the string form of the Task's ULID. +func (t *Task) ID() string { return t.ULID.String() } + +// A Stream is an abstract representation of how data flows across Task +// boundaries. Each Stream has exactly one sender (a Task), and one receiver +// (either another Task or the owning [Workflow]). +type Stream struct { + // ULID is a unique identifier of the Stream. + ULID ulid.ULID +} diff --git a/pkg/engine/internal/workflow/workflow.go b/pkg/engine/internal/workflow/workflow.go new file mode 100644 index 0000000000000..8fb38a3e9c4b4 --- /dev/null +++ b/pkg/engine/internal/workflow/workflow.go @@ -0,0 +1,242 @@ +// Package workflow defines how to represent physical plans as distributed +// workflows. +package workflow + +import ( + "context" + "sync" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid/v2" + + "github.com/grafana/loki/v3/pkg/engine/internal/executor" + "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" + "github.com/grafana/loki/v3/pkg/engine/internal/util/dag" +) + +// Workflow represents a physical plan that has been partitioned into +// parallelizable tasks. +type Workflow struct { + logger log.Logger + runner Runner + graph dag.Graph[*Task] + resultsStream *Stream + + tasksMut sync.RWMutex + taskStates map[*Task]TaskState + + streamsMut sync.RWMutex + streamStates map[*Stream]StreamState +} + +// New creates a new Workflow from a physical plan. New returns an error if the +// physical plan does not have exactly one root node, or if the physical plan +// cannot be partitioned into a Workflow. +// +// The provided Runner will be used for Workflow execution. +func New(logger log.Logger, runner Runner, plan *physical.Plan) (*Workflow, error) { + graph, err := planWorkflow(plan) + if err != nil { + return nil, err + } + + // Inject a stream for final task results. + results, err := injectResultsStream(&graph) + if err != nil { + return nil, err + } + + return &Workflow{ + logger: logger, + runner: runner, + graph: graph, + resultsStream: results, + + taskStates: make(map[*Task]TaskState), + streamStates: make(map[*Stream]StreamState), + }, nil +} + +// injectResultsStreams injects a new stream into the sinks of the root task for +// the workflow to receive final results. 
+func injectResultsStream(graph *dag.Graph[*Task]) (*Stream, error) { + results := &Stream{ULID: ulid.Make()} + + // Inject a stream for final task results. + rootTask, err := graph.Root() + if err != nil { + return nil, err + } + + rootNode, err := rootTask.Fragment.Root() + if err != nil { + return nil, err + } + + rootTask.Sinks[rootNode] = append(rootTask.Sinks[rootNode], results) + return results, nil +} + +// Run executes the workflow, returning a pipeline to read results from. The +// provided context is used for the lifetime of the workflow execution. +// +// The returned pipeline must be closed when the workflow is complete to release +// resources. +func (wf *Workflow) Run(ctx context.Context) (pipeline executor.Pipeline, err error) { + var ( + streams = wf.allStreams() + tasks = wf.allTasks() + ) + + if err := wf.runner.AddStreams(ctx, wf.onStreamChange, streams...); err != nil { + return nil, err + } + defer func() { + if err != nil { + _ = wf.runner.RemoveStreams(context.Background(), streams...) + } + }() + + pipeline, err = wf.runner.Listen(ctx, wf.resultsStream) + if err != nil { + return nil, err + } + + // TODO(rfratto): For logs queries, we want a system to limit how many scan + // tasks get sent to the runner at once. + // + // This will limit unnecessary resource consumption of workflows when + // there's a lot of compute capacity. + if err := wf.runner.Start(ctx, wf.onTaskChange, tasks...); err != nil { + pipeline.Close() + return nil, err + } + + wrapped := &wrappedPipeline{ + inner: pipeline, + onClose: func() { + // Cancel will return an error for any tasks that were already + // canceled, but for convenience we give all the known tasks anyway. + // + // The same thing applies to RemoveStreams. + _ = wf.runner.Cancel(context.Background(), tasks...) + _ = wf.runner.RemoveStreams(context.Background(), streams...) + }, + } + return wrapped, nil +} + +func (wf *Workflow) allStreams() []*Stream { + var ( + result []*Stream + seenStreams = map[*Stream]struct{}{} + ) + + // We only iterate over sources below (for convenience), and since + // wf.results is only used as a sink, we need to manually add it here. + result = append(result, wf.resultsStream) + seenStreams[wf.resultsStream] = struct{}{} + + for _, root := range wf.graph.Roots() { + _ = wf.graph.Walk(root, func(t *Task) error { + // Task construction guarantees that there is a sink for each source + // (minus the results stream we generate), so there's no point in + // interating over Sources and Sinks. + for _, streams := range t.Sources { + for _, stream := range streams { + if _, seen := seenStreams[stream]; seen { + continue + } + seenStreams[stream] = struct{}{} + result = append(result, stream) + } + } + + return nil + }, dag.PreOrderWalk) + } + + return result +} + +func (wf *Workflow) allTasks() []*Task { + var tasks []*Task + + for _, root := range wf.graph.Roots() { + // [dag.Graph.Walk] guarantees that each node is only visited once, so + // we can safely append the task to the list without checking if it's + // already been seen. + _ = wf.graph.Walk(root, func(t *Task) error { + tasks = append(tasks, t) + return nil + }, dag.PreOrderWalk) + } + + return tasks +} + +func (wf *Workflow) onStreamChange(_ context.Context, stream *Stream, newState StreamState) { + wf.streamsMut.Lock() + defer wf.streamsMut.Unlock() + + wf.streamStates[stream] = newState + + // TODO(rfratto): Do we need to do anything if a stream changes? Figuring + // out what to do here will need to wait until the scheduler is available. 
+} + +func (wf *Workflow) onTaskChange(ctx context.Context, task *Task, newState TaskState) { + wf.tasksMut.Lock() + wf.taskStates[task] = newState + wf.tasksMut.Unlock() + + if !newState.Terminal() { + return + } + + // task reached a terminal state. We need to detect if task's immediate + // children should be canceled. We only look at immediate children, since + // canceling them will trigger onTaskChange to process indirect children. + var tasksToCancel []*Task + + wf.tasksMut.RLock() + { + NextChild: + for _, child := range wf.graph.Children(task) { + // Cancel the child if and only if all of the child's parents (which + // includes the task that just updated) are in a terminal state. + for _, parent := range wf.graph.Parents(child) { + parentState := wf.taskStates[parent] + if !parentState.Terminal() { + continue NextChild + } + } + + tasksToCancel = append(tasksToCancel, child) + } + } + wf.tasksMut.RUnlock() + + // Runners may re-invoke onTaskChange, so we don't want to hold the mutex + // when calling this. + if err := wf.runner.Cancel(ctx, tasksToCancel...); err != nil { + level.Warn(wf.logger).Log("msg", "failed to cancel tasks", "err", err) + } +} + +type wrappedPipeline struct { + inner executor.Pipeline + onClose func() +} + +func (p *wrappedPipeline) Read(ctx context.Context) (arrow.Record, error) { + return p.inner.Read(ctx) +} + +// Close closes the resources of the pipeline. +func (p *wrappedPipeline) Close() { + p.inner.Close() + p.onClose() +} diff --git a/pkg/engine/internal/workflow/workflow_planner.go b/pkg/engine/internal/workflow/workflow_planner.go new file mode 100644 index 0000000000000..57c9d7d879b8b --- /dev/null +++ b/pkg/engine/internal/workflow/workflow_planner.go @@ -0,0 +1,408 @@ +package workflow + +import ( + "errors" + "fmt" + "slices" + + "github.com/oklog/ulid/v2" + + "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" + "github.com/grafana/loki/v3/pkg/engine/internal/util/dag" +) + +// planner is responsible for constructing the Task graph held by a [Workflow]. +type planner struct { + graph dag.Graph[*Task] + physical *physical.Plan + + streamWriters map[*Stream]*Task // Lookup of stream to which task writes to it + streamReaders map[*Stream]*Task // Lookup of stream to which task reads from it +} + +// planWorkflow partitions a physical plan into a graph of tasks. +// +// planWorkflow returns an error if the provided physical plan does not +// have exactly one root node, or if the physical plan cannot be partitioned. +func planWorkflow(plan *physical.Plan) (dag.Graph[*Task], error) { + root, err := plan.Root() + if err != nil { + return dag.Graph[*Task]{}, err + } + + planner := &planner{ + physical: plan, + + streamWriters: make(map[*Stream]*Task), + streamReaders: make(map[*Stream]*Task), + } + if err := planner.Process(root); err != nil { + return dag.Graph[*Task]{}, err + } + + return planner.graph, nil +} + +// Process builds a set of tasks from a root physical plan node. Built tasks are +// added to p.graph. +func (p *planner) Process(root physical.Node) error { + _, err := p.processNode(root, true) + return err +} + +// processNode builds a set of tasks from the given node. splitOnBreaker +// indicates whether pipeline breaker nodes should be split off into their own +// task. +// +// The reuslting task is the task immediately produced by node, which callers +// can use to add edges. All tasks, including those produced in recursive calls +// to processNode, are added into p.Graph. 
+func (p *planner) processNode(node physical.Node, splitOnBreaker bool) (*Task, error) { + var ( + // taskPlan is the in-progress physical plan for an individual task. + taskPlan dag.Graph[physical.Node] + + sources = make(map[physical.Node][]*Stream) + + // childrenTasks is the slice of immediate Tasks produced by processing + // the children of node. + childrenTasks []*Task + ) + + // Immediately add the node to the task physical plan. + taskPlan.Add(node) + + var ( + stack = make(stack[physical.Node], 0, p.physical.Len()) + visited = make(map[physical.Node]struct{}, p.physical.Len()) + nodeTasks = make(map[physical.Node][]*Task, p.physical.Len()) + ) + + stack.Push(node) + for stack.Len() > 0 { + next := stack.Pop() + if _, ok := visited[next]; ok { + // Ignore nodes that have already been visited, which can happen + // if a node has multiple parents. + continue + } + visited[next] = struct{}{} + + for _, child := range p.physical.Children(next) { + // NOTE(rfratto): We may have already seen child before (if it has + // more than one parent), but we want to continue processing to + // ensure we update sources properly and retain all relationships + // within the same graph. + + switch { + case splitOnBreaker && isPipelineBreaker(child): + childTasks, found := nodeTasks[child] + if !found { + // Split the pipeline breaker into its own set of tasks. + task, err := p.processNode(child, splitOnBreaker) + if err != nil { + return nil, err + } + childrenTasks = append(childrenTasks, task) + nodeTasks[child] = append(nodeTasks[child], task) + childTasks = nodeTasks[child] + } + + // Create one unique stream for each child task so we can + // receive output from them. + for _, task := range childTasks { + stream := &Stream{ULID: ulid.Make()} + if err := p.addSink(task, stream); err != nil { + return nil, err + } + sources[next] = append(sources[next], stream) + p.streamReaders[stream] = task + } + + case child.Type() == physical.NodeTypeParallelize: + childTasks, found := nodeTasks[child] + if !found { + // Split the pipeline breaker into its own set of tasks. + tasks, err := p.processParallelizeNode(child.(*physical.Parallelize)) + if err != nil { + return nil, err + } + childrenTasks = append(childrenTasks, tasks...) + nodeTasks[child] = append(nodeTasks[child], tasks...) + childTasks = nodeTasks[child] + } + + // Create one unique stream for each child task so we can + // receive output from them. + for _, task := range childTasks { + stream := &Stream{ULID: ulid.Make()} + if err := p.addSink(task, stream); err != nil { + return nil, err + } + sources[next] = append(sources[next], stream) + p.streamReaders[stream] = task + } + + default: + // Add child node into the plan (if we haven't already) and + // retain existing edges. + taskPlan.Add(child) + _ = taskPlan.AddEdge(dag.Edge[physical.Node]{ + Parent: next, + Child: child, + }) + + stack.Push(child) // Push child for further processing. + } + } + } + + task := &Task{ + ULID: ulid.Make(), + Fragment: physical.FromGraph(taskPlan), + Sources: sources, + Sinks: make(map[physical.Node][]*Stream), + } + p.graph.Add(task) + + // Wire edges to children tasks and update their sinks to note sending to + // our task. + for _, child := range childrenTasks { + _ = p.graph.AddEdge(dag.Edge[*Task]{ + Parent: task, + Child: child, + }) + } + + return task, nil +} + +// addSink adds the sink stream to the root node of the provided task. addSink +// returns an error if t has more than one root node. 
+func (p *planner) addSink(t *Task, sink *Stream) error { + root, err := t.Fragment.Root() + if err != nil { + return err + } + + if _, exist := p.streamWriters[sink]; exist { + return fmt.Errorf("writer for stream %s already exists", sink.ULID) + } + p.streamWriters[sink] = t + + t.Sinks[root] = append(t.Sinks[root], sink) + return nil +} + +// removeSink removes the sink stream from the root node of the provided task. +// removeSink is a no-op if sink doesn't have a writer. +func (p *planner) removeSink(sink *Stream) error { + t, exist := p.streamWriters[sink] + if !exist { + return nil + } + + root, err := t.Fragment.Root() + if err != nil { + return err + } + + t.Sinks[root] = slices.DeleteFunc(t.Sinks[root], func(s *Stream) bool { return s == sink }) + delete(p.streamWriters, sink) + return nil +} + +// isPipelineBreaker returns true if the node is a pipeline breaker. +func isPipelineBreaker(node physical.Node) bool { + // TODO(rfratto): Should this information be exposed by the node itself? A + // decision on this should wait until we're able to serialize the node over + // the network, since that might impact how we're able to define this at the + // node level. + switch node.Type() { + case physical.NodeTypeTopK, physical.NodeTypeRangeAggregation, physical.NodeTypeVectorAggregation: + return true + } + + return false +} + +// processParallelizeNode builds a set of tasks for a Parallelize node. +func (p *planner) processParallelizeNode(node *physical.Parallelize) ([]*Task, error) { + // Parallelize nodes are used as a marker task for splitting a branch of the + // query plan into many distributed tasks. + // + // For example + // + // Parallelize + // TopK limit=1000 + // ScanSet + // @target type=ScanTypeDataObject location=object-a section_id=1 + // @target type=ScanTypeDataObject location=object-b section_id=1 + // + // becomes two tasks: + // + // TopK limit=1000 -> DataObjScan location=object-a + // TopK limit=1000 -> DataObjScan location=object-b + // + // We handle this in a few phases: + // + // 1. Create a "template task" starting from the child of the Parallelize (TopK) + // 2. Find the target node in the template task that can be split into smaller shard nodes (ScanSet) + // 3. Create shard nodes from the target node. + // 4. For each shard node, clone the template task and replace the target node with the shard node. + // 5. Finally, remove the template task. + // + // Not all nodes can be used as a target for splitting into shards. See + // [findShardableNode] for the full list of rules. + roots := p.physical.Children(node) + if len(roots) != 1 { + return nil, errors.New("parallelize node must have exactly one child") + } + root := roots[0] + + // Since parallelize nodes can fan out to many smaller tasks, we don't split + // on pipeline breakers, which would otherwise create far too many tasks + // that do too *little* work. + templateTask, err := p.processNode(root, false) + if err != nil { + return nil, err + } + + shardableNode, err := findShardableNode(templateTask.Fragment.Graph(), root) + if err != nil { + return nil, err + } else if shardableNode == nil { + // In the case we have no node to shard (a physical planner bug?), we + // can skip the rest of the process and treat the template task as a + // single shard. + return []*Task{templateTask}, nil + } + + // Safety check: + // + // Our template task should currently be a root node, as we called + // processNode ourselves and haven't wired up parents yet. 
This means + // that it should have no parents, and it shouldn't have any sink + // streams yet. + // + // We double-check here to ensure that the invariant holds. If this + // invariant breaks, the loop below will be incorrect. + switch { + case len(p.graph.Parents(templateTask)) != 0: + return nil, errors.New("unexpected template task with parents") + case len(templateTask.Sinks) > 0: + return nil, errors.New("unexpected template task with sinks") + } + + var partitions []*Task + + for shard := range shardableNode.Shards() { + // Create a new task for the shard. + shardedPlan := templateTask.Fragment.Graph().Clone() + shardedPlan.Inject(shardableNode, shard) + shardedPlan.Eliminate(shardableNode) + + // The sources of the template task need to be replaced with new unique + // streams. + shardSources := make(map[physical.Node][]*Stream, len(templateTask.Sources)) + for node, templateStreams := range templateTask.Sources { + shardStreams := make([]*Stream, 0, len(templateStreams)) + + for _, templateStream := range templateStreams { + shardStream := &Stream{ULID: ulid.Make()} + shardStreams = append(shardStreams, shardStream) + + // Find the writer of the template stream and tell it about the + // new stream. + writer, ok := p.streamWriters[templateStream] + if !ok { + return nil, fmt.Errorf("unconnected stream %s", templateStream.ULID) + } else if err := p.addSink(writer, shardStream); err != nil { + return nil, err + } + } + + shardSources[node] = shardStreams + } + + partition := &Task{ + ULID: ulid.Make(), + + Fragment: physical.FromGraph(*shardedPlan), + Sources: shardSources, + Sinks: make(map[physical.Node][]*Stream), + } + p.graph.Add(partition) + + // Copy the downstream edges from the template task into our partitioned + // task. + for _, child := range p.graph.Children(templateTask) { + _ = p.graph.AddEdge(dag.Edge[*Task]{ + Parent: partition, + Child: child, + }) + } + + partitions = append(partitions, partition) + } + + // Before we remove our template task, we need to unlink its streams from + // whichever task is writing to it. + // + // This keeps stream alive in templateTask.Sources, but since templateTask + // is removed before the function returns, this is safe. + for _, streams := range templateTask.Sources { + for _, stream := range streams { + _ = p.removeSink(stream) + } + } + + p.graph.Eliminate(templateTask) + return partitions, nil +} + +// findShardableNode finds the first node in the graph that can be split into +// smaller shards. Only leaf nodes are examined. +// +// If there is no shardable leaf node reachable from root, findShardableNode +// returns nil. findShardableNode returns an error if there are two shardable +// leaf nodes. +func findShardableNode(graph *dag.Graph[physical.Node], root physical.Node) (physical.ShardableNode, error) { + var found physical.ShardableNode + + err := graph.Walk(root, func(n physical.Node) error { + isLeaf := len(graph.Children(n)) == 0 + if !isLeaf { + return nil + } + + shardable, ok := n.(physical.ShardableNode) + if !ok { + return nil + } else if found != nil { + return errors.New("multiple shardable leaf nodes found") + } + + found = shardable + return nil + }, dag.PreOrderWalk) + + return found, err +} + +// stack is a slice with Push and Pop operations. 
+type stack[E any] []E + +func (s stack[E]) Len() int { return len(s) } + +func (s *stack[E]) Push(e E) { *s = append(*s, e) } + +func (s *stack[E]) Pop() E { + if len(*s) == 0 { + panic("stack is empty") + } + last := len(*s) - 1 + e := (*s)[last] + *s = (*s)[:last] + return e +} diff --git a/pkg/engine/internal/workflow/workflow_planner_test.go b/pkg/engine/internal/workflow/workflow_planner_test.go new file mode 100644 index 0000000000000..9c8c5e8f5d7f7 --- /dev/null +++ b/pkg/engine/internal/workflow/workflow_planner_test.go @@ -0,0 +1,285 @@ +package workflow + +import ( + "strings" + "testing" + + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" + "github.com/grafana/loki/v3/pkg/engine/internal/util/dag" +) + +// TODO(rfratto): Once physical plans can be serializable, we should be able to +// write these tests using serialized physical plans rather than constructing +// them with the DAG. +// +// That would also allow for writing tests using txtar[^1], which may be easier +// to write and maintain. +// +// [^1]: https://research.swtch.com/testing + +func Test_planWorkflow(t *testing.T) { + t.Run("no pipeline breakers", func(t *testing.T) { + ulidGen := ulidGenerator{} + + var physicalGraph dag.Graph[physical.Node] + physicalGraph.Add(&physical.DataObjScan{}) + + physicalPlan := physical.FromGraph(physicalGraph) + + graph, err := planWorkflow(physicalPlan) + require.NoError(t, err) + require.Equal(t, 1, graph.Len()) + requireUniqueStreams(t, graph) + generateConsistentULIDs(&ulidGen, graph) + + expectOuptut := strings.TrimSpace(` +Task 00000000000000000000000001 +------------------------------- +DataObjScan location= streams=0 section_id=0 projections=() +`) + + actualOutput := Sprint(&Workflow{graph: graph}) + require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput)) + }) + + t.Run("ends with one pipeline breaker", func(t *testing.T) { + ulidGen := ulidGenerator{} + + var physicalGraph dag.Graph[physical.Node] + + var ( + scan = physicalGraph.Add(&physical.DataObjScan{}) + rangeAgg = physicalGraph.Add(&physical.RangeAggregation{}) + ) + + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scan}) + + physicalPlan := physical.FromGraph(physicalGraph) + + graph, err := planWorkflow(physicalPlan) + require.NoError(t, err) + require.Equal(t, 1, graph.Len()) + requireUniqueStreams(t, graph) + generateConsistentULIDs(&ulidGen, graph) + + expectOuptut := strings.TrimSpace(` +Task 00000000000000000000000001 +------------------------------- +RangeAggregation operation=invalid start=0001-01-01T00:00:00Z end=0001-01-01T00:00:00Z step=0s range=0s +└── DataObjScan location= streams=0 section_id=0 projections=() +`) + + actualOutput := Sprint(&Workflow{graph: graph}) + require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput)) + }) + + t.Run("split on pipeline breaker", func(t *testing.T) { + ulidGen := ulidGenerator{} + + var physicalGraph dag.Graph[physical.Node] + + var ( + scan = physicalGraph.Add(&physical.DataObjScan{}) + rangeAgg = physicalGraph.Add(&physical.RangeAggregation{}) + vectorAgg = physicalGraph.Add(&physical.VectorAggregation{}) + ) + + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scan}) + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg}) + + physicalPlan := physical.FromGraph(physicalGraph) + + graph, err := planWorkflow(physicalPlan) + require.NoError(t, 
err) + require.Equal(t, 2, graph.Len()) + requireUniqueStreams(t, graph) + generateConsistentULIDs(&ulidGen, graph) + + expectOuptut := strings.TrimSpace(` +Task 00000000000000000000000001 +------------------------------- +VectorAggregation + └── @source stream=00000000000000000000000003 + +Task 00000000000000000000000002 +------------------------------- +RangeAggregation operation=invalid start=0001-01-01T00:00:00Z end=0001-01-01T00:00:00Z step=0s range=0s +│ └── @sink stream=00000000000000000000000003 +└── DataObjScan location= streams=0 section_id=0 projections=() +`) + + actualOutput := Sprint(&Workflow{graph: graph}) + require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput)) + }) + + t.Run("split on parallelize", func(t *testing.T) { + ulidGen := ulidGenerator{} + + var physicalGraph dag.Graph[physical.Node] + + var ( + vectorAgg = physicalGraph.Add(&physical.VectorAggregation{}) + rangeAgg = physicalGraph.Add(&physical.RangeAggregation{}) + + parallelize = physicalGraph.Add(&physical.Parallelize{}) + + filter = physicalGraph.Add(&physical.Filter{}) + parse = physicalGraph.Add(&physical.ParseNode{Kind: physical.ParserLogfmt}) + scanSet = physicalGraph.Add(&physical.ScanSet{ + Targets: []*physical.ScanTarget{ + {Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{Location: "a"}}, + {Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{Location: "b"}}, + {Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{Location: "c"}}, + }, + }) + ) + + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg}) + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: parallelize}) + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: filter}) + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: filter, Child: parse}) + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parse, Child: scanSet}) + + physicalPlan := physical.FromGraph(physicalGraph) + + graph, err := planWorkflow(physicalPlan) + require.NoError(t, err) + require.Equal(t, 5, graph.Len()) + requireUniqueStreams(t, graph) + generateConsistentULIDs(&ulidGen, graph) + + expectOuptut := strings.TrimSpace(` +Task 00000000000000000000000001 +------------------------------- +VectorAggregation + └── @source stream=00000000000000000000000006 + +Task 00000000000000000000000002 +------------------------------- +RangeAggregation operation=invalid start=0001-01-01T00:00:00Z end=0001-01-01T00:00:00Z step=0s range=0s + ├── @source stream=00000000000000000000000007 + ├── @source stream=00000000000000000000000008 + ├── @source stream=00000000000000000000000009 + └── @sink stream=00000000000000000000000006 + +Task 00000000000000000000000003 +------------------------------- +Filter +│ └── @sink stream=00000000000000000000000007 +└── Parse kind=logfmt + └── DataObjScan location=a streams=0 section_id=0 projections=() + +Task 00000000000000000000000004 +------------------------------- +Filter +│ └── @sink stream=00000000000000000000000008 +└── Parse kind=logfmt + └── DataObjScan location=b streams=0 section_id=0 projections=() + +Task 00000000000000000000000005 +------------------------------- +Filter +│ └── @sink stream=00000000000000000000000009 +└── Parse kind=logfmt + └── DataObjScan location=c streams=0 section_id=0 projections=() +`) + + actualOutput := Sprint(&Workflow{graph: graph}) + require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput)) + }) +} + +// 
requireUniqueStreams asserts that for each stream found in g, that stream has +// exactly one reader (from Task.Sources) and exactly one writer (from +// Task.Sinks). +func requireUniqueStreams(t testing.TB, g dag.Graph[*Task]) { + t.Helper() + + type streamInfo struct { + Reader physical.Node + Writer physical.Node + } + + streamInfos := make(map[*Stream]*streamInfo) + + for _, root := range g.Roots() { + _ = g.Walk(root, func(task *Task) error { + for node, streams := range task.Sources { + for _, stream := range streams { + info, ok := streamInfos[stream] + if !ok { + info = new(streamInfo) + streamInfos[stream] = info + } + + require.Nil(t, info.Reader, "each stream must have exactly one reader") + info.Reader = node + } + } + + for node, streams := range task.Sinks { + for _, stream := range streams { + info, ok := streamInfos[stream] + if !ok { + info = new(streamInfo) + streamInfos[stream] = info + } + + require.Nil(t, info.Writer, "each stream must have exactly one writer") + info.Writer = node + } + } + + return nil + }, dag.PreOrderWalk) + } + + for _, info := range streamInfos { + require.NotNil(t, info.Reader, "each stream must have exactly one reader") + require.NotNil(t, info.Writer, "each stream must have exactly one writer") + } +} + +// generateConsistentULIDs reassigns ULIDs to all tasks and streams in the graph +// using the ULID generator for consistent output. +func generateConsistentULIDs(gen *ulidGenerator, g dag.Graph[*Task]) { + for _, root := range g.Roots() { + _ = g.Walk(root, func(task *Task) error { + task.ULID = gen.Make() + return nil + }, dag.PreOrderWalk) + } + + // For easier debugging, we do a second pass for all the streams. That way + // we know stream ULIDs start after task ULIDs. + for _, root := range g.Roots() { + _ = g.Walk(root, func(task *Task) error { + for _, streams := range task.Sources { + for _, stream := range streams { + stream.ULID = gen.Make() + } + } + return nil + }, dag.PreOrderWalk) + } +} + +type ulidGenerator struct { + lastCounter uint64 +} + +func (g *ulidGenerator) Make() ulid.ULID { + g.lastCounter++ + value := g.lastCounter + + // Manually set ULID bytes - counter in the last 8 bytes, zeros in first 8 + var ulidBytes [16]byte + // Put the counter value in the last 8 bytes (big-endian) + for i := range 8 { + ulidBytes[15-i] = byte(value >> (8 * i)) + } + return ulid.ULID(ulidBytes) +} diff --git a/pkg/engine/internal/workflow/workflow_print.go b/pkg/engine/internal/workflow/workflow_print.go new file mode 100644 index 0000000000000..965d5bd956f45 --- /dev/null +++ b/pkg/engine/internal/workflow/workflow_print.go @@ -0,0 +1,101 @@ +package workflow + +import ( + "fmt" + "io" + "strings" + + "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" + "github.com/grafana/loki/v3/pkg/engine/internal/util/dag" + "github.com/grafana/loki/v3/pkg/engine/internal/util/tree" +) + +// Sprint returns a string representation of the workflow. +func Sprint(wf *Workflow) string { + var sb strings.Builder + _ = Fprint(&sb, wf) + return sb.String() +} + +// Fprint prints a string representation of the workflow to the given writer. 
+func Fprint(w io.Writer, wf *Workflow) error { + visited := make(map[*Task]struct{}, wf.graph.Len()) + + roots := wf.graph.Roots() + for _, root := range roots { + err := wf.graph.Walk(root, func(n *Task) error { + if _, seen := visited[n]; seen { + return nil + } + visited[n] = struct{}{} + + fmt.Fprintf(w, "Task %s\n", n.ID()) + fmt.Fprintln(w, "-------------------------------") + + var sb strings.Builder + for _, root := range n.Fragment.Roots() { + printer := tree.NewPrinter(&sb) + + planTree := physical.BuildTree(n.Fragment, root) + + for node, streams := range n.Sources { + treeNode := findTreeNode(planTree, func(n *tree.Node) bool { return n.Context == node }) + if treeNode == nil { + continue + } + + for _, stream := range streams { + treeNode.AddComment("@source", "", []tree.Property{tree.NewProperty("stream", false, stream.ULID.String())}) + } + } + + for node, streams := range n.Sinks { + treeNode := findTreeNode(planTree, func(n *tree.Node) bool { return n.Context == node }) + if treeNode == nil { + continue + } + + for _, stream := range streams { + treeNode.AddComment("@sink", "", []tree.Property{tree.NewProperty("stream", false, stream.ULID.String())}) + } + } + + printer.Print(planTree) + } + + if _, err := io.Copy(w, strings.NewReader(sb.String())); err != nil { + return err + } else if _, err := fmt.Fprintln(w); err != nil { + return err + } + return nil + }, dag.PreOrderWalk) + if err != nil { + return err + } + } + + return nil +} + +// findTreeNode finds the first node in the tree that satisfies the given +// predicate. findTreeNode returns nil if no node is found. +func findTreeNode(root *tree.Node, f func(node *tree.Node) bool) *tree.Node { + if f(root) { + return root + } + + for _, child := range root.Children { + if node := findTreeNode(child, f); node != nil { + return node + } + } + + for _, comment := range root.Comments { + if node := findTreeNode(comment, f); node != nil { + return node + } + } + + return nil +} diff --git a/pkg/engine/internal/workflow/workflow_test.go b/pkg/engine/internal/workflow/workflow_test.go new file mode 100644 index 0000000000000..7ffba2b3a17a6 --- /dev/null +++ b/pkg/engine/internal/workflow/workflow_test.go @@ -0,0 +1,281 @@ +package workflow + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/go-kit/log" + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/engine/internal/executor" + "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" + "github.com/grafana/loki/v3/pkg/engine/internal/util/dag" +) + +// Test performs an end-to-end test of a workflow, asserting that: +// +// 1. All streams get registered before running tasks. +// 2. There is never more than one sender/receiver for a stream. +// 3. The root task has a pipeline listener. +// 4. Closing the pipeline properly cleans up tasks and streams. +// +// Some of these assertions are handled by [fakeRunner], which returns an error +// when used improperly. 
+func Test(t *testing.T) { + var physicalGraph dag.Graph[physical.Node] + + var ( + scan = physicalGraph.Add(&physical.DataObjScan{}) + rangeAgg = physicalGraph.Add(&physical.RangeAggregation{}) + vectorAgg = physicalGraph.Add(&physical.VectorAggregation{}) + ) + + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scan}) + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg}) + + physicalPlan := physical.FromGraph(physicalGraph) + + fr := newFakeRunner() + + wf, err := New(log.NewNopLogger(), fr, physicalPlan) + require.NoError(t, err, "workflow should construct properly") + require.NotNil(t, wf.resultsStream, "workflow should have created results stream") + + defer func() { + if !t.Failed() { + return + } + + t.Log("Failing workflow:") + t.Log(Sprint(wf)) + }() + + // Run returns an error if any of the methods in our fake runner failed. + p, err := wf.Run(t.Context()) + require.NoError(t, err, "Workflow should start properly") + + defer func() { + p.Close() + + // Closing the pipeline should remove all remaining streams and tasks. + require.Len(t, fr.streams, 0, "all streams should be removed after closing the pipeline") + require.Len(t, fr.tasks, 0, "all streams should be removed after closing the pipeline") + }() + + rs, ok := fr.streams[wf.resultsStream.ULID] + require.True(t, ok, "results stream should be registered in runner") + require.NotEqual(t, ulid.Zero, rs.Sender, "results stream should have a sender") + require.Equal(t, ulid.Zero, rs.TaskReceiver, "results stream should not have a task receiver") + require.NotNil(t, rs.Listener, "results stream should have a listener") + + // Check to make sure all known tasks have been given to the runner. + for _, task := range wf.allTasks() { + _, exist := fr.tasks[task.ULID] + require.True(t, exist, "workflow should give all tasks to runner (task %s is missing)", task.ULID) + } +} + +// TestCancellation tasks that a task entering a terminal state cancels all +// downstream tasks. +func TestCancellation(t *testing.T) { + var physicalGraph dag.Graph[physical.Node] + + var ( + scan = physicalGraph.Add(&physical.DataObjScan{}) + rangeAgg = physicalGraph.Add(&physical.RangeAggregation{}) + vectorAgg = physicalGraph.Add(&physical.VectorAggregation{}) + ) + + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scan}) + _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg}) + + physicalPlan := physical.FromGraph(physicalGraph) + + terminalStates := []TaskState{TaskStateCancelled, TaskStateCancelled, TaskStateFailed} + for _, state := range terminalStates { + t.Run(state.String(), func(t *testing.T) { + fr := newFakeRunner() + wf, err := New(log.NewNopLogger(), fr, physicalPlan) + require.NoError(t, err, "workflow should construct properly") + require.NotNil(t, wf.resultsStream, "workflow should have created results stream") + + defer func() { + if !t.Failed() { + return + } + + t.Log("Failing workflow:") + t.Log(Sprint(wf)) + }() + + // Run returns an error if any of the methods in our fake runner failed. + p, err := wf.Run(t.Context()) + require.NoError(t, err, "Workflow should start properly") + defer p.Close() + + rootTask, err := wf.graph.Root() + require.NoError(t, err, "should be able to retrieve singular root task") + + rt, ok := fr.tasks[rootTask.ULID] + require.True(t, ok, "root task should be registered with runner") + + // Notify the workflow that the root task has entered a terminal + // state. 
+ rt.handler(t.Context(), rootTask, state) + + _ = wf.graph.Walk(rootTask, func(n *Task) error { + if n == rootTask { + return nil + } + + require.Equal(t, TaskStateCancelled, wf.taskStates[n], "downstream task %s should be canceled", n.ULID) + return nil + }, dag.PreOrderWalk) + }) + } +} + +type fakeRunner struct { + streams map[ulid.ULID]*runnerStream + tasks map[ulid.ULID]*runnerTask +} + +func newFakeRunner() *fakeRunner { + return &fakeRunner{ + streams: make(map[ulid.ULID]*runnerStream), + tasks: make(map[ulid.ULID]*runnerTask), + } +} + +func (f *fakeRunner) AddStreams(_ context.Context, handler StreamEventHandler, streams ...*Stream) error { + for _, stream := range streams { + if _, exist := f.streams[stream.ULID]; exist { + return fmt.Errorf("stream %s already added", stream.ULID) + } + + f.streams[stream.ULID] = &runnerStream{ + Stream: stream, + Handler: handler, + } + } + + return nil +} + +func (f *fakeRunner) RemoveStreams(_ context.Context, streams ...*Stream) error { + var errs []error + + for _, stream := range streams { + if _, exist := f.streams[stream.ULID]; !exist { + errs = append(errs, fmt.Errorf("stream %s not found", stream.ULID)) + } + delete(f.streams, stream.ULID) + } + + return errors.Join(errs...) +} + +func (f *fakeRunner) Listen(_ context.Context, stream *Stream) (executor.Pipeline, error) { + rs, exist := f.streams[stream.ULID] + if !exist { + return nil, fmt.Errorf("stream %s not found", stream.ULID) + } else if rs.Listener != nil || rs.TaskReceiver != ulid.Zero { + return nil, fmt.Errorf("stream %s already bound", stream.ULID) + } + + rs.Listener = noopPipeline{} + return rs.Listener, nil +} + +func (f *fakeRunner) Start(ctx context.Context, handler TaskEventHandler, tasks ...*Task) error { + for _, task := range tasks { + if _, exist := f.tasks[task.ULID]; exist { + return fmt.Errorf("task %s already added", task.ULID) + } + + f.tasks[task.ULID] = &runnerTask{ + task: task, + handler: handler, + } + + for _, streams := range task.Sinks { + for _, stream := range streams { + rs, exist := f.streams[stream.ULID] + if !exist { + return fmt.Errorf("sink stream %s not found", stream.ULID) + } else if rs.Sender != ulid.Zero { + return fmt.Errorf("stream %s already bound to sender %s", stream.ULID, rs.Sender) + } + + rs.Sender = task.ULID + } + } + for _, streams := range task.Sources { + for _, stream := range streams { + rs, exist := f.streams[stream.ULID] + if !exist { + return fmt.Errorf("source stream %s not found", stream.ULID) + } else if rs.TaskReceiver != ulid.Zero { + return fmt.Errorf("source stream %s already bound to %s", stream.ULID, rs.TaskReceiver) + } else if rs.Listener != nil { + return fmt.Errorf("source stream %s already bound to local receiver", stream.ULID) + } + + rs.TaskReceiver = task.ULID + } + } + + // Inform handler of task state change. + handler(ctx, task, TaskStatePending) + } + + return nil +} + +func (f *fakeRunner) Cancel(ctx context.Context, tasks ...*Task) error { + var errs []error + + for _, task := range tasks { + rt, exist := f.tasks[task.ULID] + if !exist { + errs = append(errs, fmt.Errorf("task %s not found", task.ULID)) + continue + } + + rt.handler(ctx, task, TaskStateCancelled) + delete(f.tasks, task.ULID) + } + + return errors.Join(errs...) +} + +// runnerStream wraps a stream with the edges of the connection. A stream sender +// is always another task, while the receiver is either another task or a +// pipeline owned by the runner, given to the workflow. 
+type runnerStream struct { + Stream *Stream + Handler StreamEventHandler + + TaskReceiver ulid.ULID // Task listening for messages on stream. + Listener executor.Pipeline // Pipeline listening for messages on stream. + + Sender ulid.ULID +} + +type runnerTask struct { + task *Task + handler TaskEventHandler +} + +type noopPipeline struct{} + +func (noopPipeline) Read(ctx context.Context) (arrow.Record, error) { + <-ctx.Done() + return nil, ctx.Err() +} + +func (noopPipeline) Close() {} From 2db4323ce935e73d095a5d23a68d951baabb56f3 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 21 Oct 2025 08:14:24 -0400 Subject: [PATCH 5/5] fixup! chore(engine): add workflow package --- pkg/engine/internal/workflow/workflow.go | 4 ++-- pkg/engine/internal/workflow/workflow_planner.go | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/engine/internal/workflow/workflow.go b/pkg/engine/internal/workflow/workflow.go index 8fb38a3e9c4b4..5c7730115bff6 100644 --- a/pkg/engine/internal/workflow/workflow.go +++ b/pkg/engine/internal/workflow/workflow.go @@ -59,7 +59,7 @@ func New(logger log.Logger, runner Runner, plan *physical.Plan) (*Workflow, erro }, nil } -// injectResultsStreams injects a new stream into the sinks of the root task for +// injectResultsStream injects a new stream into the sinks of the root task for // the workflow to receive final results. func injectResultsStream(graph *dag.Graph[*Task]) (*Stream, error) { results := &Stream{ULID: ulid.Make()} @@ -143,7 +143,7 @@ func (wf *Workflow) allStreams() []*Stream { _ = wf.graph.Walk(root, func(t *Task) error { // Task construction guarantees that there is a sink for each source // (minus the results stream we generate), so there's no point in - // interating over Sources and Sinks. + // iterating over Sources and Sinks. for _, streams := range t.Sources { for _, stream := range streams { if _, seen := seenStreams[stream]; seen { diff --git a/pkg/engine/internal/workflow/workflow_planner.go b/pkg/engine/internal/workflow/workflow_planner.go index 57c9d7d879b8b..edc4bb82c5cac 100644 --- a/pkg/engine/internal/workflow/workflow_planner.go +++ b/pkg/engine/internal/workflow/workflow_planner.go @@ -17,7 +17,6 @@ type planner struct { physical *physical.Plan streamWriters map[*Stream]*Task // Lookup of stream to which task writes to it - streamReaders map[*Stream]*Task // Lookup of stream to which task reads from it } // planWorkflow partitions a physical plan into a graph of tasks. @@ -34,7 +33,6 @@ func planWorkflow(plan *physical.Plan) (dag.Graph[*Task], error) { physical: plan, streamWriters: make(map[*Stream]*Task), - streamReaders: make(map[*Stream]*Task), } if err := planner.Process(root); err != nil { return dag.Graph[*Task]{}, err @@ -54,7 +52,7 @@ func (p *planner) Process(root physical.Node) error { // indicates whether pipeline breaker nodes should be split off into their own // task. // -// The reuslting task is the task immediately produced by node, which callers +// The resulting task is the task immediately produced by node, which callers // can use to add edges. All tasks, including those produced in recursive calls // to processNode, are added into p.Graph. 
func (p *planner) processNode(node physical.Node, splitOnBreaker bool) (*Task, error) { @@ -116,7 +114,6 @@ func (p *planner) processNode(node physical.Node, splitOnBreaker bool) (*Task, e return nil, err } sources[next] = append(sources[next], stream) - p.streamReaders[stream] = task } case child.Type() == physical.NodeTypeParallelize: @@ -140,7 +137,6 @@ func (p *planner) processNode(node physical.Node, splitOnBreaker bool) (*Task, e return nil, err } sources[next] = append(sources[next], stream) - p.streamReaders[stream] = task } default: