Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 33 additions & 24 deletions core/framework/graph/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import asyncio
import logging
import threading
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
Expand Down Expand Up @@ -230,6 +231,9 @@ def __init__(
# Track the currently executing node for external injection routing
self.current_node_id: str | None = None

# Serialize read-modify-write cycles on state.json
self._progress_lock = threading.Lock()

def _write_progress(
self,
current_node: str,
Expand All @@ -244,6 +248,10 @@ def _write_progress(
state.json as the single source of truth — readers always see
current progress, not stale initial values.

The entire read-modify-write cycle is serialized with a threading
lock to prevent concurrent fan-out branches from clobbering each
other's progress updates.

The write is synchronous and best-effort: never blocks execution.
"""
if not self._storage_path:
Expand All @@ -253,30 +261,31 @@ def _write_progress(
import json as _json
from datetime import datetime

if state_path.exists():
state_data = _json.loads(state_path.read_text(encoding="utf-8"))
else:
state_data = {}

# Patch progress fields
progress = state_data.setdefault("progress", {})
progress["current_node"] = current_node
progress["path"] = list(path)
progress["node_visit_counts"] = dict(node_visit_counts)
progress["steps_executed"] = len(path)

# Update timestamp
timestamps = state_data.setdefault("timestamps", {})
timestamps["updated_at"] = datetime.now().isoformat()

# Persist full memory so state.json is sufficient for resume
# even if the process dies before the final write.
memory_snapshot = memory.read_all()
state_data["memory"] = memory_snapshot
state_data["memory_keys"] = list(memory_snapshot.keys())

with atomic_write(state_path, encoding="utf-8") as f:
_json.dump(state_data, f, indent=2)
with self._progress_lock:
if state_path.exists():
state_data = _json.loads(state_path.read_text(encoding="utf-8"))
else:
state_data = {}

# Patch progress fields
progress = state_data.setdefault("progress", {})
progress["current_node"] = current_node
progress["path"] = list(path)
progress["node_visit_counts"] = dict(node_visit_counts)
progress["steps_executed"] = len(path)

# Update timestamp
timestamps = state_data.setdefault("timestamps", {})
timestamps["updated_at"] = datetime.now().isoformat()

# Persist full memory so state.json is sufficient for resume
# even if the process dies before the final write.
memory_snapshot = memory.read_all()
state_data["memory"] = memory_snapshot
state_data["memory_keys"] = list(memory_snapshot.keys())

with atomic_write(state_path, encoding="utf-8") as f:
_json.dump(state_data, f, indent=2)
except Exception:
logger.warning(
"Failed to persist progress state to %s",
Expand Down
Loading
Loading