Description:
In event_driven=True mode, the TaskEventRunner fails to correctly identify the completion of a task and aggregate the final answer if the agent producing the final output is not designated as the root_agent (or communicate_agent) in a TEAM or WORKFLOW topology.
This results in two failure modes:
- Hanging: The runner waits indefinitely for new events because no TopicType.FINISHED message is ever emitted.
- Empty Answer: If the swarm is interrupted by max_steps or a timeout, the TaskResponse contains an empty answer string, even though the final result exists in context.trajectories.
Root Cause Analysis:
The issue lies in agent.py. In the _team_stop_check method, the framework only emits a FINISHED topic if the agent sending the message is the root_agent:

    # aworld/runners/handler/agent.py
    async def _team_stop_check(self, action: ActionModel, message: Message):
        # ...
        if ((not caller or caller == self.swarm.communicate_agent.id())
                and (self.swarm.cur_step >= self.swarm.max_steps or self.swarm.finished or
                     (agent.id() == self.swarm.agent_graph.root_agent.id() and agent.finished))):
            yield Message(..., topic=TopicType.FINISHED)  # Only triggered by root_agent

In many common topologies (e.g., Specialists → Orchestrator), the root_agent is the entry point (Specialist), but the logical exit point is the Orchestrator. Since the Orchestrator is not the root_agent, its output is treated as a standard AgentMessage and sent back to the caller, never triggering the task completion logic.
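To make the failing branch concrete, here is a minimal, framework-free sketch of the boolean condition quoted above. It is not the aworld implementation: SwarmState and emits_finished are hypothetical names, agents are reduced to plain string ids, and the sketch assumes the communicate_agent is the same agent as the root_agent.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SwarmState:
    # Hypothetical stand-in for the swarm attributes the check reads.
    root_agent_id: str
    communicate_agent_id: str
    cur_step: int = 0
    max_steps: int = 10
    finished: bool = False


def emits_finished(swarm: SwarmState, agent_id: str, agent_finished: bool,
                   caller: Optional[str] = None) -> bool:
    """Evaluate the same condition as the quoted check, on plain values."""
    caller_ok = (not caller) or caller == swarm.communicate_agent_id
    done = (swarm.cur_step >= swarm.max_steps
            or swarm.finished
            or (agent_id == swarm.root_agent_id and agent_finished))
    return caller_ok and done


# Assumption: the Specialist is both root_agent and communicate_agent.
swarm = SwarmState(root_agent_id="Specialist", communicate_agent_id="Specialist")

# The root agent finishing on its own satisfies the condition.
root_case = emits_finished(swarm, "Specialist", agent_finished=True)

# The Orchestrator finishing after being called by the Specialist does not,
# because the last clause compares against root_agent only.
orchestrator_case = emits_finished(
    swarm, "Orchestrator", agent_finished=True, caller="Specialist")

# Once the step limit is reached, the condition passes for any agent.
exhausted = SwarmState("Specialist", "Specialist", cur_step=10)
step_limit_case = emits_finished(
    exhausted, "Orchestrator", agent_finished=True, caller="Specialist")

print(root_case, orchestrator_case, step_limit_case)
```

Under these assumptions, the only ways for a non-root agent to reach the FINISHED branch are the step limit or swarm.finished being set elsewhere, which matches the two failure modes described above.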
Reproduction Steps:
- Create a reproduction script reproduce_issue.py:

    import asyncio
    from typing import List, Dict, Any

    from aworld.core.agent.base import BaseAgent
    from aworld.core.agent.swarm import Swarm, Team
    from aworld.core.common import ActionModel, Observation
    from aworld.core.event.base import Message
    from aworld.runner import Runners


    class SimpleAgent(BaseAgent):
        def policy(self, observation, info=None, message=None, **kwargs):
            # Simulate an agent that just returns a string
            return [ActionModel(policy_info=f"Result from {self.name()}")]


    async def reproduce():
        specialist = SimpleAgent(name="Specialist")
        orchestrator = SimpleAgent(name="Orchestrator")

        # Topology: Specialist -> Orchestrator
        # Specialist is the root_agent (entry point)
        swarm = Team(
            topology=[(specialist, orchestrator)],
            root_agent=specialist,
            event_driven=True
        )

        print("Running event-driven swarm...")
        try:
            # This will likely hang because Orchestrator is not the root_agent
            response = await asyncio.wait_for(
                Runners.run(input="Start Task", swarm=swarm),
                timeout=5.0
            )
            print(f"Success! Answer: {response.answer}")
        except asyncio.TimeoutError:
            print("FAILED: Swarm hung indefinitely!")


    if __name__ == "__main__":
        asyncio.run(reproduce())

- Run the script.
- Observed Behavior: The script hangs and hits the 5-second timeout. If max_steps is reached instead, response.answer is "".
- Expected Behavior: The framework should recognize that the Orchestrator has no further successors in the graph and automatically aggregate its output as the final TaskResponse.answer.
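One way to picture the expected behavior is to treat the topology as a list of directed edges and look for agents with no outgoing edge. The sketch below is only an illustration of that idea: terminal_nodes is a hypothetical helper, agents are represented by their names, and nothing here reflects how aworld stores its agent graph internally.

```python
from typing import Iterable, List, Tuple


def terminal_nodes(edges: Iterable[Tuple[str, str]]) -> List[str]:
    """Return nodes that never appear as the source of an edge,
    in order of first appearance."""
    nodes: List[str] = []
    has_successor = set()
    for src, dst in edges:
        for name in (src, dst):
            if name not in nodes:
                nodes.append(name)
        has_successor.add(src)
    return [name for name in nodes if name not in has_successor]


# Topology from the reproduction script: Specialist -> Orchestrator.
simple = terminal_nodes([("Specialist", "Orchestrator")])

# Two specialists feeding one orchestrator still yield a single exit point.
fan_in = terminal_nodes([
    ("SpecialistA", "Orchestrator"),
    ("SpecialistB", "Orchestrator"),
])

print(simple, fan_in)
```

With a check of this kind, a finished agent that is a terminal node could be treated as a completion signal regardless of whether it is the root_agent. How to aggregate answers when a graph has several terminal nodes is left open here.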