Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions core/framework/llm/litellm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@

import ast
import asyncio
import concurrent.futures
import hashlib
import json
import logging
import os
import re
import time
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Callable
from datetime import datetime
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -705,8 +706,8 @@ def complete(
# Codex ChatGPT backend requires streaming — delegate to the unified
# async streaming path which properly handles tool calls.
if self._codex_backend:
return asyncio.run(
self.acomplete(
return self._run_coro_from_sync(
lambda: self.acomplete(
messages=messages,
system=system,
tools=tools,
Expand Down Expand Up @@ -783,6 +784,25 @@ def complete(
raw_response=response,
)

@staticmethod
def _run_coro_from_sync(coro_factory: Callable[[], Any]) -> Any:
"""Run a coroutine from sync code, even when an event loop is active.

``asyncio.run()`` raises when called from a thread with an active loop
(notebooks, async frameworks). In that case, offload coroutine execution
to a worker thread with its own event loop.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coro_factory())

def _run_in_worker() -> Any:
return asyncio.run(coro_factory())

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
return executor.submit(_run_in_worker).result()
Comment on lines +795 to +804
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a running event loop is detected, this offloads the coroutine to a worker thread but does not propagate contextvars into that thread. This can drop trace/execution context in logs/headers produced inside acomplete()/stream() (the codebase elsewhere explicitly copies context before running work in a thread). Consider wrapping the worker call with contextvars.copy_context() (e.g., submit ctx.run(...)) so observability context is preserved.

Copilot uses AI. Check for mistakes.

# ------------------------------------------------------------------
# Async variants — non-blocking on the event loop
# ------------------------------------------------------------------
Expand Down
31 changes: 31 additions & 0 deletions core/tests/test_litellm_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,37 @@ def test_init_ollama_no_key_needed(self):
class TestLiteLLMProviderComplete:
"""Test LiteLLMProvider.complete() method."""

def test_complete_codex_backend_in_sync_context(self):
    """Codex path should work from normal sync callers."""
    provider = LiteLLMProvider(
        model="gpt-4o-mini",
        api_key="test-key",
        api_base="https://chatgpt.com/backend-api/codex",
    )
    want = LLMResponse(content="codex ok", model="codex-test")

    # Stub out the async implementation; complete() must bridge to it.
    mocked_acomplete = AsyncMock(return_value=want)
    with patch.object(provider, "acomplete", new=mocked_acomplete):
        got = provider.complete(messages=[{"role": "user", "content": "Hello"}])

    assert got == want
    mocked_acomplete.assert_awaited_once()

@pytest.mark.asyncio
async def test_complete_codex_backend_in_running_event_loop(self):
    """Codex path should not crash when complete() is called from async code."""
    provider = LiteLLMProvider(
        model="gpt-4o-mini",
        api_key="test-key",
        api_base="https://chatgpt.com/backend-api/codex",
    )
    want = LLMResponse(content="codex async ok", model="codex-test")

    # Even with this test's event loop running, the sync complete() must
    # still reach the async implementation without raising.
    mocked_acomplete = AsyncMock(return_value=want)
    with patch.object(provider, "acomplete", new=mocked_acomplete):
        got = provider.complete(messages=[{"role": "user", "content": "Hello"}])

    assert got == want
    mocked_acomplete.assert_awaited_once()

@patch("litellm.completion")
def test_complete_basic(self, mock_completion):
"""Test basic completion call."""
Expand Down
Loading