diff --git a/core/framework/config.py b/core/framework/config.py index 236f9be69a..5ad69e551b 100644 --- a/core/framework/config.py +++ b/core/framework/config.py @@ -13,6 +13,7 @@ from typing import Any from framework.graph.edge import DEFAULT_MAX_TOKENS +from framework.llm.codex_backend import CODEX_API_BASE, build_codex_litellm_kwargs # --------------------------------------------------------------------------- # Low-level config file access @@ -125,7 +126,6 @@ def get_worker_api_key() -> str | None: return token except ImportError: pass - api_key_env_var = worker_llm.get("api_key_env_var") if api_key_env_var: return os.environ.get(api_key_env_var) @@ -141,7 +141,7 @@ def get_worker_api_base() -> str | None: return get_api_base() if worker_llm.get("use_codex_subscription"): - return "https://chatgpt.com/backend-api/codex" + return CODEX_API_BASE if worker_llm.get("use_kimi_code_subscription"): return "https://api.kimi.com/coding" if worker_llm.get("use_antigravity_subscription"): @@ -169,23 +169,14 @@ def get_worker_llm_extra_kwargs() -> dict[str, Any]: if worker_llm.get("use_codex_subscription"): api_key = get_worker_api_key() if api_key: - headers: dict[str, str] = { - "Authorization": f"Bearer {api_key}", - "User-Agent": "CodexBar", - } + account_id = None try: from framework.runner.runner import get_codex_account_id account_id = get_codex_account_id() - if account_id: - headers["ChatGPT-Account-Id"] = account_id except ImportError: pass - return { - "extra_headers": headers, - "store": False, - "allowed_openai_params": ["store"], - } + return build_codex_litellm_kwargs(api_key, account_id=account_id) return {} @@ -274,7 +265,6 @@ def get_api_key() -> str | None: return token except ImportError: pass - # Standard env-var path (covers ZAI Code and all API-key providers) api_key_env_var = llm.get("api_key_env_var") if api_key_env_var: @@ -380,7 +370,7 @@ def get_api_base() -> str | None: llm = get_hive_config().get("llm", {}) if 
llm.get("use_codex_subscription"): # Codex subscription routes through the ChatGPT backend, not api.openai.com. - return "https://chatgpt.com/backend-api/codex" + return CODEX_API_BASE if llm.get("use_kimi_code_subscription"): # Kimi Code uses an Anthropic-compatible endpoint (no /v1 suffix). return "https://api.kimi.com/coding" @@ -415,23 +405,14 @@ def get_llm_extra_kwargs() -> dict[str, Any]: if llm.get("use_codex_subscription"): api_key = get_api_key() if api_key: - headers: dict[str, str] = { - "Authorization": f"Bearer {api_key}", - "User-Agent": "CodexBar", - } + account_id = None try: from framework.runner.runner import get_codex_account_id account_id = get_codex_account_id() - if account_id: - headers["ChatGPT-Account-Id"] = account_id except ImportError: pass - return { - "extra_headers": headers, - "store": False, - "allowed_openai_params": ["store"], - } + return build_codex_litellm_kwargs(api_key, account_id=account_id) return {} diff --git a/core/framework/llm/codex_adapter.py b/core/framework/llm/codex_adapter.py new file mode 100644 index 0000000000..8ac1c9bb53 --- /dev/null +++ b/core/framework/llm/codex_adapter.py @@ -0,0 +1,255 @@ +"""Codex adapter for Hive's LiteLLM provider. + +Codex CLI is tool-first and event-structured: tool invocations and tool results +are emitted as explicit response items, not as plain-text workflow narration. +This adapter keeps the ChatGPT Codex backend aligned with Hive's normal +provider contract by normalizing Codex request shaping and response recovery at +the provider boundary. 
+""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +from framework.llm.codex_backend import ( + build_codex_extra_headers, + is_codex_api_base, + merge_codex_allowed_openai_params, + normalize_codex_api_base, +) +from framework.llm.provider import Tool + +if TYPE_CHECKING: + from collections.abc import Callable + + from framework.llm.litellm import LiteLLMProvider + from framework.llm.stream_events import StreamEvent + +logger = logging.getLogger(__name__) + +_CODEX_CRITICAL_TOOL_NAMES = frozenset( + { + "ask_user", + "ask_user_multiple", + "set_output", + "escalate", + "save_agent_draft", + "confirm_and_build", + "initialize_and_build_agent", + } +) +_CODEX_SYSTEM_CHUNK_CHARS = 3500 +_CODEX_SYSTEM_PREAMBLE = """# Codex Execution Contract +Follow the system sections below in order. +- Obey every CRITICAL, MUST, NEVER, and ONLY instruction exactly. +- When tools are available, emit structured tool calls instead of replying with plain-text promises. +- Do not skip required workflow boundaries or approval gates. 
def build_system_messages(
    self,
    system: str,
    *,
    json_mode: bool,
) -> list[dict[str, Any]]:
    """Build the ordered system messages sent to the Codex backend.

    Args:
        system: Caller-supplied system prompt. May be empty or consist only
            of whitespace.
        json_mode: When true, a final system message asking for a JSON
            object is appended.

    Returns:
        A non-empty list of ``{"role": "system", "content": ...}`` dicts:
        an optional execution-contract preamble, then one message per prompt
        chunk (or a default assistant prompt when there is nothing to send),
        then the optional JSON instruction.
    """
    # A whitespace-only prompt is truthy, but ``chunk_system_prompt`` strips
    # it and returns no chunks. Indexing ``chunks[0]`` in that case raised
    # IndexError, so treat "no chunks" exactly like "no system prompt".
    chunks = self.chunk_system_prompt(system) if system else []

    system_messages: list[dict[str, Any]] = []
    if not chunks:
        # Always emit at least one system message so the request never goes
        # out without instructions.
        system_messages.append({"role": "system", "content": "You are a helpful assistant."})
    else:
        # The preamble is only worth its tokens when the prompt was split
        # into several chunks or a single chunk is still oversized.
        if len(chunks) > 1 or len(chunks[0]) > _CODEX_SYSTEM_CHUNK_CHARS:
            system_messages.append({"role": "system", "content": _CODEX_SYSTEM_PREAMBLE})
        system_messages.extend({"role": "system", "content": chunk} for chunk in chunks)

    if json_mode:
        system_messages.append(
            {"role": "system", "content": "Please respond with a valid JSON object."}
        )
    return system_messages
Callable[..., Any], + ) -> list[StreamEvent] | None: + """Try a non-stream completion when Codex returns an empty stream.""" + fallback_kwargs = dict(kwargs) + fallback_kwargs.pop("stream", None) + fallback_kwargs.pop("stream_options", None) + fallback_kwargs = self._provider._sanitize_request_kwargs(fallback_kwargs, stream=False) + + try: + response = await acompletion(**fallback_kwargs) + except Exception as exc: + logger.debug( + "[stream-recover] %s non-stream fallback after empty %s stream failed: %s", + self._provider.model, + last_role, + exc, + ) + return None + + events = self._provider._build_stream_events_from_nonstream_response(response) + if events: + logger.info( + "[stream-recover] %s recovered empty %s stream via non-stream completion", + self._provider.model, + last_role, + ) + return events + return None + + def merge_tool_call_chunk( + self, + tool_calls_acc: dict[int, dict[str, str]], + tc: Any, + last_tool_idx: int, + ) -> int: + """Merge a streamed tool-call chunk, compensating for broken bridge indexes.""" + idx = tc.index if hasattr(tc, "index") and tc.index is not None else 0 + tc_id = getattr(tc, "id", None) or "" + func = getattr(tc, "function", None) + func_name = getattr(func, "name", "") if func is not None else "" + func_args = getattr(func, "arguments", "") if func is not None else "" + + if tc_id: + existing_idx = next( + (key for key, value in tool_calls_acc.items() if value["id"] == tc_id), + None, + ) + if existing_idx is not None: + idx = existing_idx + elif idx in tool_calls_acc and tool_calls_acc[idx]["id"] not in ("", tc_id): + idx = max(tool_calls_acc.keys(), default=-1) + 1 + last_tool_idx = idx + elif func_name: + if ( + last_tool_idx in tool_calls_acc + and tool_calls_acc[last_tool_idx]["name"] + and tool_calls_acc[last_tool_idx]["name"] != func_name + and tool_calls_acc[last_tool_idx]["arguments"] + ): + idx = max(tool_calls_acc.keys(), default=-1) + 1 + last_tool_idx = idx + else: + idx = last_tool_idx if tool_calls_acc 
CODEX_API_BASE = "https://chatgpt.com/backend-api/codex"
CODEX_USER_AGENT = "CodexBar"
CODEX_ALLOWED_OPENAI_PARAMS = ("store",)


def is_codex_api_base(api_base: str | None) -> bool:
    """Return True when *api_base* targets the ChatGPT Codex backend.

    The match is done on the scheme-less ``host/path`` part of
    ``CODEX_API_BASE`` and ignores case, so ``http://`` URLs, URLs without a
    scheme, and mixed-case hosts are recognised as well. Any URL that
    contains the full ``CODEX_API_BASE`` string still matches.
    """
    if not api_base:
        return False
    # Drop "https://" from the canonical base; comparing with the scheme
    # attached would miss otherwise identical endpoints.
    marker = CODEX_API_BASE.split("://", 1)[-1].lower()
    return marker in api_base.lower()


def normalize_codex_api_base(api_base: str | None) -> str | None:
    """Normalize ChatGPT Codex backend URLs to the stable base endpoint.

    Args:
        api_base: Any API base URL, or a falsy value.

    Returns:
        Falsy input unchanged. Otherwise the URL without trailing slashes;
        for Codex URLs a trailing ``/responses`` segment is removed too.
        Non-Codex URLs keep their ``/responses`` suffix.
    """
    if not api_base:
        return api_base
    trimmed = api_base.rstrip("/")
    suffix = "/responses"
    if trimmed.endswith(suffix) and is_codex_api_base(trimmed):
        return trimmed[: -len(suffix)]
    return trimmed
def build_codex_extra_headers(
    api_key: str | None,
    *,
    account_id: str | None = None,
    extra_headers: dict[str, str] | None = None,
) -> dict[str, str]:
    """Build headers for the ChatGPT Codex backend.

    Caller-supplied headers always win. Header names are compared without
    regard to case (HTTP field names are case-insensitive), so an existing
    ``authorization`` or ``user-agent`` entry is not shadowed by a second
    ``Authorization`` / ``User-Agent`` entry.

    Args:
        api_key: Bearer token; no ``Authorization`` header is added when falsy.
        account_id: Value for ``ChatGPT-Account-Id``; skipped when falsy.
        extra_headers: Headers to start from. The mapping is copied, never
            mutated.

    Returns:
        A new dict containing the caller's headers plus any missing defaults.
    """
    headers = dict(extra_headers or {})
    # Snapshot of the names already present, lower-cased for comparison.
    present = {name.lower() for name in headers}

    if api_key and "authorization" not in present:
        headers["Authorization"] = f"Bearer {api_key}"
    if "user-agent" not in present:
        headers["User-Agent"] = CODEX_USER_AGENT
    if account_id and "chatgpt-account-id" not in present:
        headers["ChatGPT-Account-Id"] = account_id
    return headers
self.api_key = api_key - self.api_base = api_base or self._default_api_base_for_model(_original_model) + self.api_base = normalize_codex_api_base( + api_base or self._default_api_base_for_model(_original_model) + ) self.extra_kwargs = kwargs # Detect Claude Code OAuth subscription by checking the api_key prefix. self._claude_code_oauth = bool(api_key and api_key.startswith("sk-ant-oat")) @@ -520,13 +522,11 @@ def __init__( # Anthropic requires a specific User-Agent for OAuth requests. eh = self.extra_kwargs.setdefault("extra_headers", {}) eh.setdefault("user-agent", CLAUDE_CODE_USER_AGENT) - # The Codex ChatGPT backend (chatgpt.com/backend-api/codex) rejects - # several standard OpenAI params: max_output_tokens, stream_options. - self._codex_backend = bool( - self.api_base and "chatgpt.com/backend-api/codex" in self.api_base - ) # Antigravity routes through a local OpenAI-compatible proxy — no patches needed. self._antigravity = bool(self.api_base and "localhost:8069" in self.api_base) + self._codex_adapter = CodexResponsesAdapter(self) + # Backward-compatible alias for existing tests/callers. 
+ self._codex_backend = self._codex_adapter.enabled if litellm is None: raise ImportError( @@ -553,6 +553,132 @@ def _default_api_base_for_model(model: str) -> str | None: return HIVE_API_BASE return None + @staticmethod + def _normalize_codex_api_base(api_base: str | None) -> str | None: + """Normalize ChatGPT Codex backend URLs to the stable base endpoint.""" + return normalize_codex_api_base(api_base) + + def _chunk_codex_system_prompt(self, system: str) -> list[str]: + """Break large system prompts into smaller Codex-friendly chunks.""" + return self._codex_adapter.chunk_system_prompt(system) + + def _build_request_messages( + self, + messages: list[dict[str, Any]], + system: str, + *, + json_mode: bool, + ) -> list[dict[str, Any]]: + """Build request messages, including Codex-specific prompt chunking.""" + full_messages: list[dict[str, Any]] = [] + if self._claude_code_oauth: + billing = _claude_code_billing_header(messages) + full_messages.append({"role": "system", "content": billing}) + + system_messages: list[dict[str, Any]] = [] + if system: + if self._codex_backend: + system_messages.extend( + self._codex_adapter.build_system_messages(system, json_mode=json_mode) + ) + else: + sys_msg: dict[str, Any] = {"role": "system", "content": system} + if _model_supports_cache_control(self.model): + sys_msg["cache_control"] = {"type": "ephemeral"} + system_messages.append(sys_msg) + elif self._codex_backend: + system_messages.extend( + self._codex_adapter.build_system_messages("", json_mode=json_mode) + ) + + if json_mode and not self._codex_backend: + json_instruction = "Please respond with a valid JSON object." 
+ if system_messages: + system_messages[0] = { + **system_messages[0], + "content": f"{system_messages[0]['content']}\n\n{json_instruction}", + } + else: + system_messages.append({"role": "system", "content": json_instruction}) + + full_messages.extend(system_messages) + full_messages.extend(messages) + + return [ + m + for m in full_messages + if not ( + m.get("role") == "assistant" and not m.get("content") and not m.get("tool_calls") + ) + ] + + def _derive_codex_tool_choice( + self, + messages: list[dict[str, Any]], + tools: list[Tool] | None, + ) -> str | dict[str, Any] | None: + """Force tool use for Codex when critical framework tools are available.""" + if not self._codex_backend: + return None + return self._codex_adapter.derive_tool_choice(messages, tools) + + def _sanitize_request_kwargs( + self, + kwargs: dict[str, Any], + *, + stream: bool, + ) -> dict[str, Any]: + """Normalize provider kwargs, with extra hardening for Codex.""" + cleaned = dict(kwargs) + if cleaned.get("metadata") is None: + cleaned.pop("metadata", None) + + if self._codex_backend: + cleaned = self._codex_adapter.harden_request_kwargs(cleaned) + + if stream: + cleaned["stream"] = True + return cleaned + + def _build_completion_kwargs( + self, + messages: list[dict[str, Any]], + system: str, + *, + tools: list[Tool] | None, + max_tokens: int, + response_format: dict[str, Any] | None, + json_mode: bool, + stream: bool, + ) -> dict[str, Any]: + """Build request kwargs for completion/stream calls.""" + full_messages = self._build_request_messages(messages, system, json_mode=json_mode) + kwargs: dict[str, Any] = { + "model": self.model, + "messages": full_messages, + **self.extra_kwargs, + } + if not stream: + kwargs["max_tokens"] = max_tokens + else: + kwargs["max_tokens"] = max_tokens + if not self._is_anthropic_model(): + kwargs["stream_options"] = {"include_usage": True} + + if self.api_key: + kwargs["api_key"] = self.api_key + if self.api_base: + kwargs["api_base"] = self.api_base + if 
tools: + kwargs["tools"] = [self._tool_to_openai_format(t) for t in tools] + tool_choice = self._derive_codex_tool_choice(full_messages, tools) + if tool_choice is not None: + kwargs["tool_choice"] = tool_choice + if response_format: + kwargs["response_format"] = response_format + + return self._sanitize_request_kwargs(kwargs, stream=stream) + def _completion_with_rate_limit_retry( self, max_retries: int | None = None, **kwargs: Any ) -> Any: @@ -691,42 +817,15 @@ def complete( ) ) - # Prepare messages with system prompt - full_messages = [] - if system: - full_messages.append({"role": "system", "content": system}) - full_messages.extend(messages) - - # Add JSON mode via prompt engineering (works across all providers) - if json_mode: - json_instruction = "\n\nPlease respond with a valid JSON object." - # Append to system message if present, otherwise add as system message - if full_messages and full_messages[0]["role"] == "system": - full_messages[0]["content"] += json_instruction - else: - full_messages.insert(0, {"role": "system", "content": json_instruction.strip()}) - - # Build kwargs - kwargs: dict[str, Any] = { - "model": self.model, - "messages": full_messages, - "max_tokens": max_tokens, - **self.extra_kwargs, - } - - if self.api_key: - kwargs["api_key"] = self.api_key - if self.api_base: - kwargs["api_base"] = self.api_base - - # Add tools if provided - if tools: - kwargs["tools"] = [self._tool_to_openai_format(t) for t in tools] - - # Add response_format for structured output - # LiteLLM passes this through to the underlying provider - if response_format: - kwargs["response_format"] = response_format + kwargs = self._build_completion_kwargs( + messages, + system, + tools=tools, + max_tokens=max_tokens, + response_format=response_format, + json_mode=json_mode, + stream=False, + ) # Make the call response = self._completion_with_rate_limit_retry(max_retries=max_retries, **kwargs) @@ -887,40 +986,15 @@ async def acomplete( json_mode=json_mode, ) return await 
self._collect_stream_to_response(stream_iter) - - full_messages: list[dict[str, Any]] = [] - if self._claude_code_oauth: - billing = _claude_code_billing_header(messages) - full_messages.append({"role": "system", "content": billing}) - if system: - sys_msg: dict[str, Any] = {"role": "system", "content": system} - if _model_supports_cache_control(self.model): - sys_msg["cache_control"] = {"type": "ephemeral"} - full_messages.append(sys_msg) - full_messages.extend(messages) - - if json_mode: - json_instruction = "\n\nPlease respond with a valid JSON object." - if full_messages and full_messages[0]["role"] == "system": - full_messages[0]["content"] += json_instruction - else: - full_messages.insert(0, {"role": "system", "content": json_instruction.strip()}) - - kwargs: dict[str, Any] = { - "model": self.model, - "messages": full_messages, - "max_tokens": max_tokens, - **self.extra_kwargs, - } - - if self.api_key: - kwargs["api_key"] = self.api_key - if self.api_base: - kwargs["api_base"] = self.api_base - if tools: - kwargs["tools"] = [self._tool_to_openai_format(t) for t in tools] - if response_format: - kwargs["response_format"] = response_format + kwargs = self._build_completion_kwargs( + messages, + system, + tools=tools, + max_tokens=max_tokens, + response_format=response_format, + json_mode=json_mode, + stream=False, + ) response = await self._acompletion_with_rate_limit_retry(max_retries=max_retries, **kwargs) @@ -1170,17 +1244,46 @@ def _repair_truncated_tool_arguments(self, raw_arguments: str) -> dict[str, Any] return parsed return None + @staticmethod + def _normalize_pythonish_tool_arguments(raw_arguments: str) -> str: + """Convert common JSON-like literals into a form ast.literal_eval can parse.""" + return re.sub( + r"\b(true|false|null)\b", + lambda match: { + "true": "True", + "false": "False", + "null": "None", + }[match.group(1)], + raw_arguments, + ) + + def _parse_pythonish_tool_arguments(self, raw_arguments: str) -> dict[str, Any] | None: + 
"""Parse single-quoted / trailing-comma argument payloads safely.""" + stripped = raw_arguments.strip().strip("`") + if not stripped or stripped[0] != "{": + return None + candidate = self._close_truncated_json_fragment(stripped) + candidate = self._normalize_pythonish_tool_arguments(candidate) + try: + parsed = ast.literal_eval(candidate) + except (SyntaxError, ValueError): + return None + return parsed if isinstance(parsed, dict) else None + def _parse_tool_call_arguments(self, raw_arguments: str, tool_name: str) -> dict[str, Any]: """Parse streamed tool arguments, repairing truncation when possible.""" + stripped = raw_arguments.strip() + if stripped.startswith("```") and stripped.endswith("```"): + stripped = stripped.strip("`").strip() try: - parsed = json.loads(raw_arguments) if raw_arguments else {} + parsed = json.loads(stripped) if stripped else {} except json.JSONDecodeError: parsed = None if isinstance(parsed, dict): return parsed - repaired = self._repair_truncated_tool_arguments(raw_arguments) + repaired = self._repair_truncated_tool_arguments(stripped) if repaired is not None: logger.warning( "[tool-args] Recovered truncated arguments for %s on %s", @@ -1189,6 +1292,15 @@ def _parse_tool_call_arguments(self, raw_arguments: str, tool_name: str) -> dict ) return repaired + pythonish = self._parse_pythonish_tool_arguments(stripped) + if pythonish is not None: + logger.warning( + "[tool-args] Recovered malformed arguments for %s on %s", + tool_name, + self.model, + ) + return pythonish + raise ValueError( f"Failed to parse tool call arguments for '{tool_name}' (likely truncated JSON)." 
) @@ -1516,6 +1628,139 @@ async def _stream_via_nonstream_completion( model=response.model, ) + def _build_stream_events_from_nonstream_response( + self, + response: Any, + ) -> list[StreamEvent]: + """Convert a non-stream completion response into stream events.""" + from framework.llm.stream_events import ( + FinishEvent, + TextDeltaEvent, + TextEndEvent, + ToolCallEvent, + ) + + choices = getattr(response, "choices", None) or [] + if not choices: + output_text = getattr(response, "output_text", "") or "" + if not output_text: + return [] + from framework.llm.stream_events import FinishEvent, TextDeltaEvent, TextEndEvent + + usage = getattr(response, "usage", None) + return [ + TextDeltaEvent(content=output_text, snapshot=output_text), + TextEndEvent(full_text=output_text), + FinishEvent( + stop_reason="stop", + input_tokens=getattr(usage, "prompt_tokens", 0) or 0 if usage else 0, + output_tokens=getattr(usage, "completion_tokens", 0) or 0 if usage else 0, + model=getattr(response, "model", None) or self.model, + ), + ] + + choice = choices[0] + message = getattr(choice, "message", None) + content = self._extract_message_text(message) + tool_calls = getattr(message, "tool_calls", None) or [] + + events: list[StreamEvent] = [] + for tc in tool_calls: + parsed_args = self._coerce_tool_input( + tc.function.arguments if tc.function else {}, + tc.function.name if tc.function else "", + ) + events.append( + ToolCallEvent( + tool_use_id=getattr(tc, "id", ""), + tool_name=tc.function.name if tc.function else "", + tool_input=parsed_args, + ) + ) + + if content: + events.append(TextDeltaEvent(content=content, snapshot=content)) + events.append(TextEndEvent(full_text=content)) + + usage = getattr(response, "usage", None) + input_tokens = getattr(usage, "prompt_tokens", 0) or 0 if usage else 0 + output_tokens = getattr(usage, "completion_tokens", 0) or 0 if usage else 0 + cached_tokens = 0 + if usage: + details = getattr(usage, "prompt_tokens_details", None) + cached_tokens 
= ( + getattr(details, "cached_tokens", 0) or 0 + if details is not None + else getattr(usage, "cache_read_input_tokens", 0) or 0 + ) + + events.append( + FinishEvent( + stop_reason=getattr(choice, "finish_reason", None) + or ("tool_calls" if tool_calls else "stop"), + input_tokens=input_tokens, + output_tokens=output_tokens, + cached_tokens=cached_tokens, + model=getattr(response, "model", None) or self.model, + ) + ) + return events + + @staticmethod + def _extract_message_text(message: Any) -> str: + """Extract text from a provider message object across response shapes.""" + if message is None: + return "" + content = getattr(message, "content", "") + if isinstance(content, str): + return content + if isinstance(content, list): + parts: list[str] = [] + for block in content: + if isinstance(block, str): + parts.append(block) + elif isinstance(block, dict): + text = block.get("text") or block.get("content") or "" + if isinstance(text, str): + parts.append(text) + else: + text = getattr(block, "text", "") or getattr(block, "content", "") + if isinstance(text, str): + parts.append(text) + return "".join(parts) + return str(content or "") + + def _coerce_tool_input(self, raw_arguments: Any, tool_name: str) -> dict[str, Any]: + """Normalize raw tool-call arguments from either string or object forms.""" + if isinstance(raw_arguments, dict): + return raw_arguments + if raw_arguments in (None, ""): + return {} + return self._parse_tool_call_arguments(str(raw_arguments), tool_name) + + async def _recover_empty_codex_stream( + self, + kwargs: dict[str, Any], + last_role: str | None, + ) -> list[StreamEvent] | None: + """Try a non-stream completion when Codex returns an empty stream.""" + if not self._codex_backend: + return None + return await self._codex_adapter.recover_empty_stream( + kwargs, + last_role=last_role, + acompletion=litellm.acompletion, # type: ignore[union-attr] + ) + + def _merge_tool_call_chunk( + self, + tool_calls_acc: dict[int, dict[str, str]], + tc: 
Any, + last_tool_idx: int, + ) -> int: + """Merge a streamed tool-call chunk, compensating for broken Codex indexes.""" + return self._codex_adapter.merge_tool_call_chunk(tool_calls_acc, tc, last_tool_idx) + async def stream( self, messages: list[dict[str, Any]], @@ -1567,65 +1812,16 @@ async def stream( yield event return - full_messages: list[dict[str, Any]] = [] - if self._claude_code_oauth: - billing = _claude_code_billing_header(messages) - full_messages.append({"role": "system", "content": billing}) - if system: - sys_msg: dict[str, Any] = {"role": "system", "content": system} - if _model_supports_cache_control(self.model): - sys_msg["cache_control"] = {"type": "ephemeral"} - full_messages.append(sys_msg) - full_messages.extend(messages) - - # Codex Responses API requires an `instructions` field (system prompt). - # Inject a minimal one when callers don't provide a system message. - if self._codex_backend and not any(m["role"] == "system" for m in full_messages): - full_messages.insert(0, {"role": "system", "content": "You are a helpful assistant."}) - - # Add JSON mode via prompt engineering (works across all providers) - if json_mode: - json_instruction = "\n\nPlease respond with a valid JSON object." - if full_messages and full_messages[0]["role"] == "system": - full_messages[0]["content"] += json_instruction - else: - full_messages.insert(0, {"role": "system", "content": json_instruction.strip()}) - - # Remove ghost empty assistant messages (content="" and no tool_calls). - # These arise when a model returns an empty stream after a tool result - # (an "expected" no-op turn). Keeping them in history confuses some - # models (notably Codex/gpt-5.3) and causes cascading empty streams. 
- full_messages = [ - m - for m in full_messages - if not ( - m.get("role") == "assistant" and not m.get("content") and not m.get("tool_calls") - ) - ] - - kwargs: dict[str, Any] = { - "model": self.model, - "messages": full_messages, - "max_tokens": max_tokens, - "stream": True, - **self.extra_kwargs, - } - # stream_options is OpenAI-specific; Anthropic rejects it with 400. - # Only include it for providers that support it. - if not self._is_anthropic_model(): - kwargs["stream_options"] = {"include_usage": True} - if self.api_key: - kwargs["api_key"] = self.api_key - if self.api_base: - kwargs["api_base"] = self.api_base - if tools: - kwargs["tools"] = [self._tool_to_openai_format(t) for t in tools] - if response_format: - kwargs["response_format"] = response_format - # The Codex ChatGPT backend (Responses API) rejects several params. - if self._codex_backend: - kwargs.pop("max_tokens", None) - kwargs.pop("stream_options", None) + kwargs = self._build_completion_kwargs( + messages, + system, + tools=tools, + max_tokens=max_tokens, + response_format=response_format, + json_mode=json_mode, + stream=True, + ) + full_messages = kwargs["messages"] for attempt in range(RATE_LIMIT_MAX_RETRIES + 1): # Post-stream events (ToolCall, TextEnd, Finish) are buffered @@ -1683,43 +1879,17 @@ async def stream( # argument deltas that arrive with id=None. if delta and delta.tool_calls: for tc in delta.tool_calls: - idx = tc.index if hasattr(tc, "index") and tc.index is not None else 0 - - if tc.id: - # New tool call announced (or done event re-sent). - # Check if this id already has a slot. 
- existing_idx = next( - (k for k, v in tool_calls_acc.items() if v["id"] == tc.id), - None, - ) - if existing_idx is not None: - idx = existing_idx - elif idx in tool_calls_acc and tool_calls_acc[idx]["id"] not in ( - "", - tc.id, - ): - # Slot taken by a different call — assign new index - idx = max(tool_calls_acc.keys()) + 1 - _last_tool_idx = idx - else: - # Argument delta with no id — route to last opened slot - idx = _last_tool_idx - - if idx not in tool_calls_acc: - tool_calls_acc[idx] = {"id": "", "name": "", "arguments": ""} - if tc.id: - tool_calls_acc[idx]["id"] = tc.id - if tc.function: - if tc.function.name: - tool_calls_acc[idx]["name"] = tc.function.name - if tc.function.arguments: - tool_calls_acc[idx]["arguments"] += tc.function.arguments + _last_tool_idx = self._merge_tool_call_chunk( + tool_calls_acc, + tc, + _last_tool_idx, + ) # --- Finish --- if choice.finish_reason: stream_finish_reason = choice.finish_reason for _idx, tc_data in sorted(tool_calls_acc.items()): - parsed_args = self._parse_tool_call_arguments( + parsed_args = self._coerce_tool_input( tc_data.get("arguments", ""), tc_data.get("name", ""), ) @@ -1852,6 +2022,11 @@ async def stream( (m["role"] for m in reversed(full_messages) if m.get("role") != "system"), None, ) + recovered_events = await self._recover_empty_codex_stream(kwargs, last_role) + if recovered_events: + for event in recovered_events: + yield event + return if attempt < EMPTY_STREAM_MAX_RETRIES: token_count, token_method = _estimate_tokens( self.model, diff --git a/core/framework/runner/runner.py b/core/framework/runner/runner.py index 901fd3605d..a8b14ce743 100644 --- a/core/framework/runner/runner.py +++ b/core/framework/runner/runner.py @@ -22,6 +22,7 @@ ) from framework.graph.executor import ExecutionResult from framework.graph.node import NodeSpec +from framework.llm.codex_backend import CODEX_API_BASE, build_codex_litellm_kwargs from framework.llm.provider import LLMProvider, Tool from 
framework.runner.preload_validation import run_preload_validation from framework.runner.tool_registry import ToolRegistry @@ -327,17 +328,68 @@ def _read_codex_auth_file() -> dict | None: return None +def _get_jwt_claims(token: str) -> dict | None: + """Decode JWT claims without verification for local expiry/account inspection.""" + import base64 + + try: + parts = token.split(".") + if len(parts) != 3: + return None + payload = parts[1] + padding = 4 - len(payload) % 4 + if padding != 4: + payload += "=" * padding + decoded = base64.urlsafe_b64decode(payload) + claims = json.loads(decoded) + return claims if isinstance(claims, dict) else None + except Exception: + return None + + +def _get_codex_token_expiry(auth_data: dict) -> float | None: + """Return the best-known expiry timestamp for a Codex access token.""" + from datetime import datetime + + tokens = auth_data.get("tokens", {}) + access_token = tokens.get("access_token") + explicit = ( + auth_data.get("expires_at") + or auth_data.get("expiresAt") + or tokens.get("expires_at") + or tokens.get("expiresAt") + ) + if isinstance(explicit, (int, float)): + return float(explicit) + if isinstance(explicit, str): + try: + return datetime.fromisoformat(explicit.replace("Z", "+00:00")).timestamp() + except (ValueError, TypeError): + pass + + if isinstance(access_token, str): + claims = _get_jwt_claims(access_token) or {} + exp = claims.get("exp") + if isinstance(exp, (int, float)): + return float(exp) + return None + + def _is_codex_token_expired(auth_data: dict) -> bool: """Check whether the Codex token is expired or close to expiry. The Codex auth.json has no explicit ``expiresAt`` field, so we infer expiry as ``last_refresh + _CODEX_TOKEN_LIFETIME_SECS``. Falls back - to the file mtime when ``last_refresh`` is absent. + to JWT ``exp`` or file age heuristics when no explicit timestamp exists. 
""" import time from datetime import datetime now = time.time() + explicit_expiry = _get_codex_token_expiry(auth_data) + if explicit_expiry is not None: + return now >= (explicit_expiry - _TOKEN_REFRESH_BUFFER_SECS) + last_refresh = auth_data.get("last_refresh") if last_refresh is None: @@ -431,6 +483,8 @@ def get_codex_token() -> str | None: Returns: The access token if available, None otherwise. """ + import time + # Try Keychain first, then file auth_data = _read_codex_keychain() or _read_codex_auth_file() if not auth_data: @@ -441,15 +495,20 @@ def get_codex_token() -> str | None: if not access_token: return None + explicit_expiry = _get_codex_token_expiry(auth_data) + is_expired = _is_codex_token_expired(auth_data) + # Check if token is still valid - if not _is_codex_token_expired(auth_data): + if not is_expired: return access_token # Token is expired or near expiry — attempt refresh refresh_token = tokens.get("refresh_token") if not refresh_token: logger.warning("Codex token expired and no refresh token available") - return access_token # Return expired token; it may still work briefly + if explicit_expiry is not None and time.time() >= explicit_expiry: + return None + return access_token logger.info("Codex token expired or near expiry, refreshing...") token_data = _refresh_codex_token(refresh_token) @@ -460,6 +519,8 @@ def get_codex_token() -> str | None: # Refresh failed — return the existing token and warn logger.warning("Codex token refresh failed. Run 'codex' to re-authenticate.") + if explicit_expiry is not None and time.time() >= explicit_expiry: + return None return access_token @@ -471,26 +532,12 @@ def _get_account_id_from_jwt(access_token: str) -> str | None: This is used as a fallback when the auth.json doesn't store the account_id explicitly. 
""" - import base64 - - try: - parts = access_token.split(".") - if len(parts) != 3: - return None - payload = parts[1] - # Add base64 padding - padding = 4 - len(payload) % 4 - if padding != 4: - payload += "=" * padding - decoded = base64.urlsafe_b64decode(payload) - claims = json.loads(decoded) - auth = claims.get("https://api.openai.com/auth") - if isinstance(auth, dict): - account_id = auth.get("chatgpt_account_id") - if isinstance(account_id, str) and account_id: - return account_id - except Exception: - pass + claims = _get_jwt_claims(access_token) or {} + auth = claims.get("https://api.openai.com/auth") + if isinstance(auth, dict): + account_id = auth.get("chatgpt_account_id") + if isinstance(account_id, str) and account_id: + return account_id return None @@ -863,8 +910,6 @@ def _is_antigravity_proxy_available() -> bool: return True except (OSError, TimeoutError): return False - - @dataclass class AgentInfo: """Information about an exported agent.""" @@ -1558,20 +1603,20 @@ def _setup(self, event_bus=None) -> None: # OpenAI Codex subscription routes through the ChatGPT backend # (chatgpt.com/backend-api/codex/responses), NOT the standard # OpenAI API. The consumer OAuth token lacks platform API scopes. - extra_headers: dict[str, str] = { - "Authorization": f"Bearer {api_key}", - "User-Agent": "CodexBar", - } account_id = get_codex_account_id() - if account_id: - extra_headers["ChatGPT-Account-Id"] = account_id self._llm = LiteLLMProvider( model=self.model, api_key=api_key, - api_base="https://chatgpt.com/backend-api/codex", - extra_headers=extra_headers, - store=False, - allowed_openai_params=["store"], + api_base=CODEX_API_BASE, + **build_codex_litellm_kwargs(api_key, account_id=account_id), + ) + elif api_key and use_kimi_code: + # Kimi Code subscription uses the Kimi coding API (OpenAI-compatible). + # The api_base is set automatically by LiteLLMProvider for kimi/ models. 
+ self._llm = LiteLLMProvider( + model=self.model, + api_key=api_key, + api_base=api_base, ) elif api_key and use_kimi_code: # Kimi Code subscription uses the Kimi coding API (OpenAI-compatible). diff --git a/core/tests/test_config.py b/core/tests/test_config.py index 272b8403e7..ceaa40c60f 100644 --- a/core/tests/test_config.py +++ b/core/tests/test_config.py @@ -1,8 +1,15 @@ """Tests for framework/config.py - Hive configuration loading.""" import logging +from unittest.mock import patch -from framework.config import get_api_base, get_hive_config, get_preferred_model +from framework.config import ( + get_api_base, + get_hive_config, + get_llm_extra_kwargs, + get_preferred_model, +) +from framework.llm.codex_backend import CODEX_API_BASE class TestGetHiveConfig: @@ -59,9 +66,48 @@ def test_get_api_base_falls_back_to_openrouter_default(self, tmp_path, monkeypat def test_get_api_base_keeps_explicit_openrouter_api_base(self, tmp_path, monkeypatch): config_file = tmp_path / "configuration.json" config_file.write_text( - '{"llm":{"provider":"openrouter","model":"x-ai/grok-4.20-beta","api_base":"https://proxy.example/v1"}}', + ( + '{"llm":{"provider":"openrouter","model":"x-ai/grok-4.20-beta",' + '"api_base":"https://proxy.example/v1"}}' + ), encoding="utf-8", ) monkeypatch.setattr("framework.config.HIVE_CONFIG_FILE", config_file) assert get_api_base() == "https://proxy.example/v1" + + +class TestCodexConfig: + """Codex config helpers should share the same transport defaults.""" + + def test_get_api_base_uses_shared_codex_backend(self, tmp_path, monkeypatch): + config_file = tmp_path / "configuration.json" + config_file.write_text( + '{"llm":{"provider":"openai","model":"gpt-5.3-codex","use_codex_subscription":true}}', + encoding="utf-8", + ) + monkeypatch.setattr("framework.config.HIVE_CONFIG_FILE", config_file) + + assert get_api_base() == CODEX_API_BASE + + def test_get_llm_extra_kwargs_uses_shared_codex_transport(self, tmp_path, monkeypatch): + config_file = tmp_path 
/ "configuration.json" + config_file.write_text( + '{"llm":{"provider":"openai","model":"gpt-5.3-codex","use_codex_subscription":true}}', + encoding="utf-8", + ) + monkeypatch.setattr("framework.config.HIVE_CONFIG_FILE", config_file) + + with ( + patch("framework.runner.runner.get_codex_token", return_value="tok_test"), + patch("framework.runner.runner.get_codex_account_id", return_value="acct_123"), + ): + kwargs = get_llm_extra_kwargs() + + assert kwargs["store"] is False + assert kwargs["allowed_openai_params"] == ["store"] + assert kwargs["extra_headers"] == { + "Authorization": "Bearer tok_test", + "User-Agent": "CodexBar", + "ChatGPT-Account-Id": "acct_123", + } diff --git a/core/tests/test_litellm_provider.py b/core/tests/test_litellm_provider.py index 6024f355c3..849765a440 100644 --- a/core/tests/test_litellm_provider.py +++ b/core/tests/test_litellm_provider.py @@ -238,6 +238,19 @@ def test_parse_tool_call_arguments_raises_when_unrepairable(self): with pytest.raises(ValueError, match="Failed to parse tool call arguments"): provider._parse_tool_call_arguments('{"question": foo', "ask_user") + def test_parse_tool_call_arguments_recovers_pythonish_payloads(self): + """Single-quoted and trailing-comma argument payloads should be recovered.""" + provider = LiteLLMProvider(model="openai/gpt-5.3-codex", api_key="test-key") + + parsed = provider._parse_tool_call_arguments( + "{'question': 'Continue?', 'options': ['Yes', 'No'],}", + "ask_user", + ) + + assert parsed == { + "question": "Continue?", + "options": ["Yes", "No"], + } class TestAnthropicProviderBackwardCompatibility: """Test AnthropicProvider backward compatibility with LiteLLM backend.""" @@ -728,6 +741,219 @@ def test_is_minimax_model_variants(self): assert not LiteLLMProvider(model="gpt-4o-mini", api_key="x")._is_minimax_model() +class TestCodexEmptyStreamRecovery: + """Codex empty streams should fall back before surfacing ghost-stream retries.""" + + @pytest.mark.asyncio + 
@patch("litellm.acompletion") + async def test_stream_recovers_empty_codex_stream_via_nonstream_completion( + self, + mock_acompletion, + ): + """An empty Codex stream should be salvaged with a non-stream completion.""" + from framework.llm.stream_events import FinishEvent, TextDeltaEvent + + provider = LiteLLMProvider( + model="openai/gpt-5.3-codex", + api_key="test-key", + api_base="https://chatgpt.com/backend-api/codex", + ) + + class EmptyStreamResponse: + chunks: list = [] + + def __aiter__(self): + return self + + async def __anext__(self): + raise StopAsyncIteration + + recovered = MagicMock() + recovered.choices = [MagicMock()] + recovered.choices[0].message.content = "Recovered via fallback" + recovered.choices[0].message.tool_calls = [] + recovered.choices[0].finish_reason = "stop" + recovered.model = provider.model + recovered.usage.prompt_tokens = 12 + recovered.usage.completion_tokens = 4 + + async def side_effect(*args, **kwargs): + if kwargs.get("stream"): + return EmptyStreamResponse() + return recovered + + mock_acompletion.side_effect = side_effect + + events = [] + async for event in provider.stream(messages=[{"role": "user", "content": "hi"}]): + events.append(event) + + text_events = [event for event in events if isinstance(event, TextDeltaEvent)] + assert len(text_events) == 1 + assert text_events[0].snapshot == "Recovered via fallback" + + finish_events = [event for event in events if isinstance(event, FinishEvent)] + assert len(finish_events) == 1 + assert finish_events[0].stop_reason == "stop" + assert finish_events[0].input_tokens == 12 + assert finish_events[0].output_tokens == 4 + + assert mock_acompletion.call_count == 2 + assert mock_acompletion.call_args_list[0].kwargs["stream"] is True + assert "stream" not in mock_acompletion.call_args_list[1].kwargs + + @pytest.mark.asyncio + @patch("litellm.acompletion") + async def test_stream_recovers_empty_codex_stream_with_tool_calls( + self, + mock_acompletion, + ): + """Non-stream fallback 
should preserve tool calls, not just text.""" + from framework.llm.stream_events import FinishEvent, ToolCallEvent + + provider = LiteLLMProvider( + model="openai/gpt-5.3-codex", + api_key="test-key", + api_base="https://chatgpt.com/backend-api/codex/responses", + ) + + class EmptyStreamResponse: + chunks: list = [] + + def __aiter__(self): + return self + + async def __anext__(self): + raise StopAsyncIteration + + tc = MagicMock() + tc.id = "tool_1" + tc.function.name = "ask_user" + tc.function.arguments = '{"question":"Continue?","options":["Yes","No"]}' + + recovered = MagicMock() + recovered.choices = [MagicMock()] + recovered.choices[0].message.content = "" + recovered.choices[0].message.tool_calls = [tc] + recovered.choices[0].finish_reason = "tool_calls" + recovered.model = provider.model + recovered.usage.prompt_tokens = 14 + recovered.usage.completion_tokens = 5 + + async def side_effect(*args, **kwargs): + if kwargs.get("stream"): + return EmptyStreamResponse() + return recovered + + mock_acompletion.side_effect = side_effect + + events = [] + async for event in provider.stream( + messages=[{"role": "user", "content": "Should we continue?"}], + tools=[ + Tool( + name="ask_user", + description="Ask the user", + parameters={"properties": {"question": {"type": "string"}}}, + ) + ], + ): + events.append(event) + + tool_events = [event for event in events if isinstance(event, ToolCallEvent)] + assert len(tool_events) == 1 + assert tool_events[0].tool_name == "ask_user" + assert tool_events[0].tool_input == { + "question": "Continue?", + "options": ["Yes", "No"], + } + + finish_events = [event for event in events if isinstance(event, FinishEvent)] + assert len(finish_events) == 1 + assert finish_events[0].stop_reason == "tool_calls" + + +class TestCodexRequestHardening: + def test_codex_build_completion_kwargs_splits_prompt_and_forces_tool_choice(self): + """Codex requests should chunk large system prompts and require tools when needed.""" + provider = 
LiteLLMProvider( + model="openai/gpt-5.3-codex", + api_key="test-key", + api_base="https://chatgpt.com/backend-api/codex/responses", + ) + kwargs = provider._build_completion_kwargs( + messages=[{"role": "user", "content": "hi"}], + system="# Identity\n" + ("rule\n" * 2000), + tools=[ + Tool( + name="ask_user", + description="Ask the user", + parameters={"properties": {"question": {"type": "string"}}}, + ) + ], + max_tokens=256, + response_format=None, + json_mode=False, + stream=True, + ) + + system_messages = [m for m in kwargs["messages"] if m["role"] == "system"] + assert len(system_messages) >= 2 + assert system_messages[0]["content"].startswith("# Codex Execution Contract") + assert kwargs["tool_choice"] == "required" + assert kwargs["store"] is False + assert "max_tokens" not in kwargs + assert "stream_options" not in kwargs + assert kwargs["api_base"] == "https://chatgpt.com/backend-api/codex" + assert "store" in kwargs["allowed_openai_params"] + + def test_codex_merge_tool_call_chunk_handles_parallel_calls_with_broken_indexes(self): + """Codex chunk merging should survive index=0 for multiple parallel tool calls.""" + from types import SimpleNamespace + + provider = LiteLLMProvider( + model="openai/gpt-5.3-codex", + api_key="test-key", + api_base="https://chatgpt.com/backend-api/codex", + ) + acc: dict[int, dict[str, str]] = {} + last_idx = 0 + + chunks = [ + SimpleNamespace( + id="tool_1", + index=0, + function=SimpleNamespace(name="web_search", arguments='{"query":"alpha'), + ), + SimpleNamespace( + id="tool_2", + index=0, + function=SimpleNamespace(name="read_file", arguments='{"path":"beta'), + ), + SimpleNamespace( + id=None, + index=0, + function=SimpleNamespace(name=None, arguments='"}'), + ), + SimpleNamespace( + id=None, + index=0, + function=SimpleNamespace(name=None, arguments='"}'), + ), + ] + + for chunk in chunks: + last_idx = provider._merge_tool_call_chunk(acc, chunk, last_idx) + + assert len(acc) == 2 + parsed = [ + 
provider._parse_tool_call_arguments(slot["arguments"], slot["name"]) + for _, slot in sorted(acc.items()) + ] + assert parsed == [ + {"query": "alpha"}, + {"path": "beta"}, + ] class TestOpenRouterToolCompatFallback: """OpenRouter models should fall back when native tool use is unavailable."""