Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ require (
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
github.com/ncw/swift/v2 v2.0.4
github.com/oklog/ulid/v2 v2.1.1
github.com/parquet-go/parquet-go v0.25.1
github.com/pressly/goose/v3 v3.26.0
github.com/prometheus/alertmanager v0.28.1
Expand Down Expand Up @@ -241,7 +242,6 @@ require (
github.com/muesli/termenv v0.16.0 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/oklog/ulid/v2 v2.1.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.129.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.129.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.129.0 // indirect
Expand Down
24 changes: 24 additions & 0 deletions pkg/engine/internal/planner/physical/plan.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package physical

import (
"iter"

"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
)

Expand Down Expand Up @@ -78,6 +80,19 @@ type Node interface {
isNode()
}

// ShardableNode is a Node that can be split into multiple smaller partitions
// ("shards"), each of which is itself a Node.
type ShardableNode interface {
Node

// Shards returns an iterator over nodes that each represent a fragment of
// the original node. Returned nodes do not need to be the same type as the
// original node.
//
// Implementations must produce unique values of Node in each call to
// Shards. For example, ScanSet.Shards yields a freshly cloned node for
// every shard it emits.
Shards() iter.Seq[Node]
}

var _ Node = (*DataObjScan)(nil)
var _ Node = (*Projection)(nil)
var _ Node = (*Limit)(nil)
Expand Down Expand Up @@ -126,6 +141,15 @@ type Plan struct {
graph dag.Graph[Node]
}

// FromGraph wraps the given DAG of nodes in a new Plan. The returned Plan
// holds the graph value it was given.
func FromGraph(graph dag.Graph[Node]) *Plan {
	plan := Plan{graph: graph}
	return &plan
}

// Graph exposes the DAG backing the plan. The returned pointer refers to the
// Plan's own graph, so modifications made through it will affect the Plan.
func (p *Plan) Graph() *dag.Graph[Node] {
	g := &p.graph
	return g
}

// Len reports how many nodes the plan's graph contains.
func (p *Plan) Len() int {
	count := p.graph.Len()
	return count
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/engine/internal/planner/physical/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func toTree(p *Plan, n Node) *tree.Node {

func toTreeNode(n Node) *tree.Node {
treeNode := tree.NewNode(n.Type().String(), "")
treeNode.Context = n
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used by workflow.Sprint to allow workflow printing to hook into the tree produced by the physical plan and add additional context for nodes (the streams to write to or read from).


switch node := n.(type) {
case *DataObjScan:
treeNode.Properties = []tree.Property{
Expand Down
26 changes: 26 additions & 0 deletions pkg/engine/internal/planner/physical/scanset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package physical

import (
"fmt"
"iter"
)

// ScanTarget represents a target of a [ScanSet].
Expand Down Expand Up @@ -87,3 +88,28 @@ func (s *ScanSet) Type() NodeType {
func (s *ScanSet) Accept(v Visitor) error {
	// Dispatch to the visitor's ScanSet-specific method and hand back
	// whatever error it reports.
	err := v.VisitScanSet(s)
	return err
}

// Shards returns an iterator that emits one shard per target of the scan.
// Every emitted shard is a clone of the target's node, carrying its own
// cloned copies of the ScanSet's projections and predicates.
//
// Shards panics if one of the targets is invalid.
func (s *ScanSet) Shards() iter.Seq[Node] {
	return func(yield func(Node) bool) {
		for _, tgt := range s.Targets {
			// Data object targets are the only kind handled here; any other
			// scan type is treated as invalid.
			if tgt.Type != ScanTypeDataObject {
				panic(fmt.Sprintf("invalid scan type %s", tgt.Type))
			}

			shard := tgt.DataObject.Clone().(*DataObjScan)
			shard.Projections = cloneExpressions(s.Projections)
			shard.Predicates = cloneExpressions(s.Predicates)

			// Stop as soon as the consumer asks for no more shards.
			if !yield(shard) {
				return
			}
		}
	}
}
2 changes: 2 additions & 0 deletions pkg/engine/internal/util/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Node struct {
// for comments are tree-style properties of a node, such as expressions of a
// physical plan node.
Comments []*Node
// Context is an optional value to associate with the node.
Context any
}

// NewNode creates a new node with the given name, unique identifier and
Expand Down
144 changes: 144 additions & 0 deletions pkg/engine/internal/workflow/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package workflow

import (
"context"
"fmt"

"github.com/grafana/loki/v3/pkg/engine/internal/executor"
)

// A Runner can asynchronously execute a workflow.
//
// Streams are registered with AddStreams before the Tasks that reference
// them are passed to Start. State changes are reported through the
// StreamEventHandler and TaskEventHandler callbacks supplied to those calls.
type Runner interface {
// AddStreams registers a list of Streams that can be used by Tasks.
// AddStreams returns an error if any of the streams (by ID) are already
// registered.
//
// The provided handler will be called whenever any of the provided streams
// change state.
AddStreams(ctx context.Context, handler StreamEventHandler, streams ...*Stream) error

// RemoveStreams removes a list of Streams that can be used by Tasks. The
// associated [StreamEventHandler] will no longer be called for removed
// streams.
//
// RemoveStreams returns an error if there are active tasks using the
// streams.
RemoveStreams(ctx context.Context, streams ...*Stream) error

// Listen binds the caller as the receiver of the specified stream and
// returns an executor.Pipeline for it. Listening on a stream prevents tasks
// from reading from it.
//
// NOTE(review): presumably the returned pipeline yields the data sent over
// the stream; confirm against implementations.
Listen(ctx context.Context, stream *Stream) (executor.Pipeline, error)

// Start begins executing the provided tasks in the background. Start
// returns an error if any of the Tasks references an unregistered Stream,
// or if any of the tasks are a reader of a stream that's already bound.
//
// The provided handler will be called whenever any of the provided tasks
// change state.
//
// Implementations must track executed tasks until the tasks enter a
// terminal state.
//
// The provided context is used for the lifetime of the tasks. Cancelling
// the context will cancel all tasks, and close associated streams for
// sending.
Start(ctx context.Context, handler TaskEventHandler, tasks ...*Task) error

// Cancel requests cancellation of the specified tasks. Cancel returns an
// error if any of the tasks were not found.
Cancel(ctx context.Context, tasks ...*Task) error
}

// StreamEventHandler is a function that handles events for changed streams.
// It receives the stream whose state changed, s, and the state it changed
// to, newState.
type StreamEventHandler func(ctx context.Context, s *Stream, newState StreamState)

// StreamState describes the condition a stream is currently in. A [Runner]
// reports it as an event whenever a stream associated with a task moves to a
// different state.
//
// The zero value of StreamState is [StreamStateIdle].
type StreamState int

const (
	// StreamStateIdle is the state of a stream that is still waiting for
	// both its sender and its receiver to be available.
	StreamStateIdle StreamState = iota

	// StreamStateOpen is the state of a stream that is open and transmitting
	// data.
	StreamStateOpen

	// StreamStateBlocked is the state of a stream whose sender is blocked
	// (by backpressure) on sending data.
	StreamStateBlocked

	// StreamStateClosed is the state of a stream that is closed and no
	// longer transmitting data.
	StreamStateClosed
)

// streamStates holds the display name of every StreamState, indexed by the
// state's numeric value.
var streamStates = [...]string{
	StreamStateIdle:    "Idle",
	StreamStateOpen:    "Open",
	StreamStateBlocked: "Blocked",
	StreamStateClosed:  "Closed",
}

// String returns the display name of the StreamState. Values outside the
// known range are rendered as "StreamState(N)".
func (s StreamState) String() string {
	idx := int(s)
	if idx < 0 || idx >= len(streamStates) {
		return fmt.Sprintf("StreamState(%d)", s)
	}
	return streamStates[idx]
}

// TaskEventHandler is a function that handles events for changed tasks. It
// receives the task whose state changed, t, and the state it changed to,
// newState.
type TaskEventHandler func(ctx context.Context, t *Task, newState TaskState)

// TaskState describes the condition a Task is currently in. A [Runner]
// reports it as an event whenever a Task changes its state.
type TaskState int

const (
	// TaskStateCreated is the initial state of a Task: it has been created
	// but not yet given to a [Runner].
	TaskStateCreated TaskState = iota

	// TaskStatePending is the state of a Task that is pending execution by a
	// [Runner].
	TaskStatePending

	// TaskStateRunning is the state of a Task that is currently being
	// executed by a [Runner].
	TaskStateRunning

	// TaskStateCompleted is the state of a Task that has completed
	// successfully.
	TaskStateCompleted

	// TaskStateCancelled is the state of a Task that has been cancelled,
	// either by a [Runner] or the owning [Workflow].
	TaskStateCancelled

	// TaskStateFailed is the state of a Task that has failed during
	// execution.
	TaskStateFailed
)

// TaskStates holds the display name of every TaskState, indexed by the
// state's numeric value.
//
// NOTE(review): unlike streamStates, this table is exported; confirm that
// is intentional.
var TaskStates = [...]string{
	TaskStateCreated:   "Created",
	TaskStatePending:   "Pending",
	TaskStateRunning:   "Running",
	TaskStateCompleted: "Completed",
	TaskStateCancelled: "Cancelled",
	TaskStateFailed:    "Failed",
}

// Terminal reports whether the TaskState is terminal, which is the case for
// the completed, cancelled and failed states.
func (s TaskState) Terminal() bool {
	switch s {
	case TaskStateCompleted, TaskStateCancelled, TaskStateFailed:
		return true
	}
	return false
}

// String returns the display name of the TaskState. Values outside the known
// range are rendered as "TaskState(N)".
func (s TaskState) String() string {
	idx := int(s)
	if idx < 0 || idx >= len(TaskStates) {
		return fmt.Sprintf("TaskState(%d)", s)
	}
	return TaskStates[idx]
}
36 changes: 36 additions & 0 deletions pkg/engine/internal/workflow/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package workflow

import (
"github.com/oklog/ulid/v2"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
)

// A Task is a single unit of work within a workflow. Each Task is a partition
// of a local physical plan.
//
// Data crosses Task boundaries through Streams: Sources records the Streams
// that nodes of the Fragment read from, and Sinks records the Streams that
// nodes of the Fragment write to.
type Task struct {
// ULID is a unique identifier of the Task. The ID method returns its string
// form.
ULID ulid.ULID
Comment on lines +12 to +13
Copy link
Member Author

@rfratto rfratto Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be UUIDv7, but I picked ULID here because:

  • Its output is smaller (26 vs 36 bytes), and
  • ULIDs have a canonical binary wire representation, unlike UUIDv7, allowing us to safely marshal its binary representation (16 bytes) rather than the text representation.

ULID also requires monotonic counters, which guarantees that a single instance of a process can't generate collisions. With UUIDv7, it's recommended but not required, so it depends on the library you use.


// Fragment is the local physical plan that this Task represents.
Fragment *physical.Plan

// Sources defines which Streams physical nodes read from, keyed by the
// reading node. Sources are only defined for nodes in the Fragment which
// read data across task boundaries.
Sources map[physical.Node][]*Stream

// Sinks defines which Streams physical nodes write to, keyed by the writing
// node. Sinks are only defined for nodes in the Fragment which write data
// across task boundaries.
Sinks map[physical.Node][]*Stream
}

// ID renders the Task's ULID in its string form.
func (t *Task) ID() string {
	id := t.ULID.String()
	return id
}

// A Stream is an abstract representation of how data flows across Task
// boundaries. Each Stream has exactly one sender (a Task), and one receiver
// (either another Task or the owning [Workflow]).
//
// Streams are referenced from the Sources and Sinks of a Task.
type Stream struct {
// ULID is a unique identifier of the Stream.
ULID ulid.ULID
}
Loading