Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 23 additions & 18 deletions core/framework/graph/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,8 @@ async def execute(
path: list[str] = []
total_tokens = 0
total_latency = 0
node_retry_counts: dict[str, int] = {} # Track retries per node
node_retry_counts: dict[str, int] = {} # Per-visit retry budget (reset on fresh visits)
node_retry_totals: dict[str, int] = {} # Cumulative retry counts (never reset)
node_visit_counts: dict[str, int] = {} # Track visits for feedback loops
_is_retry = False # True when looping back for a retry (not a new visit)

Expand Down Expand Up @@ -821,6 +822,7 @@ async def execute(
if not _is_retry:
cnt = node_visit_counts.get(current_node_id, 0) + 1
node_visit_counts[current_node_id] = cnt
node_retry_counts[current_node_id] = 0 # fresh visit = fresh retry budget
_is_retry = False
max_visits = getattr(node_spec, "max_node_visits", 0)
if max_visits > 0 and node_visit_counts[current_node_id] > max_visits:
Expand Down Expand Up @@ -946,7 +948,7 @@ async def execute(
current_node=node_spec.id,
execution_path=list(path),
memory=memory,
is_clean=(sum(node_retry_counts.values()) == 0),
is_clean=(sum(node_retry_totals.values()) == 0),
)

if checkpoint_config.async_checkpoint:
Expand Down Expand Up @@ -1080,6 +1082,9 @@ async def execute(
node_retry_counts[current_node_id] = (
node_retry_counts.get(current_node_id, 0) + 1
)
node_retry_totals[current_node_id] = (
node_retry_totals.get(current_node_id, 0) + 1
)

# [CORRECTED] Use node_spec.max_retries instead of hardcoded 3
max_retries = getattr(node_spec, "max_retries", 3)
Expand Down Expand Up @@ -1166,8 +1171,8 @@ async def execute(
)

# Calculate quality metrics
total_retries_count = sum(node_retry_counts.values())
nodes_failed = list(node_retry_counts.keys())
total_retries_count = sum(node_retry_totals.values())
nodes_failed = list(node_retry_totals.keys())

if self.runtime_logger:
await self.runtime_logger.end_run(
Expand Down Expand Up @@ -1199,7 +1204,7 @@ async def execute(
path=path,
total_retries=total_retries_count,
nodes_with_failures=nodes_failed,
retry_details=dict(node_retry_counts),
retry_details=dict(node_retry_totals),
had_partial_failures=len(nodes_failed) > 0,
execution_quality="failed",
node_visit_counts=dict(node_visit_counts),
Expand Down Expand Up @@ -1237,8 +1242,8 @@ async def execute(
)

# Calculate quality metrics
total_retries_count = sum(node_retry_counts.values())
nodes_failed = [nid for nid, count in node_retry_counts.items() if count > 0]
total_retries_count = sum(node_retry_totals.values())
nodes_failed = [nid for nid, count in node_retry_totals.items() if count > 0]
exec_quality = "degraded" if total_retries_count > 0 else "clean"

if self.runtime_logger:
Expand All @@ -1260,7 +1265,7 @@ async def execute(
session_state=session_state_out,
total_retries=total_retries_count,
nodes_with_failures=nodes_failed,
retry_details=dict(node_retry_counts),
retry_details=dict(node_retry_totals),
had_partial_failures=len(nodes_failed) > 0,
execution_quality=exec_quality,
node_visit_counts=dict(node_visit_counts),
Expand Down Expand Up @@ -1387,7 +1392,7 @@ async def execute(
execution_path=list(path),
memory=memory,
next_node=next_node,
is_clean=(sum(node_retry_counts.values()) == 0),
is_clean=(sum(node_retry_totals.values()) == 0),
)

if checkpoint_config.async_checkpoint:
Expand Down Expand Up @@ -1577,8 +1582,8 @@ async def execute(
self.logger.info(f" Total latency: {total_latency}ms")

# Calculate execution quality metrics
total_retries_count = sum(node_retry_counts.values())
nodes_failed = [nid for nid, count in node_retry_counts.items() if count > 0]
total_retries_count = sum(node_retry_totals.values())
nodes_failed = [nid for nid, count in node_retry_totals.items() if count > 0]
exec_quality = "degraded" if total_retries_count > 0 else "clean"

# Update narrative to reflect execution quality
Expand Down Expand Up @@ -1613,7 +1618,7 @@ async def execute(
path=path,
total_retries=total_retries_count,
nodes_with_failures=nodes_failed,
retry_details=dict(node_retry_counts),
retry_details=dict(node_retry_totals),
had_partial_failures=len(nodes_failed) > 0,
execution_quality=exec_quality,
node_visit_counts=dict(node_visit_counts),
Expand Down Expand Up @@ -1665,8 +1670,8 @@ async def execute(
}

# Calculate quality metrics
total_retries_count = sum(node_retry_counts.values())
nodes_failed = [nid for nid, count in node_retry_counts.items() if count > 0]
total_retries_count = sum(node_retry_totals.values())
nodes_failed = [nid for nid, count in node_retry_totals.items() if count > 0]
exec_quality = "degraded" if total_retries_count > 0 else "clean"

if self.runtime_logger:
Expand All @@ -1690,7 +1695,7 @@ async def execute(
session_state=session_state_out,
total_retries=total_retries_count,
nodes_with_failures=nodes_failed,
retry_details=dict(node_retry_counts),
retry_details=dict(node_retry_totals),
had_partial_failures=len(nodes_failed) > 0,
execution_quality=exec_quality,
node_visit_counts=dict(node_visit_counts),
Expand Down Expand Up @@ -1722,8 +1727,8 @@ async def execute(
)

# Calculate quality metrics even for exceptions
total_retries_count = sum(node_retry_counts.values())
nodes_failed = list(node_retry_counts.keys())
total_retries_count = sum(node_retry_totals.values())
nodes_failed = list(node_retry_totals.keys())

if self.runtime_logger:
await self.runtime_logger.end_run(
Expand Down Expand Up @@ -1789,7 +1794,7 @@ async def execute(
path=path,
total_retries=total_retries_count,
nodes_with_failures=nodes_failed,
retry_details=dict(node_retry_counts),
retry_details=dict(node_retry_totals),
had_partial_failures=len(nodes_failed) > 0,
execution_quality="failed",
node_visit_counts=dict(node_visit_counts),
Expand Down
Loading
Loading