Conversation

@rfratto rfratto commented Oct 15, 2025

The new workflow package introduces an abstraction over physical plans:

  • A physical plan is split into several parallelizable units called "tasks"; splits happen at pipeline breakers and at shards generated downstream of Parallelize nodes (chore(engine): add Parallelize hint node #19521).

  • Each task sends or receives data along "streams."

  • The "workflow" is the graph of tasks to execute for a given query. A workflow listens for task results from the root task.

Other engines typically call these units "fragments," with the collection of fragments constituting the "distributed query plan."

We don't use the standard term here since our usage is stateful. Workflows respond to tasks changing state, and will eventually be responsible for deciding when a task should be enqueued for running at all.

The workflow.Runner interface is introduced to represent the mechanism running tasks. A basic implementation is used for testing, but in production, workflow.Runner will be implemented by the scheduler.
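
For context, a minimal sketch of the shape described above. The method set and the Task stand-in here are assumptions, not the PR's actual API:

package workflow

import "context"

// Task is a stand-in for the task type introduced by this package.
type Task struct{ /* ULID, plan fragment, streams, ... */ }

// Runner is the mechanism that executes tasks. In production it is
// implemented by the scheduler; a basic in-process implementation is
// used for testing. (Sketch only; the real method set may differ.)
type Runner interface {
	// Run executes a task, reporting state changes back to the workflow
	// as the task progresses. (Hypothetical signature.)
	Run(ctx context.Context, task *Task) error
}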

@rfratto rfratto requested a review from a team as a code owner October 15, 2025 18:41

func toTreeNode(n Node) *tree.Node {
	treeNode := tree.NewNode(n.Type().String(), "")
	// Context lets the creator of a tree node attach an arbitrary value;
	// here we store the physical plan node itself.
	treeNode.Context = n
rfratto (Member Author) commented:
This is used by workflow.Sprint to allow workflow printing to hook into the tree produced by the physical plan and add additional context for nodes (the streams to write to or read from).

Comment on lines +12 to +13
// ULID is a unique identifier of the Task.
ULID ulid.ULID
@rfratto rfratto (Member Author) commented Oct 15, 2025

This could be UUIDv7, but I picked ULID here because:

  • Its output is smaller (26 vs 36 bytes), and
  • ULIDs have a canonical binary wire representation, unlike UUIDv7, allowing us to safely marshal its binary representation (16 bytes) rather than the text representation.

ULID also requires monotonic counters, guaranteeing that a single instance of a process can't generate collisions. With UUIDv7, monotonicity is recommended but not required, so it depends on the library you use.
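
For illustration, here's how a ULID with monotonic entropy can be generated in Go, assuming the github.com/oklog/ulid/v2 package (consistent with the ulid.ULID field type above):

package main

import (
	"crypto/rand"
	"fmt"
	"time"

	"github.com/oklog/ulid/v2"
)

func main() {
	// Monotonic entropy makes ULIDs generated within the same millisecond
	// strictly increasing, so a single process instance can't collide
	// with itself.
	entropy := ulid.Monotonic(rand.Reader, 0)
	id := ulid.MustNew(ulid.Timestamp(time.Now()), entropy)

	fmt.Println(id.String()) // 26-character text representation
	b, _ := id.MarshalBinary()
	fmt.Println(len(b)) // canonical 16-byte binary representation
}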

@rfratto rfratto force-pushed the thor-scheduler-workflow branch 2 times, most recently from b37ed12 to 356a763 on October 15, 2025 19:14

rfratto commented Oct 16, 2025

I'm going to move this back into draft; I can complete the implementation of workflow splitting now that #19521 is available, and #19524 will be merged soon.

@rfratto rfratto marked this pull request as draft October 16, 2025 16:11
@rfratto rfratto force-pushed the thor-scheduler-workflow branch 3 times, most recently from 49d9f8a to 43f7b67 on October 17, 2025 15:41
The scheduler prototype relied on the tree ID matching the node ID to
inject additional information into the tree (the streams used by that
node).

This commit introduces a workaround by allowing creators of a tree
node to inject arbitrary values as context.

Signed-off-by: Robert Fratto <[email protected]>
As the scheduler prototype will manipulate physical plans and split them
into smaller fragments, utility functions are needed to create physical
plans from DAGs and return the existing DAG for modification.

Signed-off-by: Robert Fratto <[email protected]>
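
A hedged sketch of what those utilities might look like; all types, names, and signatures below are hypothetical stand-ins, not the PR's actual API:

package physical

// Graph is a stand-in for the engine's DAG type.
type Graph struct{ /* nodes, edges */ }

// Plan is a physical plan backed by a DAG.
type Plan struct{ graph *Graph }

// FromDAG wraps an existing DAG in a Plan without copying it.
func FromDAG(g *Graph) *Plan { return &Plan{graph: g} }

// DAG returns the plan's underlying graph so callers, such as the
// scheduler, can split or otherwise modify it in place.
func (p *Plan) DAG() *Graph { return p.graph }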
A shardable node is a physical plan node that supports being split into
multiple smaller nodes.

Currently, ScanSet is the only shardable node, where each shard is one
of its targets (resulting in a DataObjScan node).

Shardable nodes will be used in task planning to create tasks downstream
of Parallelize.

Signed-off-by: Robert Fratto <[email protected]>
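
A hedged sketch of the Shardable concept from the commit above; the method name is an assumption, not the PR's actual API:

package physical

// Node is a stand-in for the engine's physical plan node interface.
type Node interface{}

// Shardable is a physical plan node that supports being split into
// multiple smaller nodes, one per shard. (Hypothetical method set.)
type Shardable interface {
	Node

	// Shards returns the replacement nodes, one per shard. For ScanSet,
	// each shard is one of its targets, yielding a DataObjScan node.
	Shards() []Node
}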
@rfratto rfratto force-pushed the thor-scheduler-workflow branch from 43f7b67 to 382c2c1 on October 17, 2025 16:02

rfratto commented Oct 17, 2025

This is ready for review again: I've integrated #19521 and #19524, and the workflow package now generates the representation of tasks I used in the scheduler prototype.

@rfratto rfratto assigned rfratto and unassigned rfratto Oct 17, 2025
@rfratto rfratto marked this pull request as ready for review October 17, 2025 16:18
@rfratto rfratto requested a review from spiridonov October 17, 2025 16:19

func (wf *Workflow) onTaskChange(ctx context.Context, task *Task, newState TaskState) {
	wf.tasksMut.Lock()
	wf.taskStates[task] = newState
A contributor commented:

Wdyt about forbidding a terminal task state from changing back to a non-terminal one? I think there could be race conditions otherwise. The lock is released on line 220 before the tasks are cancelled, so if a task becomes non-terminal again after the lock is released but before the children are cancelled, it might get stuck (?).

I understand that the above sounds very unlikely to happen, but the whole idea of a task being able to resurrect from a terminal state increases the complexity, so if it's not intended, why not forbid it explicitly? 😅

rfratto (Member Author) replied:

I don't think there's a race condition here; the most important thing is that onTaskChange can't be called recursively while the lock is held, which we prevent by releasing the lock right before canceling tasks. (It's fine if the tasks somehow revive themselves; that invocation of onTaskChange would wait for the previous invocation to exit).

I'm not sure we want onTaskChange to be responsible for rejecting state changes; workflow is more of a responder to whatever the runner decides the task state is. I'd like to revisit this once we have the worker in place and see how it feels, and then we can adjust based on that.
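
A minimal sketch of the locking pattern described above, filling in the rest of the excerpted function under assumptions (IsTerminal, childrenOf, and Cancel are hypothetical helpers). The mutex only guards the state map and is released before cancellation, so a re-entrant onTaskChange serializes behind this one instead of deadlocking:

func (wf *Workflow) onTaskChange(ctx context.Context, task *Task, newState TaskState) {
	wf.tasksMut.Lock()
	wf.taskStates[task] = newState
	terminal := newState.IsTerminal()
	children := wf.childrenOf(task)
	wf.tasksMut.Unlock()

	if !terminal {
		return
	}
	// Canceling children may trigger further onTaskChange calls; since
	// the lock is no longer held, those calls can acquire it in turn.
	for _, child := range children {
		child.Cancel(ctx)
	}
}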

@rfratto rfratto merged commit 6b96e4d into main Oct 21, 2025
65 checks passed
@rfratto rfratto deleted the thor-scheduler-workflow branch October 21, 2025 12:25