Gemini causes 'Event loop is closed' when running inside an async context #748

Open
asaf opened this issue Jan 22, 2025 · 41 comments · May be fixed by #1695
Labels
more info (More information required) · question (Further information is requested)

Comments

@asaf

asaf commented Jan 22, 2025

Hi,

This is an attempt to use google-gla:gemini-2.0-flash-exp via Streamlit in an async context.

The code below causes RuntimeError: Event loop is closed; the same code works just fine with openai:gpt-4o.

import asyncio

import streamlit as st
from pydantic_ai import Agent
from pydantic_ai.messages import ModelRequest, UserPromptPart


async def main():
    st.title("Simple chat")

    # Initialize chat history
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        part1 = message.parts[0]
        role = ""
        if isinstance(part1, UserPromptPart):
            role = "user"
        with st.chat_message("human" if role == "user" else "ai"):
            st.markdown(message.parts[0].content)

    # Accept user input
    if prompt := st.chat_input("Say something..."):
        # Display user message in chat message container
        st.chat_message("user").markdown(prompt)

        # Add user message to chat history
        st.session_state.messages.append(
            ModelRequest(parts=[UserPromptPart(content=prompt)])
        )

        # Display assistant response in chat message container
        with st.chat_message("assistant"):
            response_container = st.empty()  # Placeholder for streaming response
            agent = Agent(
                "google-gla:gemini-2.0-flash-exp",
                # "openai:gpt-4o", # works!
                # https://ai.pydantic.dev/results/#structured-result-validation
                result_type=str,  # type: ignore
                system_prompt="you are a nice bot",
            )
            result = await agent.run(prompt)
            response_container.markdown(result.data)


if __name__ == "__main__":
    asyncio.run(main())
@izzyacademy
Contributor

@asaf, it looks like the event loop in GeminiAgentModel, which is responsible for managing and running asynchronous tasks, has been shut down, and the program is trying to reuse it. Once closed, the event loop cannot execute any more tasks.

Could you please share the full stack trace of the error so that we can narrow down which lines are the culprit?

Thanks in advance!

@dmontagu
Contributor

Unfortunately, I'm not familiar enough with streamlit to reproduce the issue on my own; I tried and just get this:

2025-01-22 13:43:09.553 WARNING streamlit.runtime.scriptrunner_utils.script_run_context: Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.621 
  Warning: to view this Streamlit app on a browser, run it with the following
  command:

    streamlit run /Users/davidmontague/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_66.py [ARGUMENTS]
2025-01-22 13:43:09.621 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.621 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.622 Session state does not function when running a script without `streamlit run`
2025-01-22 13:43:09.622 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.622 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.622 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.622 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.622 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.622 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.622 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.
2025-01-22 13:43:09.622 Thread 'MainThread': missing ScriptRunContext! This warning can be ignored when running in bare mode.

Process finished with exit code 0

If you can provide a more detailed traceback or a self-contained way to reproduce this (i.e., more information about how to execute the script to cause the problem, any environment settings that need to be set, etc.) then we can continue to look into this more.

@dmontagu dmontagu added the 'more info' (More information required) label Jan 22, 2025
@asaf
Author

asaf commented Jan 22, 2025

Hey guys,
Here's a stack trace; let me know if you need more info:

File "`/Users/johndoe/geminitest`/.venv/lib/python3.13/site-packages/streamlit/runtime/scriptrunner/exec_code.py", line 88, in exec_func_with_error_handling
    result = func()
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/streamlit/runtime/scriptrunner/script_runner.py", line 579, in code_to_exec
    exec(code, module.__dict__)
    ~~~~^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/johndoe/geminitest/gemini-bug.py", line 49, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 721, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
File "/Users/johndoe/geminitest/gemini-bug.py", line 44, in main
    result = await agent.run(prompt)
             ^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/pydantic_ai/agent.py", line 300, in run
    model_response, request_usage = await agent_model.request(messages, model_settings)
                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/pydantic_ai/models/gemini.py", line 174, in request
    async with self._make_request(messages, False, model_settings) as http_response:
               ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/pydantic_ai/models/gemini.py", line 220, in _make_request
    async with self.http_client.stream(
               ~~~~~~~~~~~~~~~~~~~~~~~^
        'POST',
        ^^^^^^^
    ...<3 lines>...
        timeout=(model_settings or {}).get('timeout', USE_CLIENT_DEFAULT),
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ) as r:
    ^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpx/_client.py", line 1583, in stream
    response = await self.send(
               ^^^^^^^^^^^^^^^^
    ...<4 lines>...
    )
    ^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpx/_client.py", line 1629, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<4 lines>...
    )
    ^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpx/_client.py", line 1657, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpx/_client.py", line 1694, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpx/_client.py", line 1730, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpx/_transports/default.py", line 394, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 256, in handle_async_request
    raise exc from None
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 236, in handle_async_request
    response = await connection.handle_async_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        pool_request.request
        ^^^^^^^^^^^^^^^^^^^^
    )
    ^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpcore/_async/connection.py", line 103, in handle_async_request
    return await self._connection.handle_async_request(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpcore/_async/http11.py", line 135, in handle_async_request
    await self._response_closed()
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpcore/_async/http11.py", line 250, in _response_closed
    await self.aclose()
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpcore/_async/http11.py", line 258, in aclose
    await self._network_stream.aclose()
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/httpcore/_backends/anyio.py", line 53, in aclose
    await self._stream.aclose()
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/anyio/streams/tls.py", line 201, in aclose
    await self.transport_stream.aclose()
File "/Users/johndoe/geminitest/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 1306, in aclose
    self._transport.close()
    ~~~~~~~~~~~~~~~~~~~~~^^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/selector_events.py", line 1202, in close
    super().close()
    ~~~~~~~~~~~~~^^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/selector_events.py", line 865, in close
    self._loop.call_soon(self._call_connection_lost, None)
    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 829, in call_soon
    self._check_closed()
    ~~~~~~~~~~~~~~~~~~^^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 552, in _check_closed
    raise RuntimeError('Event loop is closed')

@izzyacademy
Contributor

izzyacademy commented Jan 23, 2025

@asaf I just did some research and this is what I found. I hope the explanation makes sense and the suggestions work for you.

The RuntimeError: Event loop is closed error in Streamlit when running code in an async context occurs due to event loop conflicts. This happens because Streamlit itself uses asynchronous programming internally, and improper handling of the event loop can lead to the error. This is very similar to the issues with Jupyter notebooks already addressed in the docs [1].

  • Streamlit internally manages its own asynchronous event loop for handling user interactions and re-running the app code. If you create or use a conflicting event loop (with asyncio.run() or asyncio.get_event_loop()), it may cause the event loop to close or behave unexpectedly.
  • If an async task is started in one context and tries to use the event loop from another context (after Streamlit's loop is closed), this error can occur.
  • If Pydantic AI creates its own event loops, this may conflict with Streamlit’s loop or attempt to use a closed loop.

Possible Fixes

Try calling your main() function directly:

# Call main() directly, without wrapping it in asyncio.run().
# Let me know if this works for you.

import asyncio

import streamlit as st
from pydantic_ai import Agent
from pydantic_ai.messages import ModelRequest, UserPromptPart


async def main():
    st.title("Simple chat")

    # Initialize chat history
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        part1 = message.parts[0]
        role = ""
        if isinstance(part1, UserPromptPart):
            role = "user"
        with st.chat_message("human" if role == "user" else "ai"):
            st.markdown(message.parts[0].content)

    # Accept user input
    if prompt := st.chat_input("Say something..."):
        # Display user message in chat message container
        st.chat_message("user").markdown(prompt)

        # Add user message to chat history
        st.session_state.messages.append(
            ModelRequest(parts=[UserPromptPart(content=prompt)])
        )

        # Display assistant response in chat message container
        with st.chat_message("assistant"):
            response_container = st.empty()  # Placeholder for streaming response
            agent = Agent(
                "google-gla:gemini-2.0-flash-exp",
                # "openai:gpt-4o", # works!
                # https://ai.pydantic.dev/results/#structured-result-validation
                result_type=str,  # type: ignore
                system_prompt="you are a nice bot",
            )
            result = await agent.run(prompt)
            response_container.markdown(result.data)


if __name__ == "__main__":
    main()

Alternatively, call nest_asyncio.apply() at the beginning of your code, right after the imports, as shown in [1]:

import asyncio
import nest_asyncio

import streamlit as st
from pydantic_ai import Agent
from pydantic_ai.messages import ModelRequest, UserPromptPart

nest_asyncio.apply()  # Allows nesting of event loops

async def main():
    st.title("Simple chat")

    # Initialize chat history
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        part1 = message.parts[0]
        role = ""
        if isinstance(part1, UserPromptPart):
            role = "user"
        with st.chat_message("human" if role == "user" else "ai"):
            st.markdown(message.parts[0].content)

    # Accept user input
    if prompt := st.chat_input("Say something..."):
        # Display user message in chat message container
        st.chat_message("user").markdown(prompt)

        # Add user message to chat history
        st.session_state.messages.append(
            ModelRequest(parts=[UserPromptPart(content=prompt)])
        )

        # Display assistant response in chat message container
        with st.chat_message("assistant"):
            response_container = st.empty()  # Placeholder for streaming response
            agent = Agent(
                "google-gla:gemini-2.0-flash-exp",
                # "openai:gpt-4o", # works!
                # https://ai.pydantic.dev/results/#structured-result-validation
                result_type=str,  # type: ignore
                system_prompt="you are a nice bot",
            )
            result = await agent.run(prompt)
            response_container.markdown(result.data)


if __name__ == "__main__":
    asyncio.run(main())

References

[1] https://ai.pydantic.dev/troubleshooting/#runtimeerror-this-event-loop-is-already-running

@asaf
Author

asaf commented Jan 23, 2025

Hey @izzyacademy, thanks for answering.

  • I don't think option 1 is feasible as main is async.
  • I find it very odd that the behavior is different when using one model rather than another.
  • I already read the reference and tried nesting the loop but I got:
File "/Users/xyz/.venv/lib/python3.13/site-packages/anyio/streams/tls.py", line 147, in _call_sslobject_method
    data = await self.transport_stream.receive()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/xyz/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 1246, in receive
    await self._protocol.read_event.wait()
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/locks.py", line 210, in wait
    fut = self._get_loop().create_future()
          ~~~~~~~~~~~~~~^^
File "/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/mixins.py", line 20, in _get_loop
    raise RuntimeError(f'{self!r} is bound to a different event loop')

Best,
Asaf.

@sydney-runkle sydney-runkle added the 'question' (Further information is requested) label Jan 24, 2025

github-actions bot commented Feb 1, 2025

This issue is stale, and will be closed in 3 days if no reply is received.

@github-actions github-actions bot added the Stale label Feb 1, 2025

github-actions bot commented Feb 4, 2025

Closing this issue as it has been inactive for 10 days.

@github-actions github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) Feb 4, 2025
@jsmidt

jsmidt commented Feb 17, 2025

Just wanted to confirm: nest_asyncio.apply() does not work with Gemini models such as 1.5-flash and 2.0-flash, while other model providers like groq work even without nest_asyncio.apply(). Here is a different script that shows the problem as well.

Even though this was closed, it seems like a bug, since it only appears to affect Gemini models. The error for the script below ends with:

File "/opt/homebrew/lib/python3.12/site-packages/anyio/streams/tls.py", line 204, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.12/site-packages/anyio/streams/tls.py", line 147, in _call_sslobject_method
    data = await self.transport_stream.receive()
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 1289, in receive
    await self._protocol.read_event.wait()
File "/opt/homebrew/Cellar/[email protected]/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/locks.py", line 209, in wait
    fut = self._get_loop().create_future()
        ^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/mixins.py", line 20, in _get_loop
    raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError: <asyncio.locks.Event object at 0x1237ebb30 [unset]> is bound to a different event loop

Here is the script:

import streamlit as st
import asyncio
import nest_asyncio

from pydantic_ai import Agent
from pydantic_ai.messages import ModelResponse, TextPart, ModelRequest, UserPromptPart

# nest_asyncio.apply()
# Fails with or without the line above uncommented:
# agent = Agent("google-gla:gemini-1.5-flash", system_prompt="Be a helpful assistant who responds like a pirate.")

# Works!
agent = Agent('groq:llama-3.3-70b-versatile', system_prompt="Be a helpful assistant who responds like a pirate.")

def display_message_part(part):
    """
    Display a single part of a message in the Streamlit UI.
    Customize how you display system prompts, user prompts,
    tool calls, tool returns, etc.
    """
    # system-prompt
    if part.part_kind == 'system-prompt':
        with st.chat_message("system"):
            st.markdown(f"**System**: {part.content}")
    # user-prompt
    elif part.part_kind == 'user-prompt':
        with st.chat_message("user"):
            st.markdown(part.content)
    # text
    elif part.part_kind == 'text':
        with st.chat_message("assistant"):
            st.markdown(part.content)  


# Streamlit UI
async def run_agent_with_streaming(user_input: str):
    """
    Run the agent with streaming text for the user_input prompt,
    while maintaining the entire conversation in `st.session_state.messages`.
    """

    # Run the agent in a stream
    async with agent.run_stream(
        user_input,
        message_history= st.session_state.messages[:-1],  # pass entire conversation so far
    ) as result:
        # We'll gather partial text to show incrementally
        partial_text = ""
        message_placeholder = st.empty()

        # Render partial text as it arrives
        async for chunk in result.stream_text(delta=True):
            partial_text += chunk
            message_placeholder.markdown(partial_text)

        # Now that the stream is finished, we have a final result.
        # Add new messages from this run, excluding user-prompt messages
        filtered_messages = [msg for msg in result.new_messages() 
                            if not (hasattr(msg, 'parts') and 
                                    any(part.part_kind == 'user-prompt' for part in msg.parts))]
        st.session_state.messages.extend(filtered_messages)

        # Add the final response to the messages
        st.session_state.messages.append(
            ModelResponse(parts=[TextPart(content=partial_text)])
        )


async def main():
    st.title("Archon - Agent Builder")
    st.write("Describe to me an AI agent you want to build and I'll code it for you with Pydantic AI.")

    # Initialize chat history in session state if not present
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display all messages from the conversation so far
    # Each message is either a ModelRequest or ModelResponse.
    # We iterate over their parts to decide how to display them.
    for msg in st.session_state.messages:
        if isinstance(msg, ModelRequest) or isinstance(msg, ModelResponse):
            for part in msg.parts:
                display_message_part(part)

    # Chat input for the user
    user_input = st.chat_input("What do you want to build today?")

    if user_input:
        # We append a new request to the conversation explicitly
        st.session_state.messages.append(
            ModelRequest(parts=[UserPromptPart(content=user_input)])
        )
        
        # Display user prompt in the UI
        with st.chat_message("user"):
            st.markdown(user_input)

        # Display the assistant's partial response while streaming
        with st.chat_message("assistant"):
            # Actually run the agent now, streaming the text
            await run_agent_with_streaming(user_input)


if __name__ == "__main__":
    asyncio.run(main())

@rubentorresbonet

Also having a hard time mixing streamlit with pydantic-ai + Gemini.

@Kludex
Member

Kludex commented Mar 1, 2025

Did you try the latest release? @rubentorresbonet

@rubentorresbonet

I am on "0.0.30"; I think that is pretty much the latest one? Thanks

@Kludex
Member

Kludex commented Mar 1, 2025

Do you have an MRE with which I can reproduce the issue?

@oscar-broman

oscar-broman commented Mar 2, 2025

Hi, I have the same issue.

MRE without streamlit here: https://github.com/oscar-broman/pydantic-ai-gemini-issue-mre

@Kludex Kludex reopened this Mar 2, 2025
@github-actions github-actions bot removed the Stale label Mar 2, 2025

This issue is stale, and will be closed in 3 days if no reply is received.

@github-actions github-actions bot added the Stale label Mar 10, 2025
@coderhour

Got the same issue when using pytest.
If I run only one test with agent.run_stream, it works.
But if there is more than one test case with an agent.run_stream call, only the first works and the following ones fail with the "Event loop is closed" error.

Adding the following works sometimes, but it isn't guaranteed:

@pytest.mark.asyncio(loop_scope="session")
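
A minimal sketch of how that marker might be applied, assuming pytest-asyncio's loop_scope argument (available in recent pytest-asyncio releases) and a hypothetical pair of tests; with the default function-scoped loop, each test gets a fresh event loop, so connections pooled during the first test are bound to a loop that is already closed when the second test runs:

import pytest
from pydantic_ai import Agent

# Module-level agent: its HTTP connection pool is shared across tests.
agent = Agent("google-gla:gemini-2.0-flash")

# Session-scoped loop: both tests share one event loop, so pooled
# connections never outlive the loop they were created on.
@pytest.mark.asyncio(loop_scope="session")
async def test_stream_one():
    async with agent.run_stream("Hello") as result:
        async for chunk in result.stream_text(delta=True):
            assert isinstance(chunk, str)

@pytest.mark.asyncio(loop_scope="session")
async def test_stream_two():
    async with agent.run_stream("Hello again") as result:
        async for chunk in result.stream_text(delta=True):
            assert isinstance(chunk, str)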

@github-actions github-actions bot removed the Stale label Mar 12, 2025
@alperta

alperta commented Mar 17, 2025

Got the same issue.


This issue is stale, and will be closed in 3 days if no reply is received.

@github-actions github-actions bot added the Stale label Mar 25, 2025
@tjosgood

Got the same issue when using gemini 2.0-flash inside a flask[async] endpoint.
I believe Flask's async model runs a new event loop for each request in multithreaded mode.
The problem does not seem to occur when running locally on Windows on the Werkzeug dev server, but it happens almost every time in the prod container, where the app runs behind nginx.

@darioprencipe

Hello,

getting the same issue even when using run_sync.

Reproducible example:

import time
from decimal import Decimal
from pathlib import Path
from typing import Union

from pydantic import BaseModel, Field
from pydantic_ai import Agent, BinaryContent
from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.settings import ModelSettings

# `settings` and `get_file_mime_type` come from my own project
# and are not defined here.

class FileValidationError(Exception):
    """Custom exception to be raised when provided file to parse is corrupted"""

    pass

class AttachmentPayment(BaseModel):
    beneficiary: Union[str, None] = Field(
        description="""Person full name or juridicial name of the entity that should receive the payment. 
        If the document is an invoice, it's the supplier name. 
        If it's a salary slip, then it's the employee name"""
    )
    reason: Union[str, None] = Field(
        description="Payment reference within the document. It's the invoice number in case of an invoice or the salary period in case of a salary slip"
    )
    amount: Decimal = Field(
        description="Total amount due, including taxes, to be paid out. Preferably, in EUR currency"
    )


# Initialize Document Reader agent
model = GeminiModel(
    "gemini-2.0-flash",
    provider="google-gla",
    api_key=settings.gemini_api_key,
)
agent = Agent(
    model,
    deps_type=None,
    system_prompt="You will be provided with a document or image. Use the uploaded document or image to extract information requested by the user prompt. Return the information in structured format according to the provided model",
    name="document-parser",
)


def parse_document_g(file_path: str, agent: Agent) -> AttachmentPayment:
    start = time.time()
    path = Path(file_path)
    if not path.is_file():
        raise FileValidationError(
            "provided file path does not resolve to a proper file"
        )

    result = agent.run_sync(
        user_prompt=[
            "You are given a single document like an invoice, a quote or a salary slip. The document is the justification for an upcoming payment. I need to pre-fill the payment data. Parse this document and find the requested information.",
            BinaryContent(data=path.read_bytes(), media_type=get_file_mime_type(path)),
        ],
        result_type=AttachmentPayment,
        model_settings=ModelSettings(temperature=0.0),
    )


    end = time.time()
    print(f"Execution time: {end - start}")
    return result.data


result_1 = parse_document_g("file-invoice-foreign.pdf", agent)

result_2 = parse_document_g("file-invoice.pdf", agent)

Gives me the following error (full traceback below):

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/darioprencipe/sibill/code/cumana/cumana/src/parsing/parser.py", line 70, in parse_document_g
    result = agent.run_sync(
             ^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_ai/agent.py", line 570, in run_sync
    return run_until_complete(
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_graph/_utils.py", line 107, in run_until_complete
    return runner.run(coro)
           ^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.pyenv/versions/3.12.3/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.pyenv/versions/3.12.3/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_ai/agent.py", line 327, in run
    async for _ in agent_run:
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_ai/agent.py", line 1414, in __anext__
    next_node = await self._graph_run.__anext__()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_graph/graph.py", line 782, in __anext__
    return await self.next(self._next_node)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_graph/graph.py", line 760, in next
    self._next_node = await node.run(ctx)
                      ^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_ai/_agent_graph.py", line 262, in run
    return await self._make_request(ctx)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_ai/_agent_graph.py", line 314, in _make_request
    model_response, request_usage = await ctx.deps.model.request(
                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_ai/models/gemini.py", line 173, in request
    async with self._make_request(
  File "/Users/darioprencipe/.pyenv/versions/3.12.3/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/pydantic_ai/models/gemini.py", line 267, in _make_request
    async with self.client.stream(
  File "/Users/darioprencipe/.pyenv/versions/3.12.3/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpx/_client.py", line 1583, in stream
    response = await self.send(
               ^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpx/_client.py", line 1629, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpx/_client.py", line 1657, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpx/_client.py", line 1694, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpx/_client.py", line 1730, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpx/_transports/default.py", line 394, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 256, in handle_async_request
    raise exc from None
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 229, in handle_async_request
    await self._close_connections(closing)
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 345, in _close_connections
    await connection.aclose()
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpcore/_async/connection.py", line 173, in aclose
    await self._connection.aclose()
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpcore/_async/http11.py", line 258, in aclose
    await self._network_stream.aclose()
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/httpcore/_backends/anyio.py", line 53, in aclose
    await self._stream.aclose()
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/anyio/streams/tls.py", line 216, in aclose
    await self.transport_stream.aclose()
  File "/Users/darioprencipe/.local/share/virtualenvs/cumana-7tSU8H4U/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 1314, in aclose
    self._transport.close()
  File "/Users/darioprencipe/.pyenv/versions/3.12.3/lib/python3.12/asyncio/selector_events.py", line 1210, in close
    super().close()
  File "/Users/darioprencipe/.pyenv/versions/3.12.3/lib/python3.12/asyncio/selector_events.py", line 875, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/Users/darioprencipe/.pyenv/versions/3.12.3/lib/python3.12/asyncio/base_events.py", line 795, in call_soon
    self._check_closed()
  File "/Users/darioprencipe/.pyenv/versions/3.12.3/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

Anyone else experiencing issues with Gemini synchronous agent runs?

Thanks in advance!

@github-actions github-actions bot removed the Stale label Mar 26, 2025
@mcginleyr1

mcginleyr1 commented Mar 26, 2025

I get it with an even simpler reproduction:

from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai import Agent

GEMINI_PROVIDER = "google-vertex"
model_name: str = "gemini-2.0-flash-001"
model = GeminiModel(model_name, provider=GEMINI_PROVIDER)

llm = Agent(model)

print(llm.run_sync("What is the capital of pennsylvania?"))

print(llm.run_sync("What is the capital of pennsylvania?"))
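
One way to sidestep this, consistent with the cached-transport hypothesis discussed below, is to keep all requests inside a single event loop: each run_sync call spins up its own loop, which is what trips over the cached connections. A sketch of that workaround (it avoids the symptom; it is not a fix for the underlying caching):

import asyncio

from pydantic_ai import Agent
from pydantic_ai.models.gemini import GeminiModel

model = GeminiModel("gemini-2.0-flash-001", provider="google-vertex")
llm = Agent(model)

async def main():
    # Both requests share one loop, so pooled connections never
    # outlive the loop they were created on.
    print(await llm.run("What is the capital of Pennsylvania?"))
    print(await llm.run("What is the capital of Pennsylvania?"))

asyncio.run(main())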

@Kludex
Member

Kludex commented Apr 1, 2025

It seems the problem is related to the fact that we cache the httpx.AsyncTransport.
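
A minimal sketch of that failure mode with httpx alone, outside pydantic-ai (the URL is a placeholder); each asyncio.run() call creates and closes its own loop, while the module-level client's pooled connections stay bound to the first loop:

import asyncio

import httpx

# Module-level client: its transport and connection pool are created once,
# and keep-alive connections get tied to whichever loop first used them.
client = httpx.AsyncClient()

async def fetch() -> int:
    response = await client.get("https://example.com")
    return response.status_code

print(asyncio.run(fetch()))  # first loop: works
print(asyncio.run(fetch()))  # second loop: can raise RuntimeError: Event loop is closed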

@Ossian531

I was able to reproduce a similar error with this code

import asyncio

from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.providers.google_vertex import GoogleVertexProvider
from pydantic_ai import Agent

judge = Agent(model=GeminiModel("gemini-2.0-flash", provider=GoogleVertexProvider()))

async def main():
    response = await judge.run("Testing")

    # The synchronous call after the awaited one is where the error shows up.
    response = judge.run_sync("Testing")

asyncio.run(main())

@teshnizi

teshnizi commented Apr 10, 2025

I was able to fix this by creating a global loop and using it for all streams.

Add this globally:

import threading
import asyncio

global_loop = asyncio.new_event_loop()

def start_global_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


# Start the global event loop in a background thread.
threading.Thread(target=start_global_loop, args=(global_loop,), daemon=True).start()

And use it like this wherever you want to do the async generate:

async def async_generate():
    async for chunk in agent.astream(chat_input):
          yield json.dumps(chunk)

def generate():
    agen = async_generate()
    try:
        while True:
            future = asyncio.run_coroutine_threadsafe(agen.__anext__(), global_loop)
            chunk = future.result()
            yield chunk
    except StopAsyncIteration:
        pass

@codenprogressive

@teshnizi I tried your approach; it fixed the event-loop-closed issue, but I got an error related to "Failed to detach context". Any suggestions please?

Here is a code snippet to reproduce:

import threading
import asyncio
from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.providers.google_vertex import GoogleVertexProvider
from pydantic_ai import Agent


global_loop = asyncio.new_event_loop()

def start_global_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

# Start the global event loop in a background thread.
threading.Thread(target=start_global_loop, args=(global_loop,), daemon=True).start()


agent = Agent(model=GeminiModel("gemini-2.0-flash", provider=GoogleVertexProvider()))

async def async_generate(chat_input):
    async with agent.run_stream(chat_input) as result:  
        async for chunk in result.stream_text(): 
            yield chunk

async def generate(chat_input):
    agen = async_generate(chat_input)
    try:
        while True:
            future = asyncio.run_coroutine_threadsafe(agen.__anext__(), global_loop)
            chunk = future.result()
            yield chunk
    except StopAsyncIteration:
        pass

async def main():
    
    print("Response for 'Testing':")
    async for chunk in generate("Testing"):
        print(chunk)

    #Consume the second generator
    print("Response for 'Testing 2':")
    async for chunk in generate("Testing 2"):
        print(chunk)

# Run everything in the same event loop
if __name__ == "__main__":
    asyncio.run(main())

When you run the script, the first response is generated and then you get the error:

Response for 'Testing':
Okay!
Okay! I'm ready to be tested.  What kind of test are you thinking
Okay! I'm ready to be tested.  What kind of test are you thinking of?  To give you the best response, tell me what you'd like
Okay! I'm ready to be tested.  What kind of test are you thinking of?  To give you the best response, tell me what you'd like me to do. For example:

*   **Do you want to test my knowledge?** If so, what subject or topic?
*   **Do you
Okay! I'm ready to be tested.  What kind of test are you thinking of?  To give you the best response, tell me what you'd like me to do. For example:

*   **Do you want to test my knowledge?** If so, what subject or topic?
*   **Do you want to test my reasoning or problem-solving abilities?** Give me a scenario or question.
*   **Do you want to test my ability to generate creative content
Okay! I'm ready to be tested.  What kind of test are you thinking of?  To give you the best response, tell me what you'd like me to do. For example:

*   **Do you want to test my knowledge?** If so, what subject or topic?
*   **Do you want to test my reasoning or problem-solving abilities?** Give me a scenario or question.
*   **Do you want to test my ability to generate creative content?** Give me a prompt.
*   **Do you want to test my coding abilities?** Give me a task or problem.
*   **Do you want to test my conversational abilities and how well I understand context?** Just ask me questions or engage in a conversation.

The more information you give me, the better I can understand your request and provide a relevant and helpful response. Let's begin!
Failed to detach context
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/opentelemetry/context/__init__.py", line 155, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "/usr/local/lib/python3.10/site-packages/opentelemetry/context/contextvars_context.py", line 53, in detach
    self._current_context.reset(token)
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x7f1fe3b82980> at 0x7f1fe1636680> was created in a different Context

@anwarower

anwarower commented Apr 14, 2025

I am getting the same issue using Gemini models in async contexts even without using pydantic-ai. Are there any fixes planned for this?

@Kludex
Member

Kludex commented Apr 14, 2025

I am getting the same issue using Gemini models in async contexts even without using pydantic-ai. Are there any fixes planned for this?

What do you mean without even pydantic-ai?

@tjosgood

I found that the same problem exists using the native google.genai package; the common factor seems to be the use of the httpx library. Other forums suggest the problem is httpx connection pooling creating connections in one asyncio loop and then trying to use that same connection from another asyncio loop.

The suggestion from the httpx issue tracker was to add the header "Connection: close" to all requests going to Gemini endpoints. In google.genai I added:

genai.Client(api_key=self.api_key, http_options=HttpOptions(headers={"Connection": "close"}))

This does fix the issue for me, so maybe it will help here.
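
For pydantic-ai users, a sketch of the same workaround, assuming the google-gla provider accepts a custom http_client (the import path and keyword mirror the provider API used elsewhere in this thread; check your installed version):

import httpx

from pydantic_ai import Agent
from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.providers.google_gla import GoogleGLAProvider

# "Connection: close" disables keep-alive, so no pooled connection can
# ever be reused from a different (possibly already closed) event loop.
http_client = httpx.AsyncClient(headers={"Connection": "close"})
model = GeminiModel(
    "gemini-2.0-flash",
    provider=GoogleGLAProvider(http_client=http_client),
)
agent = Agent(model)

The trade-off is a fresh TCP/TLS handshake on every request, so latency goes up.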

I did mention this, and what is causing it, about 3 weeks ago 😂

@teshnizi

@teshnizi I tried your approach; it fixed the event-loop-closed issue, but I got an error related to "Failed to detach context". Any suggestions please? [full comment, including the reproduction script and output, quoted above]
Unfortunately not sure.

@anwarower

I am getting the same issue using Gemini models in async contexts even without using pydantic-ai. Are there any fixes planned for this?

What do you mean without even pydantic-ai?

I mean I am not using the PydanticAI framework. I had the same issue using async calls with langchain.


This issue is stale, and will be closed in 3 days if no reply is received.

@github-actions github-actions bot added the Stale label Apr 22, 2025

Closing this issue as it has been inactive for 10 days.

@github-actions github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) Apr 26, 2025
@Kludex Kludex reopened this Apr 26, 2025
@github-actions github-actions bot removed the Stale label Apr 27, 2025

github-actions bot commented May 5, 2025

This issue is stale, and will be closed in 3 days if no reply is received.

@github-actions github-actions bot added the Stale label May 5, 2025
@Kludex Kludex removed the Stale label May 6, 2025
@bojan2501

bojan2501 commented May 8, 2025

Same issue with "Event loop is closed" when using Streamlit, Pydantic AI and Gemini for testing.
Tried all sorts of workarounds and nothing works.

@ewjoachim

ewjoachim commented May 12, 2025

Working on a tentative PR to fix that, but as far as I can tell, the issue is in _cached_async_http_transport, which returns objects tied to the loop that was active when it was first called, even if we stop that loop and run in a new one.

To be precise, if I've made a first call to Gemini in a given event loop, then:

GeminiModel(model_name="gemini-2.0-flash-lite", provider="google-vertex").client._transport._pool._connections[0]._connection._network_stream._stream.transport_stream._transport._loop

will be a pointer to that event loop even if it's closed, and if we try to use the GeminiModel class again (even with a new instance), it will try to reuse that loop.

I think if we make the client/transport cache smarter (e.g. loop dependent), the problem will disappear.
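
A hypothetical sketch of what a loop-dependent cache could look like (the function name is illustrative, not the actual pydantic-ai internals); keying the cached transport by the running loop means a new loop gets a fresh transport instead of one holding connections from a dead loop:

import asyncio

import httpx

# One transport per event loop; a real implementation would also need to
# evict entries for closed loops so they can be garbage collected.
_transports: dict[asyncio.AbstractEventLoop, httpx.AsyncHTTPTransport] = {}

def loop_scoped_transport() -> httpx.AsyncHTTPTransport:
    loop = asyncio.get_running_loop()
    if loop not in _transports:
        _transports[loop] = httpx.AsyncHTTPTransport()
    return _transports[loop]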

ewjoachim added commits to ewjoachim/pydantic-ai that referenced this issue May 12, 2025
@ewjoachim

ewjoachim commented May 13, 2025

Just for completeness' sake, this issue in httpcore is likely about the same problem: encode/httpcore#659 (comment)

And so is agronholm/anyio#743

  • Pydantic AI chooses to cache httpx transports
  • httpx transports are linked to httpcore connection pools
  • httpcore chooses to cache connections in pools (honestly, that's what pools are for)
  • httpcore connections use anyio
  • anyio chooses to cache the loop (though it probably makes sense that a given TCP connection cannot be used on a different loop just like that)

If any one of these didn't do that, the problem wouldn't exist :D

@ewjoachim

In both issues, the anyio and httpx maintainers advised improving connection (and connection pool) management, strongly hinting towards using an async context manager.

That said, I don't know how that might work with PydanticAI's API.

But it would make sense to consider that things like the HTTP connection object and transport shouldn't be created before we're in an event loop.
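
A minimal sketch of that direction with plain httpx (the URL is a placeholder): the client, and with it the transport and pool, is only created once a loop is running and is torn down before the loop closes, so nothing survives into a second loop:

import asyncio

import httpx

async def main() -> None:
    # Transport and pool are created inside the running loop and are
    # closed by the context manager before the loop shuts down.
    async with httpx.AsyncClient() as client:
        response = await client.get("https://example.com")
        print(response.status_code)

asyncio.run(main())
asyncio.run(main())  # a second loop builds a fresh client, so no stale connections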


This issue is stale, and will be closed in 3 days if no reply is received.

@github-actions github-actions bot added the Stale label May 21, 2025
@ewjoachim

(I think this issue is still relevant. Especially since there's a PR)

@Kludex Kludex removed the Stale label May 23, 2025