diff --git a/synchronicity/synchronizer.py b/synchronicity/synchronizer.py index dce78af..249826a 100644 --- a/synchronicity/synchronizer.py +++ b/synchronicity/synchronizer.py @@ -4,10 +4,13 @@ import contextlib import functools import inspect +import logging import platform import threading +import time import typing import warnings +from contextlib import asynccontextmanager from typing import ForwardRef, Optional from synchronicity.annotations import evaluated_annotation @@ -18,6 +21,9 @@ from .exceptions import UserCodeException, unwrap_coro_exception, wrap_coro_exception from .interface import Interface +logger = logging.getLogger("synchronicity") + + _BUILTIN_ASYNC_METHODS = { "__aiter__": "__iter__", "__aenter__": "__enter__", @@ -82,6 +88,24 @@ def _type_requires_aio_usage(annotation, declaration_module): return False + +@asynccontextmanager +async def loop_delay_monitor(monitor_period: float, warning_threshold: float): + async def monitor(): + while 1: + t0 = time.monotonic() + await asyncio.sleep(monitor_period) + duration = time.monotonic() - t0 + delay = duration - monitor_period + if delay >= warning_threshold: + logger.warning(f"Detected an event loop delay of {delay:.2f}s") + + loop_task = asyncio.create_task(monitor()) + try: + yield + finally: + loop_task.cancel() + def should_have_aio_interface(func): # determines if a blocking function gets an .aio attribute with an async interface to the function or not if inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func): @@ -102,7 +126,11 @@ def __init__( self, multiwrap_warning=False, async_leakage_warning=True, + loop_delay_monitor_period = None, # seconds + loop_delay_monitor_threshold = None # seconds ): + self._loop_delay_monitor_period = loop_delay_monitor_period + self._loop_delay_monitor_threshold = loop_delay_monitor_threshold self._multiwrap_warning = multiwrap_warning self._async_leakage_warning = async_leakage_warning self._loop = None @@ -156,7 +184,11 @@ async def loop_inner(): self._loop = asyncio.get_running_loop() self._stopping = asyncio.Event() is_ready.set() - await self._stopping.wait() # wait until told to stop + if self._loop_delay_monitor_period: + async with loop_delay_monitor(self._loop_delay_monitor_period, self._loop_delay_monitor_threshold): + await self._stopping.wait() # wait until told to stop + else: + await self._stopping.wait() try: asyncio.run(loop_inner()) @@ -168,6 +200,7 @@ async def loop_inner(): raise exc self._thread = threading.Thread(target=thread_inner, daemon=True) + self._thread.name = f"synchronicity-thread ({id(self)})" self._thread.start() is_ready.wait() # TODO: this might block for a very short time return self._loop