Skip to content

Commit 5eb8ec0

Browse files
committed
nicen it up
1 parent 2cb45e5 commit 5eb8ec0

File tree

6 files changed

+30
-17
lines changed

6 files changed

+30
-17
lines changed

src/workflows/handler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,5 @@ async def cancel_run(self) -> None:
130130
"""
131131
if self.ctx:
132132
self.ctx._internal_cancel_run()
133-
await asyncio.sleep(0)
133+
if self._run_task is not None:
134+
await self._run_task

src/workflows/runtime/broker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ async def _run_workflow() -> None:
260260
self._state.is_running = False
261261

262262
if exception_raised:
263+
print("exception_raised", exception_raised)
263264
# cancel the stream
264265
self.write_event_to_stream(StopEvent())
265266

src/workflows/server/representation_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def _extract_workflow_structure(
9898
# First pass: Add all nodes
9999
for step_name, step_func in steps.items():
100100
step_config = step_func._step_config
101-
101+
102102
# Add step node
103103
step_label = (
104104
_truncate_label(step_name, max_label_length)
@@ -195,7 +195,7 @@ def _extract_workflow_structure(
195195
# Second pass: Add edges
196196
for step_name, step_func in steps.items():
197197
step_config = step_func._step_config
198-
198+
199199
# Edges from steps to return types
200200
for return_type in step_config.return_types:
201201
if return_type is not type(None):

src/workflows/workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ def _validate(self) -> bool:
352352
steps_accepting_stop_event: list[str] = []
353353

354354
for name, step_func in self._get_steps().items():
355-
step_config: StepConfig = step_func._step_config
355+
step_config: StepConfig = step_func._step_config
356356

357357
# Check that no user-defined step accepts StopEvent (only _done step should)
358358
if name != "_done":

tests/test_handler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
from workflows.handler import WorkflowHandler
1111

1212

13-
def test_str() -> None:
13+
@pytest.mark.asyncio
14+
async def test_str() -> None:
1415
h = WorkflowHandler()
1516
h.set_result([])
1617
assert str(h) == "[]"

tests/test_workflow.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ async def slow_step(self, ev: StartEvent) -> StopEvent:
7575
return StopEvent(result="Done")
7676

7777
with pytest.raises(WorkflowTimeoutError):
78-
await WorkflowTestRunner(SlowWorkflow(timeout=1)).run()
78+
await WorkflowTestRunner(SlowWorkflow(timeout=0.1)).run()
7979

8080

8181
@pytest.mark.asyncio
@@ -442,21 +442,31 @@ async def step(self, ctx: Context, ev: StartEvent) -> StopEvent:
442442

443443
@pytest.mark.asyncio
444444
async def test_workflow_context_to_dict(workflow: Workflow) -> None:
445-
handler = workflow.run()
446-
ctx = handler.ctx
445+
ctx: Union[Context, None] = None
446+
new_ctx: Union[Context, None] = None
447+
try:
448+
handler = workflow.run()
449+
ctx = handler.ctx
447450

448-
ctx.send_event(EventWithName(name="test")) # type:ignore
451+
ctx.send_event(EventWithName(name="test")) # type:ignore
449452

450-
# get the context dict
451-
data = ctx.to_dict() # type:ignore
453+
# get the context dict
454+
data = ctx.to_dict() # type:ignore
452455

453-
# finish workflow
454-
await handler
456+
# finish workflow
457+
await handler
455458

456-
new_ctx = Context.from_dict(workflow, data)
457-
new_ctx._init_broker(workflow)
458-
assert new_ctx._broker_run is not None
459-
assert new_ctx._broker_run._state.queues["start_step"].get_nowait().name == "test"
459+
new_ctx = Context.from_dict(workflow, data)
460+
new_ctx._init_broker(workflow)
461+
assert new_ctx._broker_run is not None
462+
assert (
463+
new_ctx._broker_run._state.queues["start_step"].get_nowait().name == "test"
464+
)
465+
finally:
466+
if ctx is not None and ctx._broker_run is not None:
467+
await ctx._broker_run.shutdown()
468+
if new_ctx is not None and new_ctx._broker_run is not None:
469+
await new_ctx._broker_run.shutdown()
460470

461471

462472
class HumanInTheLoopWorkflow(Workflow):

0 commit comments

Comments
 (0)