Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions python/cocoindex/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import threading
import asyncio
import inspect
from typing import Any, Callable, Coroutine, TypeVar, Awaitable
import warnings

from typing import Any, Callable, Awaitable, TypeVar, Coroutine

T = TypeVar("T")

Expand All @@ -24,15 +25,42 @@ def event_loop(self) -> asyncio.AbstractEventLoop:
"""Get the event loop for the cocoindex library."""
with self._lock:
if self._event_loop is None:
self._event_loop = asyncio.new_event_loop()
threading.Thread(
target=self._event_loop.run_forever, daemon=True
).start()
loop = asyncio.new_event_loop()
self._event_loop = loop

def _runner(l: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(l)
l.run_forever()

threading.Thread(target=_runner, args=(loop,), daemon=True).start()
return self._event_loop

def run(self, coro: Coroutine[Any, Any, T]) -> T:
"""Run a coroutine in the event loop, blocking until it finishes. Return its result."""
return asyncio.run_coroutine_threadsafe(coro, self.event_loop).result()
try:
running_loop = asyncio.get_running_loop()
except RuntimeError:
running_loop = None

loop = self.event_loop

if running_loop is not None:
if running_loop is loop:
raise RuntimeError(
"CocoIndex sync API was called from inside CocoIndex's async context. "
"Use the async variant of this method instead."
)
warnings.warn(
"CocoIndex sync API was called inside an existing event loop. "
"This may block other tasks. Prefer the async method.",
RuntimeWarning,
stacklevel=2,
)
fut = asyncio.run_coroutine_threadsafe(coro, loop)
return fut.result()

fut = asyncio.run_coroutine_threadsafe(coro, loop)
return fut.result()


execution_context = _ExecutionContext()
Expand Down
Loading