diff --git a/apps/backend/agents/session.py b/apps/backend/agents/session.py index 437628dbd9..f7f0dcba0a 100644 --- a/apps/backend/agents/session.py +++ b/apps/backend/agents/session.py @@ -648,6 +648,27 @@ async def run_agent_session( current_tool = None + # Handle rate_limit_event SystemMessage from sdk_patches + elif ( + msg_type == "SystemMessage" + and getattr(msg, "subtype", None) == "rate_limit_event" + ): + debug( + "session", + "Rate limit event received — Claude Code is paused, stream remains open", + ) + print( + "[Rate limit] Claude Code is waiting for rate limit reset...", + flush=True, + ) + if task_logger: + task_logger.log( + "Rate limit reached — waiting for reset (stream remains open)", + LogEntryType.INFO, + phase, + print_to_console=False, + ) + print("\n" + "-" * 70 + "\n") # Check if build is complete diff --git a/apps/backend/core/error_utils.py b/apps/backend/core/error_utils.py index 28c1b155cb..875058f16e 100644 --- a/apps/backend/core/error_utils.py +++ b/apps/backend/core/error_utils.py @@ -56,6 +56,7 @@ def is_rate_limit_error(error: Exception) -> bool: for p in [ "limit reached", "rate limit", + "rate_limit", # Catches "Unknown message type: rate_limit_event" from claude_agent_sdk "too many requests", "usage limit", "quota exceeded", diff --git a/apps/backend/core/sdk_patches.py b/apps/backend/core/sdk_patches.py new file mode 100644 index 0000000000..6b2342f1af --- /dev/null +++ b/apps/backend/core/sdk_patches.py @@ -0,0 +1,56 @@ +"""Runtime patches for third-party SDK bugs.""" + +import logging + +logger = logging.getLogger(__name__) + + +def apply_claude_agent_sdk_patches() -> None: + """Patch claude_agent_sdk to handle rate_limit_event gracefully. + + The bundled SDK raises MessageParseError for unknown message types including + rate_limit_event. This patch returns a SystemMessage instead of raising, + allowing the message loop in session.py to handle it and keep the stream open + while Claude Code waits for the rate limit to reset. + + Patches both: + - message_parser.parse_message (the module attribute) + - _internal.client.parse_message (the already-bound module-level name that + the client's receive_response() generator actually calls via + `from .message_parser import parse_message` at import time) + + Idempotent — safe to call multiple times from different entry points. + """ + try: + from claude_agent_sdk._internal import client as _ic + from claude_agent_sdk._internal import message_parser as _mp + from claude_agent_sdk.types import SystemMessage as _SystemMessage + + if getattr(_mp, "_rate_limit_patched", False): + logger.debug("claude_agent_sdk already patched — skipping") + return + + _orig_parse = _mp.parse_message + + def _patched_parse(data: dict) -> object: + if isinstance(data, dict) and data.get("type") == "rate_limit_event": + logger.warning( + "Rate limit event received from Claude Code — " + "returning as SystemMessage so the stream stays open: %s", + data, + ) + return _SystemMessage(subtype="rate_limit_event", data=data) + return _orig_parse(data) + + _patched_parse.__wrapped__ = _orig_parse # type: ignore[attr-defined] + + _mp.parse_message = _patched_parse + _ic.parse_message = _patched_parse + _mp._rate_limit_patched = True # type: ignore[attr-defined] + logger.debug("claude_agent_sdk patched to handle rate_limit_event") + except Exception: + logger.warning( + "Failed to apply claude_agent_sdk rate_limit_event patch — " + "rate limit events may cause unexpected session failures", + exc_info=True, + ) diff --git a/apps/backend/run.py b/apps/backend/run.py index bd6c95f06d..4427847161 100644 --- a/apps/backend/run.py +++ b/apps/backend/run.py @@ -70,6 +70,11 @@ if "_new_stream" in dir(): del _new_stream +from core.sdk_patches import apply_claude_agent_sdk_patches # pragma: no cover + +apply_claude_agent_sdk_patches() # pragma: no cover + + # Validate platform-specific dependencies BEFORE any imports that might # trigger graphiti_core -> real_ladybug -> pywintypes import chain (ACS-253) from core.dependency_validator import validate_platform_dependencies diff --git a/apps/backend/runners/github/runner.py b/apps/backend/runners/github/runner.py index 0a883a5482..78764a82f9 100644 --- a/apps/backend/runners/github/runner.py +++ b/apps/backend/runners/github/runner.py @@ -62,6 +62,10 @@ validate_platform_dependencies() +from core.sdk_patches import apply_claude_agent_sdk_patches # pragma: no cover + +apply_claude_agent_sdk_patches() # pragma: no cover + # Load .env file with centralized error handling from cli.utils import import_dotenv diff --git a/apps/backend/runners/gitlab/runner.py b/apps/backend/runners/gitlab/runner.py index eb05468543..c34d158897 100644 --- a/apps/backend/runners/gitlab/runner.py +++ b/apps/backend/runners/gitlab/runner.py @@ -32,6 +32,10 @@ validate_platform_dependencies() +from core.sdk_patches import apply_claude_agent_sdk_patches # pragma: no cover + +apply_claude_agent_sdk_patches() # pragma: no cover + # Load .env file with centralized error handling from cli.utils import import_dotenv diff --git a/apps/backend/runners/ideation_runner.py b/apps/backend/runners/ideation_runner.py index 1ec3412aaf..9f64db0041 100644 --- a/apps/backend/runners/ideation_runner.py +++ b/apps/backend/runners/ideation_runner.py @@ -32,6 +32,10 @@ validate_platform_dependencies() +from core.sdk_patches import apply_claude_agent_sdk_patches # pragma: no cover + +apply_claude_agent_sdk_patches() # pragma: no cover + # Load .env file with centralized error handling from cli.utils import import_dotenv diff --git a/apps/backend/runners/insights_runner.py b/apps/backend/runners/insights_runner.py index 891a4d84ed..9fa3d5eeb4 100644 --- a/apps/backend/runners/insights_runner.py +++ b/apps/backend/runners/insights_runner.py @@ -21,6 +21,10 @@ validate_platform_dependencies() +from core.sdk_patches import apply_claude_agent_sdk_patches # pragma: no cover + +apply_claude_agent_sdk_patches() # pragma: no cover + # Load .env file with centralized error handling from cli.utils import import_dotenv diff --git a/apps/backend/runners/roadmap_runner.py b/apps/backend/runners/roadmap_runner.py index 185dcc5f76..2f956a7d17 100644 --- a/apps/backend/runners/roadmap_runner.py +++ b/apps/backend/runners/roadmap_runner.py @@ -27,6 +27,10 @@ validate_platform_dependencies() +from core.sdk_patches import apply_claude_agent_sdk_patches # pragma: no cover + +apply_claude_agent_sdk_patches() # pragma: no cover + # Load .env file with centralized error handling from cli.utils import import_dotenv diff --git a/apps/backend/runners/spec_runner.py b/apps/backend/runners/spec_runner.py index 70d6e755d7..d6c2549ed1 100644 --- a/apps/backend/runners/spec_runner.py +++ b/apps/backend/runners/spec_runner.py @@ -83,6 +83,11 @@ # Add auto-claude to path (parent of runners/) sys.path.insert(0, str(Path(__file__).parent.parent)) + +from core.sdk_patches import apply_claude_agent_sdk_patches # pragma: no cover + +apply_claude_agent_sdk_patches() # pragma: no cover + # Validate platform-specific dependencies BEFORE any imports that might # trigger graphiti_core -> real_ladybug -> pywintypes import chain (ACS-253) from core.dependency_validator import validate_platform_dependencies diff --git a/tests/test_sdk_patches.py b/tests/test_sdk_patches.py new file mode 100644 index 0000000000..c8a80e88e8 --- /dev/null +++ b/tests/test_sdk_patches.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +""" +Tests for core.sdk_patches +============================ + +Covers: +- apply_claude_agent_sdk_patches() happy path +- Idempotency guard (no double-wrapping) +- rate_limit_event returns SystemMessage +- Other unknown message types still raise +- Graceful failure when SDK internals are unavailable +""" + +import sys + +# Ensure backend is on the path +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent / "apps" / "backend")) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_sdk_mocks(): + """Build a minimal fake claude_agent_sdk module tree.""" + + class _FakeMessageParseError(Exception): + pass + + class _FakeSystemMessage: + def __init__(self, subtype: str, data: dict): + self.subtype = subtype + self.data = data + + def _real_parse(data): + msg_type = data.get("type") if isinstance(data, dict) else None + if msg_type == "rate_limit_event": + raise _FakeMessageParseError(f"Unknown message type: {msg_type}") + if msg_type == "assistant": + return MagicMock(type="assistant") + raise _FakeMessageParseError(f"Unknown message type: {msg_type}") + + # Build fake module objects + mp_mod = MagicMock() + mp_mod.parse_message = _real_parse + del mp_mod._rate_limit_patched # ensure sentinel absent at start + + ic_mod = MagicMock() + ic_mod.parse_message = _real_parse + + types_mod = MagicMock() + types_mod.SystemMessage = _FakeSystemMessage + + sdk_mod = MagicMock() + + internal_mod = MagicMock() + internal_mod.message_parser = mp_mod + internal_mod.client = ic_mod + + return sdk_mod, internal_mod, mp_mod, ic_mod, types_mod, _FakeMessageParseError + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _clean_patch_sentinel(): + """Remove any leftover _rate_limit_patched sentinel between tests.""" + yield + # Remove sdk_patches from sys.modules so each test gets a fresh import + sys.modules.pop("core.sdk_patches", None) + + +# --------------------------------------------------------------------------- +# Tests for apply_claude_agent_sdk_patches +# --------------------------------------------------------------------------- + + +class TestApplyClaudeAgentSdkPatches: + def _build_modules(self): + sdk, internal, mp, ic, types, err = _make_sdk_mocks() + modules = { + "claude_agent_sdk": sdk, + "claude_agent_sdk._internal": internal, + "claude_agent_sdk._internal.message_parser": mp, + "claude_agent_sdk._internal.client": ic, + "claude_agent_sdk.types": types, + } + return modules, mp, ic, types, err + + def test_patches_both_call_sites(self): + """parse_message is replaced on both _mp and _ic after patching.""" + modules, mp, ic, types, _ = self._build_modules() + original = mp.parse_message + + with patch.dict(sys.modules, modules): + from core.sdk_patches import apply_claude_agent_sdk_patches + + apply_claude_agent_sdk_patches() + + assert mp.parse_message is not original + assert ic.parse_message is not original + assert mp.parse_message is ic.parse_message + + def test_rate_limit_event_returns_system_message(self): + """rate_limit_event data is converted to a SystemMessage without raising.""" + modules, mp, ic, types, _ = self._build_modules() + + with patch.dict(sys.modules, modules): + from core.sdk_patches import apply_claude_agent_sdk_patches + + apply_claude_agent_sdk_patches() + + result = ic.parse_message({"type": "rate_limit_event", "retry_after": 30}) + assert type(result).__name__ == "_FakeSystemMessage" + assert result.subtype == "rate_limit_event" + + def test_other_unknown_types_still_raise(self): + """Non-rate-limit unknown message types still propagate MessageParseError.""" + modules, mp, ic, types, FakeErr = self._build_modules() + + with patch.dict(sys.modules, modules): + from core.sdk_patches import apply_claude_agent_sdk_patches + + apply_claude_agent_sdk_patches() + + with pytest.raises(FakeErr, match="Unknown message type: totally_unknown"): + ic.parse_message({"type": "totally_unknown"}) + + def test_valid_messages_pass_through(self): + """Known message types are delegated to the original parser unchanged.""" + modules, mp, ic, types, _ = self._build_modules() + + with patch.dict(sys.modules, modules): + from core.sdk_patches import apply_claude_agent_sdk_patches + + apply_claude_agent_sdk_patches() + + result = ic.parse_message({"type": "assistant"}) + assert result is not None + + def test_idempotent_second_call_skips_rewrap(self): + """Calling apply_claude_agent_sdk_patches twice does not double-wrap.""" + modules, mp, ic, types, _ = self._build_modules() + + with patch.dict(sys.modules, modules): + from core.sdk_patches import apply_claude_agent_sdk_patches + + apply_claude_agent_sdk_patches() + patched_once = ic.parse_message + apply_claude_agent_sdk_patches() + patched_twice = ic.parse_message + + assert patched_once is patched_twice + + def test_sets_rate_limit_patched_sentinel(self): + """_rate_limit_patched sentinel is set on the message_parser module.""" + modules, mp, ic, types, _ = self._build_modules() + + with patch.dict(sys.modules, modules): + from core.sdk_patches import apply_claude_agent_sdk_patches + + apply_claude_agent_sdk_patches() + + assert getattr(mp, "_rate_limit_patched", False) is True + + def test_wrapped_attribute_preserves_original(self): + """__wrapped__ on the patched function points to the original parser.""" + modules, mp, ic, types, _ = self._build_modules() + original = mp.parse_message + + with patch.dict(sys.modules, modules): + from core.sdk_patches import apply_claude_agent_sdk_patches + + apply_claude_agent_sdk_patches() + + assert mp.parse_message.__wrapped__ is original + + def test_graceful_failure_when_sdk_unavailable(self, caplog): + """Patch fails silently with a warning when the SDK is not installed.""" + import logging + + broken = {"claude_agent_sdk._internal": None} + with patch.dict(sys.modules, broken): + sys.modules.pop("core.sdk_patches", None) + with caplog.at_level(logging.WARNING, logger="core.sdk_patches"): + from core.sdk_patches import apply_claude_agent_sdk_patches + + apply_claude_agent_sdk_patches() + + assert any("Failed to apply" in r.message for r in caplog.records) + + def test_non_dict_data_passes_to_original(self): + """Non-dict data skips the rate_limit check and goes to original parser.""" + modules, mp, ic, types, FakeErr = self._build_modules() + + with patch.dict(sys.modules, modules): + from core.sdk_patches import apply_claude_agent_sdk_patches + + apply_claude_agent_sdk_patches() + + # Original parser raises for non-dict / unknown input + with pytest.raises(FakeErr): + ic.parse_message("not-a-dict") diff --git a/tests/test_session_rate_limit.py b/tests/test_session_rate_limit.py new file mode 100644 index 0000000000..7c0b3c63f8 --- /dev/null +++ b/tests/test_session_rate_limit.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +""" +Tests for rate_limit_event handling in agents/session.py +========================================================== + +Covers the SystemMessage(subtype="rate_limit_event") branch added to +run_agent_session's message loop to keep the stream open when Claude Code +is temporarily rate-limited. +""" + +import sys +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent / "apps" / "backend")) + + +# --------------------------------------------------------------------------- +# Minimal fakes matching the types session.py checks by name +# --------------------------------------------------------------------------- + + +class SystemMessage: + """Fake with the exact name session.py checks via type(msg).__name__.""" + + def __init__(self, subtype: str, data: dict): + self.subtype = subtype + self.data = data + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_client(messages: list) -> MagicMock: + """Return a mock ClaudeSDKClient whose receive_response yields *messages*.""" + + async def _gen(): + for m in messages: + yield m + + client = MagicMock() + client.query = AsyncMock() + client.receive_response = _gen + return client + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestSessionRateLimitHandler: + """run_agent_session handles rate_limit_event SystemMessages gracefully.""" + + @pytest.fixture(autouse=True) + def _patch_deps(self, tmp_path): + """Patch heavy dependencies so the session loop runs in isolation. + + self.task_logger defaults to None; set it to a MagicMock in a test to + exercise the logger branch without fighting stacked patch() contexts. + """ + self.spec_dir = tmp_path / "spec" + self.spec_dir.mkdir() + self.task_logger = None # tests can override before calling the session + + def _get_logger(*_args, **_kwargs): + return self.task_logger + + patches = [ + patch("agents.session.get_task_logger", side_effect=_get_logger), + patch("agents.session.is_build_complete", return_value=False), + patch("agents.session.debug"), + patch("agents.session.debug_success"), + patch("agents.session.debug_detailed"), + patch("agents.session.debug_section"), + patch("agents.session.debug_error"), + ] + for p in patches: + p.start() + yield + for p in patches: + p.stop() + + @pytest.mark.asyncio + async def test_rate_limit_event_does_not_raise(self): + """A rate_limit_event SystemMessage is absorbed without raising.""" + msg = SystemMessage(subtype="rate_limit_event", data={"retry_after": 5}) + client = _make_client([msg]) + + from agents.session import run_agent_session + + status, _, _ = await run_agent_session(client, "test prompt", self.spec_dir) + assert status in ("continue", "complete"), f"Expected session to continue, got '{status}'" + + @pytest.mark.asyncio + async def test_rate_limit_event_logs_to_task_logger(self): + """When a task_logger is present, the rate limit event is logged.""" + self.task_logger = MagicMock() + + msg = SystemMessage(subtype="rate_limit_event", data={"retry_after": 10}) + client = _make_client([msg]) + + from agents.session import run_agent_session + + await run_agent_session(client, "test prompt", self.spec_dir) + + self.task_logger.log.assert_called_once() + logged_message = self.task_logger.log.call_args[0][0] + assert "rate limit" in logged_message.lower() + + @pytest.mark.asyncio + async def test_non_rate_limit_system_message_ignored(self): + """SystemMessages with other subtypes are silently skipped.""" + msg = SystemMessage(subtype="some_other_event", data={}) + client = _make_client([msg]) + + from agents.session import run_agent_session + + # Should not raise + status, _, _ = await run_agent_session(client, "test prompt", self.spec_dir) + assert status in ("continue", "complete"), f"Expected session to continue, got '{status}'" + + @pytest.mark.asyncio + async def test_stream_continues_after_rate_limit_event(self): + """Messages after the rate_limit_event are still processed.""" + rate_limit_msg = SystemMessage( + subtype="rate_limit_event", data={"retry_after": 1} + ) + # Follow with a plain assistant message to confirm loop continues + text_block = MagicMock() + text_block.__class__.__name__ = "TextBlock" + type(text_block).__name__ = "TextBlock" + text_block.text = "hello" + + assistant_msg = MagicMock() + type(assistant_msg).__name__ = "AssistantMessage" + assistant_msg.content = [text_block] + + client = _make_client([rate_limit_msg, assistant_msg]) + + from agents.session import run_agent_session + + status, response_text, _ = await run_agent_session( + client, "test prompt", self.spec_dir + ) + assert "hello" in response_text