Conversation

@adrianlyjak
Contributor

@adrianlyjak adrianlyjak commented Oct 7, 2025

This is a feature branch for refactoring the internals of workflows to better support a pluggable runtime and to make them easier to extend with additional features. It mainly splits apart a lot of responsibilities that were held by the context and creates a deeper internal structure with more varied responsibilities. It maintains parity with the existing public interface, with minimal changes.

Related PRs:


Original PR Notes:

This is a big one 😳 sorry! It's 100% hand coded though

This is the first step of a refactor toward runtime pluggability via plugins. The long-term goals are better support for distributed and/or persistent workflows by extending to external coordinators.

This refactor focuses on giving more discrete responsibilities to a few new components, and on narrowing the responsibility of existing ones (namely Context):

  • Adds a WorkflowBroker class that by and large lifts methods from Context that are related to task, queue, and lock management. The long term goal is to define and break this up even further. There was a small amount of runtime/starting logic in the Workflow that was also moved here
  • Adds SerializedContext, a typed intermediary pydantic model that validates and documents the current serialized state (rather than passing around a plain dict). The dict interface remains unchanged, to maintain compatibility
  • Adds a related WorkflowBrokerState, which contains the mutable/asyncio python state that parallels most of a SerializedContext
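The split between a validated serialized form and a live runtime form, as described in the bullets above, can be sketched roughly as follows. This is only an illustration: `SerializedSnapshot` and `RuntimeState` are hypothetical names with made-up fields, and a stdlib dataclass is swapped in for the pydantic model the PR actually uses so that the sketch has no dependencies.

```python
from __future__ import annotations

import asyncio
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class SerializedSnapshot:
    """Hypothetical stand-in for SerializedContext (a pydantic model in the PR)."""

    is_running: bool = False
    queued_events: list[str] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> SerializedSnapshot:
        # Validate once at the boundary instead of trusting a plain dict everywhere.
        unknown = set(data) - {"is_running", "queued_events"}
        if unknown:
            raise ValueError(f"unexpected keys: {sorted(unknown)}")
        events = data.get("queued_events", [])
        if not all(isinstance(e, str) for e in events):
            raise ValueError("queued_events must be a list of strings")
        return cls(is_running=bool(data.get("is_running", False)), queued_events=list(events))

    def to_dict(self) -> dict[str, Any]:
        # The dict interface stays available for callers that still expect it.
        return asdict(self)


@dataclass
class RuntimeState:
    """Hypothetical stand-in for WorkflowBrokerState: holds live asyncio objects."""

    is_running: bool
    queue: asyncio.Queue[str]

    @classmethod
    def from_serialized(cls, snap: SerializedSnapshot) -> RuntimeState:
        queue: asyncio.Queue[str] = asyncio.Queue()
        for event in snap.queued_events:
            queue.put_nowait(event)
        return cls(is_running=snap.is_running, queue=queue)

    def to_serialized(self) -> SerializedSnapshot:
        pending: list[str] = []
        while not self.queue.empty():
            pending.append(self.queue.get_nowait())
        # Put the items back so that taking a snapshot does not consume the queue.
        for event in pending:
            self.queue.put_nowait(event)
        return SerializedSnapshot(is_running=self.is_running, queued_events=pending)
```

The serialized form contains only plain data, while the runtime form owns objects such as queues that cannot be serialized directly; conversion happens in exactly two places.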

Context now contains a reference to the broker. Note, it also still contains the reference to the store (this was not moved to the Broker). My perspective is that the state and the runtime durability will have separate needs, and shouldn't be closely coupled. For this reason, I also removed the state snapshot on NOT_IN_PROGRESS internal events--I think it may be better to rethink that while it is unused so we can focus on figuring out making the state store and runtime more configurable/extendable.

The initialization of the workflow run was a little distributed before, so this part of the code is "new" (as opposed to copy-pasted).

Sidequest 1: Adds better types to the @step decorator return value, so that the _step_config attribute is typed on the returned step function (this removes various getattr calls in the code). Renames the attribute to _step_config, with a single leading underscore, so Python doesn't apply name mangling.
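As a rough illustration of the typing idea, a Protocol can describe "a callable that carries `_step_config`", so callers read the attribute directly instead of going through `getattr`. Everything here (`StepConfig`, `StepFunction`, the `num_workers` parameter, the `Mangled` class) is a hypothetical simplification, not the library's actual decorator. The last class shows why a double-underscore attribute name is awkward: Python rewrites it when it appears inside a class body.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Protocol, cast


@dataclass(frozen=True)
class StepConfig:
    """Hypothetical config object; the real one carries more fields."""

    num_workers: int = 1


class StepFunction(Protocol):
    """A callable that is statically known to carry `_step_config`."""

    _step_config: StepConfig

    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...


def step(num_workers: int = 1) -> Callable[[Callable[..., Any]], StepFunction]:
    def decorator(func: Callable[..., Any]) -> StepFunction:
        typed = cast(StepFunction, func)
        # Single leading underscore: no name mangling, and type checkers can see it.
        typed._step_config = StepConfig(num_workers=num_workers)
        return typed

    return decorator


@step(num_workers=4)
def my_step(x: int) -> int:
    return x + 1


class Mangled:
    def tag(self, func: Any) -> None:
        # Inside a class body, `__step_config` is rewritten to `_Mangled__step_config`.
        func.__step_config = "hidden"


def plain() -> None:
    pass


Mangled().tag(plain)
```

After this runs, `my_step._step_config.num_workers` is available without `getattr`, while `plain` has an attribute named `_Mangled__step_config` rather than `__step_config`.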

@coveralls

coveralls commented Oct 7, 2025

Pull Request Test Coverage Report for Build 18885222297

Details

  • 994 of 1098 (90.53%) changed or added relevant lines in 20 files are covered.
  • 8 unchanged lines in 4 files lost coverage.
  • Overall coverage decreased (-0.3%) to 89.235%

Changes Missing Coverage                          Covered Lines   Changed/Added Lines   %
src/workflows/decorators.py                       27              28                    96.43%
src/workflows/runtime/types/results.py            58              59                    98.31%
src/workflows/utils.py                            11              12                    91.67%
src/workflows/runtime/workflow_registry.py        44              46                    95.65%
src/workflows/handler.py                          7               10                    70.0%
src/workflows/plugins/basic.py                    46              49                    93.88%
src/workflows/runtime/types/internal_state.py     80              84                    95.24%
src/workflows/server/representation_utils.py      1               5                     20.0%
src/workflows/runtime/broker.py                   152             159                   95.6%
src/workflows/runtime/types/step_function.py      56              63                    88.89%

Files with Coverage Reduction                     New Missed Lines   %
src/workflows/context/serializers.py              1                  87.93%
src/workflows/context/state_store.py              1                  85.37%
src/workflows/workflow.py                         2                  95.06%
src/workflows/context/context.py                  4                  81.13%

Totals
Change from base Build 18883084359: -0.3%
Covered Lines: 2553
Relevant Lines: 2861

💛 - Coveralls

@adrianlyjak adrianlyjak force-pushed the adrian/context-refact branch 3 times, most recently from d8507d0 to 74e10d2 Compare October 10, 2025 04:17
@adrianlyjak adrianlyjak changed the title Adrian/context refact Context Refactor Oct 10, 2025
@adrianlyjak adrianlyjak marked this pull request as ready for review October 10, 2025 04:18
msg = "Error creating a Context instance: the provided payload has a wrong or old format."
raise ContextSerdeError(msg) from e

async def mark_in_progress(self, name: str, ev: Event, worker_id: str = "") -> None:

Contributor Author

these weren't prefixed, but they seem very private, so went ahead with deleting them



@functools.lru_cache(maxsize=1)
def _warn_get_result() -> None:

Contributor Author

It seemed a little odd to access this on the context rather than on the handler, which better represents the run (and is literally a future that resolves to this value). I only saw one small test reference.
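To make the "handler is a future" point concrete, here is a minimal sketch in which awaiting the handler yields the run's result, so nothing needs to be read off the context. `RunHandler` and `run_workflow` are hypothetical names for illustration only.

```python
import asyncio


class RunHandler(asyncio.Future):
    """Hypothetical handler: awaiting it yields the run's final result."""


async def run_workflow() -> str:
    loop = asyncio.get_running_loop()
    handler = RunHandler(loop=loop)

    async def do_work() -> None:
        await asyncio.sleep(0)  # stand-in for the actual steps
        handler.set_result("done")

    task = asyncio.create_task(do_work())
    result = await handler  # the handler itself resolves to the result
    await task
    return result
```

Calling `asyncio.run(run_workflow())` returns the value that was set on the handler.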

@adrianlyjak adrianlyjak force-pushed the adrian/context-refact branch from 74e10d2 to eb1d2b5 Compare October 10, 2025 04:34
@adrianlyjak adrianlyjak added the enhancement New feature or request label Oct 10, 2025
@adrianlyjak adrianlyjak force-pushed the adrian/context-refact branch 2 times, most recently from 7d4141c to 181edb0 Compare October 10, 2025 21:32
Collaborator

@logan-markewich logan-markewich left a comment

This is a decent cleanup. Would probably benefit from an architecture diagram to clarify the current shape and if it matches the desired end-goal

Also before we merge this (or before we release this), will need to update the API reference docs

Comment on lines 199 to 204
broker_state = WorkflowBrokerState.from_serialized(
    self._init_snapshot, self._serializer
)
self._broker_run = WorkflowBroker(

Collaborator

Trying to picture some of the hierarchy here

  • Context
    • state_store
    • broker_run
      • broker_state

)
self._broker_run = WorkflowBroker(
    workflow=workflow,
    context=self,

Collaborator

is it weird to pass in the entire context here? Circular dependency?

Contributor Author

They are certainly coupled, but have the same lifetime. The context is more or less just the public interface for the broker
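A small sketch of that relationship, with hypothetical `Context` and `Broker` classes that are far simpler than the real ones: the context is the public facade, the broker does the work, and each holds a reference to the other for the duration of a run.

```python
from __future__ import annotations


class Broker:
    """Hypothetical runtime internals; keeps a back-reference to its context."""

    def __init__(self, context: Context) -> None:
        self._context = context
        self._sent: list[str] = []

    def dispatch(self, event: str) -> None:
        self._sent.append(event)

    @property
    def sent(self) -> list[str]:
        return list(self._sent)


class Context:
    """Hypothetical public facade: user code talks to this, not to the broker."""

    def __init__(self) -> None:
        # Both objects are created together. The resulting reference cycle is
        # reclaimed by Python's cyclic garbage collector once neither is reachable.
        self._broker = Broker(context=self)

    def send_event(self, event: str) -> None:
        # The public method simply delegates to the broker.
        self._broker.dispatch(event)
```

Because the two objects share a lifetime, the mutual reference is a design coupling rather than a resource problem.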

self._broker_run = self._init_broker(workflow)

async def before_start() -> None:
    if prev_broker is not None:

Collaborator

Didn't we just set it to None and re-initialize it above?

Contributor Author

This is the pre-existing broker, if there was one, which may have just been set to None above. The shutdown happens inside before_start so that it can be awaited before initializing the new broker.

FWIW, this would only happen if you re-use a context across multiple workflow runs.
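A rough sketch of the pattern being discussed, assuming a broker whose `start` accepts an async `before_start` hook. The class shapes and method names here are hypothetical simplifications: the new broker is constructed synchronously, and the previous one is shut down inside the hook, where it can be awaited.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Optional


class Broker:
    def __init__(self, name: str) -> None:
        self.name = name
        self.running = False

    async def start(self, before_start: Callable[[], Awaitable[None]]) -> None:
        # The hook runs (and is awaited) before this broker begins processing.
        await before_start()
        self.running = True

    async def shutdown(self) -> None:
        await asyncio.sleep(0)  # stand-in for cancelling tasks, draining queues
        self.running = False


class Context:
    def __init__(self) -> None:
        self._broker: Optional[Broker] = None

    async def run(self, name: str) -> Broker:
        prev_broker = self._broker  # keep a reference to the old broker, if any
        self._broker = Broker(name)  # synchronous construction, nothing awaited

        async def before_start() -> None:
            if prev_broker is not None:
                await prev_broker.shutdown()

        await self._broker.start(before_start)
        return self._broker


async def demo() -> tuple[bool, bool]:
    ctx = Context()
    first = await ctx.run("first")
    second = await ctx.run("second")  # re-using the context for a second run
    return first.running, second.running
```

In `demo`, the first broker is no longer running by the time the second one starts.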

accepted_events: list[tuple[str, str]] = Field(default_factory=list)

# Broker log of all dispatched events in order, as serializer-encoded strings.
broker_log: list[str] = Field(default_factory=list)

Collaborator

tbh this isn't used and just eats memory, we could probably delete if we are changing this much

Contributor Author

I'm planning more changes, so this seemed like a good checkpoint for review at least to see how we feel about the direction. I think perhaps we should keep these changes on a branch to introduce the full change set in one go?

@adrianlyjak
Contributor Author

adrianlyjak commented Oct 12, 2025

@logan-markewich

> This is a decent cleanup. Would probably benefit from an architecture diagram to clarify the current shape and if it matches the desired end-goal
>
> Also before we merge this (or before we release this), will need to update the API reference docs

Here's a quick one. Basically, this just splits the context into a few separate components that are private implementation details, but let's discuss long-term goals more next week. The idea is to be able to swap out the broker/runtime with plugins, but there are a number of details in there that we'd still want to re-use across plugins.

[image: architecture diagram]

Good point about the reference docs. Those need some attention anyway (they still have stepwise! the errors page is empty?). For the most part, the goal here was to make no API surface changes. I'd be inclined to take Context.__init__ out of the docs. It seems like you should only be calling .from_dict if manually constructing a context.

@adrianlyjak adrianlyjak force-pushed the adrian/context-refact branch from e227cc9 to d7341f8 Compare October 12, 2025 14:20
Member

@AstraBert AstraBert left a comment

@adrianlyjak I went through the PR and, while pretty much everything seems legit, I am still not super sure I understand the end goals of this: I think @logan-markewich mentioned to me we are doing it so that we can make our workflows effectively durable and long-running, but I am still struggling to visualize the bigger picture and to envision how these changes are going to affect the way we use workflows/context from an end-user perspective.
It will certainly benefit from some documentation/examples for the new patterns, but I think it might be good if we also take more time to discuss architecture and design choices :) Thanks for doing all of this work tho! 🙌

@adrianlyjak adrianlyjak force-pushed the adrian/context-refact branch 2 times, most recently from aaccdd6 to cd0e32e Compare October 21, 2025 19:00
@adrianlyjak adrianlyjak changed the title Context Refactor Runtime Plugins Oct 21, 2025
async def before_start() -> None:
    if prev_broker is not None:
        try:
            await prev_broker.shutdown()

Collaborator

Shouldn't the prev_broker run shutdown before getting to this point? The lifecycle of the broker feels a little odd here

Contributor Author

yeah, it is, but you still want the reference to the broker for getting the state, so just double checking here. This also just protects from someone doing weird things with capturing a context that's still running and passing it in again to the workflow.

# We do this regardless of is_running state so workflows can resume from where they left off
for step_name, worker_data in serialized.workers.items():
    if step_name not in base_state.workers:
        continue

Collaborator

Isn't it a critical error here? To me this indicates that the context and workflow do not match/are out of sync?

Contributor Author

since this is from serialized state, if you renamed / removed a step, this could happen. Debatable, but seems like it could easily happen in dev. Nice to still be able to best effort deserialize
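The best-effort behaviour can be sketched like this. `restore_workers` and its data shapes are hypothetical; the point is only that steps missing from the current workflow are skipped (and reported) rather than treated as a fatal error.

```python
from __future__ import annotations


def restore_workers(
    serialized_workers: dict[str, list[str]],
    known_steps: set[str],
) -> tuple[dict[str, list[str]], list[str]]:
    """Restore in-progress work per step, skipping steps that no longer exist."""
    restored: dict[str, list[str]] = {name: [] for name in known_steps}
    skipped: list[str] = []
    for step_name, in_progress in serialized_workers.items():
        if step_name not in restored:
            # The step was renamed or removed since the snapshot was taken.
            skipped.append(step_name)
            continue
        restored[step_name] = list(in_progress)
    return restored, skipped
```

Returning the skipped names leaves room for a caller to log a warning, or to raise in a stricter mode if silent skipping turns out to hide real mismatches.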

return new_state

@property
def _replay_ticks(self) -> list[WorkflowTick]:

Collaborator

I'm not sure it's ever commented/explained why we need/want replay. Why did I need to replay ticks?

Contributor Author

Replay is a bad name. This is more or less an event-sourced architecture, so the ticks are basically all you need to record to recreate the state. Right now it's an implementation detail: we can re-derive the current state from the ticks rather than syncing the full state from inside the control loop.
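A toy version of the event-sourcing idea, under the assumption that state is a pure function of the recorded ticks. The tick types, `State`, and `apply_tick` below are hypothetical and far simpler than the PR's `WorkflowTick`.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from functools import reduce
from typing import Union


@dataclass(frozen=True)
class EventQueued:
    step: str


@dataclass(frozen=True)
class StepFinished:
    step: str


Tick = Union[EventQueued, StepFinished]


@dataclass(frozen=True)
class State:
    pending: tuple[str, ...] = ()
    finished: tuple[str, ...] = ()


def apply_tick(state: State, tick: Tick) -> State:
    """Pure reducer: the same ticks in the same order always give the same state."""
    if isinstance(tick, EventQueued):
        return replace(state, pending=state.pending + (tick.step,))
    remaining = list(state.pending)
    if tick.step in remaining:
        remaining.remove(tick.step)
    return replace(state, pending=tuple(remaining), finished=state.finished + (tick.step,))


def rebuild(ticks: list[Tick]) -> State:
    # Folding the log over an empty state re-derives the current state.
    return reduce(apply_tick, ticks, State())
```

For the log `[EventQueued("a"), EventQueued("b"), StepFinished("a")]`, `rebuild` yields a state with `"b"` pending and `"a"` finished, without the control loop ever having to copy its full state out.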

"""Wait for the next tick from the internal queue."""
return await self.queue.get()

def queue_event(self, tick: WorkflowTick, delay: float | None = None) -> None:

Collaborator

nit: queue_tick ?

add: AddWaiter[EventType]


StepWorkerStateContextVar = ContextVar[StepWorkerContext]("step_worker")

Collaborator

Context vars might not work in a distributed setup? Not a blocker but seems notable

Contributor Author

idea would be to set this up so that it works in a distributed context
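For reference, this is roughly how a ContextVar scopes per-worker data within a single process: each asyncio task gets its own copy of the context, so concurrent workers do not see each other's values. `WorkerInfo`, `current_worker`, and `run_worker` are hypothetical names. A ContextVar is process-local, so in a distributed setup the runtime on the remote side would presumably need to set it again before invoking the step; that part is an assumption and is not shown.

```python
from __future__ import annotations

import asyncio
from contextvars import ContextVar
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerInfo:
    step_name: str
    worker_id: int


current_worker: ContextVar[WorkerInfo] = ContextVar("current_worker")


def describe_current_worker() -> str:
    # Code deep in the call stack can read the value without it being passed down.
    info = current_worker.get()
    return f"{info.step_name}#{info.worker_id}"


async def run_worker(step_name: str, worker_id: int) -> str:
    # The value set here is visible only within this task's context.
    current_worker.set(WorkerInfo(step_name, worker_id))
    await asyncio.sleep(0)  # yield so the workers interleave
    return describe_current_worker()


async def main() -> list[str]:
    return await asyncio.gather(run_worker("a", 0), run_worker("b", 1))
```

Each worker reads back its own value even though both run interleaved on the same event loop, and the variable remains unset outside the tasks.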

Collaborator

@logan-markewich logan-markewich left a comment

There is a lot of new code/concepts being introduced here 😬 Will require heavy validation (and maybe even a few beta releases?). But from what I can tell by eye, seems overall a good structure

We might need a more in-depth diagram too, tbh I'm losing track of concepts as I go through the code lol (While I would learn this in time, even for newcomers and contributors it would be nice).

from workflows.events import Event, StopEvent


@dataclass(frozen=True)

Collaborator

ooc why dataclass? I know weird things can happen when you mix dataclasses and pydantic, typically I'd recommend picking one or the other

Contributor Author

They're not meant to be mixed right now, and these are more or less transient. I just wanted something lightweight. If we want them to be serializable, we should switch to pydantic.
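As a small illustration of what a frozen dataclass gives for transient values like these: instances are immutable, comparable by value, and "updated" by copying. `StepResult` and its fields are hypothetical.

```python
from dataclasses import FrozenInstanceError, asdict, dataclass, replace


@dataclass(frozen=True)
class StepResult:
    step_name: str
    attempts: int = 0


first = StepResult("fetch")
# `replace` builds a new instance; the original is left untouched.
retried = replace(first, attempts=first.attempts + 1)


def try_mutate(result: StepResult) -> bool:
    """Return True if in-place mutation was rejected."""
    try:
        result.attempts = 99  # type: ignore[misc]
    except FrozenInstanceError:
        return True
    return False
```

`asdict(retried)` still produces a plain dict if one is needed, but there is no validation or schema, which is where a pydantic model would be the better fit.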

@adrianlyjak adrianlyjak force-pushed the adrian/context-refact branch 2 times, most recently from 27bb2fb to 66d2c49 Compare October 27, 2025 22:48
* add some refactoring notes

* Clarify / document plugin interface better

* debug mem leak

* ugly things to workaround leaks

* clean up from leak fix
* ugly leak fixes

* remove test files
* cp

* wip

* wip

* wip

* working again

* test gha

* fix precommits

* Add llama-index-utils-workflow release

* woops

* clean up pyproject

* Update docs

* ugly toml sort fix

* oops
@adrianlyjak adrianlyjak force-pushed the adrian/context-refact branch from 070fba2 to 862274b Compare October 28, 2025 17:31
@adrianlyjak adrianlyjak merged commit 18d9bb9 into main Oct 28, 2025
11 checks passed
@adrianlyjak adrianlyjak deleted the adrian/context-refact branch October 28, 2025 18:32
Labels

enhancement New feature or request

5 participants