@qianl15 qianl15 commented Oct 8, 2025

Summary

This PR integrates DBOS with Workflow and Context to provide out-of-the-box durable execution and checkpointing.

Changes

  • DBOSWorkflow:

    • It subclasses Workflow to start a DBOS durable workflow for each step worker, and uses durable notifications for sending the StartEvent.
    • Once the _done step finishes, it broadcasts a message to all running DBOS workflows to signal the end. This cleanly terminates running tasks.
  • DBOSContext:

    • It subclasses Context to provide durable execution, making each step worker's main loop a DBOS workflow (including the cancel worker). It also uses DBOS.recv to durably receive incoming events for each step.
    • Because DBOS requires each workflow to be uniquely and statically defined, each DBOSContext needs to have a unique name and be created in a static function. This is important for DBOS to correctly find the definition of workflows for failure recovery.
    • Each user-defined LlamaIndex step automatically runs within a DBOS workflow. Therefore, if a step needs to call an external API or any other non-deterministic function, users should wrap that call in DBOS.step.
    • ctx.send_event and ctx.send_event_async use DBOS durable send instead of in-memory queues for communicating events between steps.
    • Use DBOS.sleep and _durable_time to ensure determinism within a step workflow:
@DBOS.step()
async def _durable_time() -> float:
    return time.time()
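The determinism requirement behind _durable_time can be illustrated with a toy, stdlib-only sketch (this is not DBOS's implementation or API): a step's result is checkpointed on first execution, and replay returns the recorded value instead of re-executing, so a non-deterministic call like time.time() yields the same value on every replay.

```python
import time


class ToyCheckpointer:
    """Minimal replay log: a step's result is recorded once, then replayed."""

    def __init__(self) -> None:
        self.log: dict[str, object] = {}

    def step(self, name: str):
        def decorator(fn):
            def wrapper(*args, **kwargs):
                if name in self.log:
                    # Replay path: return the checkpointed result, skip re-execution.
                    return self.log[name]
                result = fn(*args, **kwargs)  # first run: execute and record
                self.log[name] = result
                return result

            return wrapper

        return decorator


ckpt = ToyCheckpointer()


@ckpt.step("durable_time")
def durable_time() -> float:
    return time.time()


first = durable_time()
time.sleep(0.01)
# The "replayed" call observes the identical timestamp, not a new one.
assert durable_time() == first
```

A real DBOS.step persists the result to the system database rather than an in-memory dict, but the replay semantics are the same.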

Example

Here is a simple example of using DBOS.

import asyncio
from pydantic import BaseModel, Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent
from dbos import DBOS, DBOSConfig
# DBOSWorkflow and DBOSContext are the classes added by this PR;
# import them from wherever the integration exposes them.

class MyEvent(Event):
    msg: list[str]

class RunState(BaseModel):
    num_runs: int = Field(default=0)

class MyWorkflow(DBOSWorkflow):
    @step
    async def start(self, ctx: Context[RunState], ev: StartEvent) -> MyEvent:
        async with ctx.store.edit_state() as state:
            state.num_runs += 1
            return MyEvent(msg=[ev.input_msg] * state.num_runs)

    @step
    async def process(self, ctx: Context[RunState], ev: MyEvent) -> StopEvent:
        data_length = len("".join(ev.msg))
        new_msg = f"Processed {len(ev.msg)} times, data length: {data_length}"
        return StopEvent(result=new_msg)

async def main():
    config: DBOSConfig = {
        "name": "dbos-llamaindex",
        "system_database_url": "sqlite:///dbos_llamaindex.sqlite",
    }
    DBOS(config=config)
    DBOS.launch()
    workflow = MyWorkflow()

    # For failure recovery, DBOS context should be defined statically (not in a dynamic function) with a unique name so that the recovery process can find it.
    ctx = DBOSContext(workflow, "run_my_workflow")
    result = await workflow.run(input_msg="Hello, world!", ctx=ctx)
    print("Workflow result:", result)


if __name__ == "__main__":
    asyncio.run(main())
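The broadcast-termination pattern described under DBOSWorkflow (the _done step signaling all running workers to stop) can be sketched with a stdlib-only toy; this uses asyncio primitives, not DBOS's durable notifications, and all names here are hypothetical.

```python
import asyncio


async def step_worker(inbox: asyncio.Queue, shutdown: asyncio.Event) -> int:
    """Loop on the inbox until the shutdown broadcast fires; return events handled."""
    handled = 0
    while not shutdown.is_set():
        get = asyncio.create_task(inbox.get())
        stop = asyncio.create_task(shutdown.wait())
        done, pending = await asyncio.wait(
            {get, stop}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        if get in done:
            handled += 1  # a real worker would dispatch the event to its step here
    return handled


async def run_demo() -> int:
    shutdown = asyncio.Event()
    inbox: asyncio.Queue = asyncio.Queue()
    workers = [
        asyncio.create_task(step_worker(inbox, shutdown)) for _ in range(2)
    ]
    await inbox.put("StartEvent")
    await asyncio.sleep(0.05)  # let one worker pick up the event
    shutdown.set()             # the "_done" broadcast: all workers exit cleanly
    counts = await asyncio.gather(*workers)
    return sum(counts)


if __name__ == "__main__":
    print(asyncio.run(run_demo()))  # one event handled in total
```

The key property is that no worker is force-cancelled mid-step: each one observes the broadcast at the top of its loop and returns normally, which is the "cleanly terminates running tasks" behavior the PR describes.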

Discussion

  • Potentially change the _done step's behavior -- it currently raises a WorkflowDone exception. Although this works, it generates an error stack trace that can be annoying. An alternative is to use durable events to signal the end of the workflow.
  • How does it interact with the LlamaIndex workflow server? Perhaps the top-level Workflow.run should also become a DBOS workflow, with each step as a sub-workflow. However, _run_workflow currently uses asyncio.wait, which is not deterministic (a fundamental requirement for durable workflows). This may require changes to how it waits for the workflow to finish.
  • How to integrate with StateStore? Currently it's in-memory, and users need to access it from within DBOS.step functions.
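The asyncio.wait concern above can be seen in a small stdlib-only demo (hypothetical task names): wait hands back finished tasks as an unordered set in completion order, so a replay can observe a different interleaving each run, whereas gather returns results in submission order, the kind of deterministic observation durable replay needs.

```python
import asyncio


async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def via_wait() -> set:
    # asyncio.wait returns an *unordered set* of finished tasks; which task a
    # consumer sees "first" depends on runtime timing, not submission order.
    tasks = [
        asyncio.create_task(job(n, d)) for n, d in [("a", 0.02), ("b", 0.01)]
    ]
    done, _pending = await asyncio.wait(tasks)
    return {t.result() for t in done}


async def via_gather() -> list:
    # gather yields results in submission order regardless of which coroutine
    # finished first -- the same observation on every run.
    return await asyncio.gather(job("a", 0.02), job("b", 0.01))


print(asyncio.run(via_wait()))    # a set: no stable ordering to replay against
print(asyncio.run(via_gather()))  # always ['a', 'b']
```

This is why making the top-level run loop durable would likely require replacing asyncio.wait with an ordering-stable wait, or recording the observed completion order as a checkpointed step.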
