Skip to content

Commit a58dd90

Browse files
committed
update state
1 parent 5eb8ec0 commit a58dd90

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

src/workflows/runtime/state.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,41 @@
1313

1414
@dataclass
1515
class WorkflowBrokerState:
16-
"""Asyncio-native broker state holder managed by the runtime.
16+
"""Asyncio-native, serializable broker state for a single workflow run.
1717
18-
Note: The state store remains owned by `Context` and is not included here.
18+
Owned and mutated by ``WorkflowBroker``; excludes the user state store
19+
(owned by ``Context``). See ``SerializedContext`` for the wire format.
1920
"""
2021

22+
# True while a run is active; cleared on completion/timeout/cancel
2123
is_running: bool = False
24+
25+
# Event stream for observers; broker writes lifecycle/system events here
26+
# A None item may be used as a sentinel for end-of-stream
2227
streaming_queue: asyncio.Queue[Event | None] = field(
2328
default_factory=lambda: asyncio.Queue()
2429
)
30+
31+
# Per-step and waiter inbound queues: step name / waiter ID -> asyncio.Queue
32+
# Workers consume from these to drive step execution
2533
queues: dict[str, asyncio.Queue[Event]] = field(default_factory=dict)
34+
35+
# Buffers used by collect_events: buffer_id -> fully.qualified.Event -> [Event]
36+
# Enables waiting for specific combinations of events
2637
event_buffers: dict[str, dict[str, list[Event]]] = field(default_factory=dict)
38+
39+
# Step name -> list of input events currently being processed by that step
40+
# Also used to seed queues on deserialization so in-flight work can resume
2741
in_progress: dict[str, list[Event]] = field(default_factory=dict)
42+
43+
# (step_name, input_event_class_name) recorded when a step outputs an event
44+
# Consumed by drawing/visualization utilities to reconstruct edges
2845
accepted_events: list[tuple[str, str]] = field(default_factory=list)
46+
47+
# Append-only log of all events dispatched by the broker in order
2948
broker_log: list[Event] = field(default_factory=list)
49+
50+
# Active waiter IDs to suppress duplicate waiter events (e.g., UI prompts)
3051
waiting_ids: set[str] = field(default_factory=set)
3152

3253
def to_serialized(self, serializer: BaseSerializer) -> SerializedContext:

0 commit comments

Comments
 (0)