Bug: GraphAgent state corruption on execution error #103

@tdan1

Priority: Critical
Component: spoon_ai/agents/graph_agent.py

Description
The GraphAgent class has critical state corruption issues during graph execution failures. When errors occur during graph execution, the agent's internal state (memory, preserved state, execution metadata) becomes corrupted and inconsistent, leading to unpredictable behavior in subsequent operations.

Root Cause Analysis
Located in spoon_ai/agents/graph_agent.py:

1. Incomplete Memory Restoration (lines 195-204):

# Reset memory to original state to prevent corruption
try:
    self.memory.clear()
    for msg in original_messages:
        self.memory.add_message(msg)  # No validation of message integrity

2. Unsafe Preserved State Handling (lines 85-90):

if self.preserve_state:
    self._last_state = self._sanitize_preserved_state(final_state)
# No validation that final_state isn't corrupted before sanitization

3. Unbounded Error Metadata (line 186):

self.execution_metadata = {
    "error": str(error),  # Could contain sensitive data
    "error_type": type(error).__name__,
    # No size limits or cleanup mechanism
}

4. Race Conditions: No synchronization for concurrent state access during error handling.

Steps to Reproduce

Create a GraphAgent with preserve_state=True:
from spoon_ai.agents.graph_agent import GraphAgent
from spoon_ai.graph import StateGraph

# Create a graph that will fail mid-execution
graph = StateGraph()
graph.add_node("start", lambda x: x)
graph.add_node("fail", lambda x: 1/0)  # Division by zero
graph.add_edge("start", "fail")

agent = GraphAgent(
    name="test_agent",
    graph=graph,
    preserve_state=True
)

Run the agent with a request (from inside a coroutine, since run() is awaited):

try:
    result = await agent.run("test request")
except Exception as e:
    print(f"Error: {e}")

Inspect agent state after error:

print(f"Messages count: {len(agent.memory.get_messages())}")
print(f"Preserved state: {hasattr(agent, '_last_state')}")
print(f"Execution metadata: {agent.execution_metadata}")

Try to run the agent again:

# This may fail or produce incorrect results
result2 = await agent.run("second request")

Expected Behavior

Agent state should be atomically restored to pre-execution state on any error
Memory should contain only validated, uncorrupted messages
Preserved state should be cleared if corruption is detected
Execution metadata should have bounded size and no sensitive data
Subsequent runs should work correctly after error recovery
Concurrent access should be properly synchronized
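The "bounded size and no sensitive data" expectation above could be met in several ways. Below is a minimal sketch, assuming a regex-based redaction step and a character limit; `build_error_metadata`, `_SECRET_PATTERN`, and the `failed_at` key are hypothetical names that do not exist in spoon_ai, and the pattern would need tuning for a real deployment.

```python
import re
import time

# Hypothetical pattern for secret-looking "key=value" fragments; not exhaustive.
_SECRET_PATTERN = re.compile(
    r"(?i)\b(api[_-]?key|token|password|secret)\b\s*[=:]\s*\S+"
)


def build_error_metadata(error: Exception, max_len: int = 500) -> dict:
    """Return a size-bounded, redacted metadata dict for a failed run."""
    # Replace the value part of anything that looks like a credential.
    text = _SECRET_PATTERN.sub(lambda m: f"{m.group(1)}=<redacted>", str(error))
    # Truncate so the stored string never exceeds max_len characters.
    if len(text) > max_len:
        text = text[: max_len - 3] + "..."
    return {
        "error": text,
        "error_type": type(error).__name__,
        "execution_successful": False,
        "failed_at": time.time(),  # wall-clock timestamp of the failure
    }
```

Redacting before truncating matters here: truncating first could cut a secret in half and leave the remainder unmatched by the pattern.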

Actual Behavior

Memory Corruption: Messages may be partially restored or contain invalid data
State Inconsistency: _last_state may contain corrupted data from failed execution
Memory Leaks: Execution metadata grows unbounded with error details
Race Conditions: Concurrent operations can corrupt state during error handling
Cascade Failures: Subsequent runs fail due to corrupted internal state

Error Examples:

GraphAgent 'test_agent' memory restored after error
WARNING - GraphAgent 'test_agent' had invalid preserved state type: <class 'NoneType'>
ERROR - Unexpected error during graph execution for agent 'test_agent': 'NoneType' object is not subscriptable
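The last two log lines are consistent with a preserved state of None being subscripted on the next run, although the issue does not show the failing line. A defensive guard might look like the sketch below; `safe_preserved_state`, `read_messages`, and the `"messages"` key are assumptions for illustration, not part of the GraphAgent API.

```python
from typing import Any, Optional


def safe_preserved_state(final_state: Any) -> Optional[dict]:
    """Return a shallow copy of final_state if it is a dict, else None."""
    if not isinstance(final_state, dict):
        return None
    return dict(final_state)


def read_messages(last_state: Optional[dict]) -> list:
    """Read the (hypothetical) "messages" key without indexing into None."""
    if last_state is None:
        return []
    return list(last_state.get("messages", []))
```

With a guard like this, a failed run leaves the preserved state as None and later reads fall back to an empty list instead of raising.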

Impact Assessment

Reliability: High - Agent becomes unusable after first error
Data Integrity: High - Conversation history may be corrupted
Performance: Medium - Memory leaks degrade performance over time
Security: Medium - Error metadata may contain sensitive information

Phase 1: Add State Synchronization

import asyncio
from typing import Optional

class GraphAgent(BaseAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._state_lock = asyncio.Lock()  # Guards memory, _last_state and execution_metadata
        self._max_metadata_size = 1024  # Configurable limit

    async def run(self, request: Optional[str] = None) -> str:
        async with self._state_lock:  # Synchronize all state operations
            ...  # existing implementation
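To illustrate what holding the lock for the whole of run() buys, here is a small self-contained sketch. `LockedCounter` is a stand-in, not the real GraphAgent. It also shows one caveat worth keeping in mind for the later phases: asyncio.Lock is not reentrant, so code that already runs under the lock cannot acquire it a second time.

```python
import asyncio


class LockedCounter:
    """Stand-in for an agent whose run() mutates shared state."""

    def __init__(self) -> None:
        self._state_lock = asyncio.Lock()
        self.active = 0      # run() calls currently inside the critical section
        self.max_active = 0  # highest value of `active` ever observed

    async def run(self) -> None:
        async with self._state_lock:
            self.active += 1
            self.max_active = max(self.max_active, self.active)
            await asyncio.sleep(0)  # yield to the event loop, as a real await would
            self.active -= 1


async def demo() -> int:
    """Run five concurrent calls and report the peak concurrency seen."""
    agent = LockedCounter()
    await asyncio.gather(*(agent.run() for _ in range(5)))
    return agent.max_active


async def nested_acquire_times_out() -> bool:
    """Return True if acquiring an already-held lock from the same task blocks."""
    lock = asyncio.Lock()
    async with lock:
        try:
            await asyncio.wait_for(lock.acquire(), timeout=0.05)
        except asyncio.TimeoutError:
            return True
        return False
```

The trade-off is throughput: concurrent run() calls on the same agent instance are fully serialized.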

Phase 2: Atomic State Management

def _handle_execution_error(self, error: Exception, original_messages: list):
    """Enhanced error handling with atomic rollback.

    Call this only while run() already holds self._state_lock. asyncio.Lock
    is not reentrant, so acquiring it again here would deadlock.
    """
    try:
        # Create checkpoint before restoration
        checkpoint = {
            'messages': original_messages.copy(),
            'current_step': 0,
            'state': AgentState.IDLE
        }

        # Atomic restoration with validation
        self._restore_from_checkpoint(checkpoint)

        # Bounded error metadata
        error_str = str(error)[:500]  # Truncate long errors
        self.execution_metadata = {
            "error": error_str,
            "error_type": type(error).__name__,
            "execution_successful": False,
            "execution_time": time.time(),  # Timestamp of the failure; requires `import time`
            "recovery_attempted": True
        }

    except Exception as restore_error:
        logger.critical(f"State corruption detected in {self.name}: {restore_error}")
        # Emergency reset to safe state
        self._emergency_reset()

    # Clear any corrupted preserved state
    self._safe_clear_preserved_state()
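The helpers `_emergency_reset` and `_safe_clear_preserved_state` are referenced above but not defined in this issue. One possible shape is sketched below on a stand-in class; `AgentStub`, its `messages` list (standing in for `self.memory`), the local `AgentState` enum, and the `emergency_reset` metadata key are all assumptions for illustration.

```python
from enum import Enum


class AgentState(Enum):  # stand-in for the project's AgentState
    IDLE = "idle"
    ERROR = "error"


class AgentStub:
    """Minimal stand-in exposing only the attributes the helpers touch."""

    def __init__(self) -> None:
        self.messages: list = []  # stands in for self.memory
        self.current_step = 0
        self.state = AgentState.IDLE
        self.execution_metadata: dict = {}
        self._last_state = None

    def _safe_clear_preserved_state(self) -> None:
        """Drop any preserved state; plain assignment, so it cannot fail."""
        self._last_state = None

    def _emergency_reset(self) -> None:
        """Last resort: discard all state and return to a known-idle agent."""
        self.messages = []
        self.current_step = 0
        self.state = AgentState.IDLE
        self.execution_metadata = {
            "execution_successful": False,
            "emergency_reset": True,
        }
        self._safe_clear_preserved_state()
```

Note that a reset like this discards the conversation history entirely. Whether that is acceptable, or whether the agent should instead refuse further runs, is a design decision the issue leaves open.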

Phase 3: Message Validation

def _validate_message(self, msg) -> bool:
    """Validate message integrity before restoration."""
    if not hasattr(msg, 'role') or not hasattr(msg, 'content'):
        return False
    if msg.role not in ['user', 'assistant', 'tool']:
        return False
    if not isinstance(msg.content, (str, type(None))):
        return False
    return True

# Requires: from typing import Any, Dict
def _restore_from_checkpoint(self, checkpoint: Dict[str, Any]):
    """Restore agent state from checkpoint, validating before mutating memory."""
    # Validate first, so memory is left untouched if validation itself raises
    valid_messages = []
    for msg in checkpoint['messages']:
        if self._validate_message(msg):
            valid_messages.append(msg)
        else:
            logger.warning(f"Skipping corrupted message during restoration: {type(msg)}")

    # Clear only after validation has finished, then restore all valid messages
    self.memory.clear()
    for msg in valid_messages:
        self.memory.add_message(msg)

    # Restore other state
    self.current_step = checkpoint.get('current_step', 0)
    self.state = checkpoint.get('state', AgentState.IDLE)
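One way to keep restoration close to all-or-nothing is to finish validation before touching memory, then replace the contents in a single assignment. The sketch below uses a plain list and a stand-in `Message` dataclass rather than the project's memory and message types; `is_valid` and `restore` are hypothetical names. The allowed roles mirror the validator above, which excludes "system"; if the project's messages use that role, they would be dropped on restore, so the set is worth checking before adopting.

```python
from dataclasses import dataclass
from typing import Any, List, Optional

# Same roles as the validator in this issue; a tuple avoids hashing msg.role.
ALLOWED_ROLES = ("user", "assistant", "tool")


@dataclass
class Message:  # stand-in for the project's message type
    role: str
    content: Optional[str]


def is_valid(msg: Any) -> bool:
    """Check the attributes and types that restoration relies on."""
    return (
        hasattr(msg, "role")
        and hasattr(msg, "content")
        and msg.role in ALLOWED_ROLES
        and isinstance(msg.content, (str, type(None)))
    )


def restore(memory: List[Any], checkpoint_messages: List[Any]) -> int:
    """Replace memory contents with the valid checkpoint messages.

    Returns the number of messages skipped as invalid.
    """
    # Validation runs to completion first; if it raises, memory is unchanged.
    valid = [m for m in checkpoint_messages if is_valid(m)]
    # Slice assignment swaps the contents in one step.
    memory[:] = valid
    return len(checkpoint_messages) - len(valid)
```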

Testing Requirements
Unit Tests

Test memory restoration with corrupted messages
Test preserved state corruption handling
Test metadata size limits
Test concurrent error scenarios
Test emergency reset functionality
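As a starting point for the unit tests listed above, here is a sketch using the standard library's unittest.IsolatedAsyncioTestCase. `FakeAgent` is a stand-in that implements the rollback behaviour this issue asks for; it is not the real GraphAgent, and the real tests would need to build an agent around a failing graph as in the reproduction steps.

```python
import asyncio
import unittest


class FakeAgent:
    """Stand-in with snapshot-and-rollback semantics; not the real GraphAgent."""

    def __init__(self) -> None:
        self._state_lock = asyncio.Lock()
        self.messages = ["system prompt"]

    async def run(self, request: str, fail: bool = False) -> str:
        async with self._state_lock:
            snapshot = list(self.messages)  # copy taken before any mutation
            try:
                self.messages.append(request)
                await asyncio.sleep(0)  # simulated graph execution
                if fail:
                    raise ZeroDivisionError("division by zero")
                return f"ok: {request}"
            except Exception:
                self.messages[:] = snapshot  # roll back to the pre-run snapshot
                raise


class ErrorRecoveryTests(unittest.IsolatedAsyncioTestCase):
    async def test_memory_restored_after_error(self):
        agent = FakeAgent()
        with self.assertRaises(ZeroDivisionError):
            await agent.run("boom", fail=True)
        self.assertEqual(agent.messages, ["system prompt"])

    async def test_agent_reusable_after_concurrent_errors(self):
        agent = FakeAgent()
        results = await asyncio.gather(
            *(agent.run(f"r{i}", fail=(i % 2 == 0)) for i in range(4)),
            return_exceptions=True,
        )
        # Two of the four runs fail; only the successful requests stay in memory.
        self.assertEqual(sum(isinstance(r, Exception) for r in results), 2)
        self.assertEqual(agent.messages, ["system prompt", "r1", "r3"])
```

Saved as a module, the tests can be run with `python -m unittest`.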

Integration Tests

Test end-to-end error recovery flows
Test agent reusability after errors
Test state consistency across multiple runs
Test memory leak prevention

Performance Tests

Benchmark error recovery overhead
Test memory usage under repeated errors
Validate synchronization performance impact

Documentation Updates Required

Update GraphAgent docstrings with error handling details
Add state management best practices guide
Document configuration options for error recovery
Add troubleshooting section for state corruption issues

Breaking Changes
None - This is a bug fix that maintains API compatibility.

Migration Guide
No migration required. Existing code will automatically benefit from improved reliability.

Definition of Done

All identified race conditions eliminated with proper synchronization
Memory restoration is atomic and validates message integrity
Preserved state corruption is detected and safely handled

Comms : https://t.me/fastbuild01
