diff --git a/synchronicity/synchronizer.py b/synchronicity/synchronizer.py index 8ea3aa1..133f5d5 100644 --- a/synchronicity/synchronizer.py +++ b/synchronicity/synchronizer.py @@ -4,6 +4,7 @@ import contextlib import functools import inspect +import logging import platform import threading import typing @@ -20,6 +21,9 @@ from .exceptions import UserCodeException, unwrap_coro_exception, wrap_coro_exception from .interface import Interface +logger = logging.getLogger("synchronicity") + + _BUILTIN_ASYNC_METHODS = { "__aiter__": "__iter__", "__aenter__": "__enter__", @@ -136,13 +140,19 @@ def __init__( self.create_blocking(typing.Generic, "WrappedGeneric", __name__) - atexit.register(self._close_loop) + atexit.register(self._close_loop) # shut down at container exit if we haven't already _PICKLE_ATTRS = [ "_multiwrap_warning", "_async_leakage_warning", ] + def __enter__(self): + self._start_loop() + + def __exit__(self, *args, **kwargs): + self._close_loop() + def __getstate__(self): return dict([(attr, getattr(self, attr)) for attr in self._PICKLE_ATTRS]) @@ -155,6 +165,7 @@ def _start_loop(self): if self._loop and self._loop.is_running(): return self._loop + logger.debug("starting up synchronicity event loop") is_ready = threading.Event() def thread_inner(): @@ -182,9 +193,11 @@ def _close_loop(self): if self._thread is not None: if not self._loop.is_closed(): # This also serves the purpose of waking up an idle loop + logger.debug("start shutting down synchronicity event loop") self._loop.call_soon_threadsafe(self._stopping.set) self._thread.join() self._thread = None + logger.debug("finished shutting down synchronicity event loop") def __del__(self): self._close_loop() diff --git a/test/_shutdown.py b/test/_shutdown.py index aaae0a6..fa7a879 100644 --- a/test/_shutdown.py +++ b/test/_shutdown.py @@ -1,6 +1,12 @@ import asyncio +import logging +import sys from synchronicity import Synchronizer +from synchronicity.synchronizer import logger + +logger.addHandler(logging.StreamHandler(sys.stdout)) +logger.setLevel(logging.DEBUG) async def run(): @@ -19,7 +25,9 @@ async def run(): s = Synchronizer() +print("calling wrapped func") try: s.create_blocking(run)() except KeyboardInterrupt: pass +print("eof") diff --git a/test/_shutdown_ctxmgr.py b/test/_shutdown_ctxmgr.py new file mode 100644 index 0000000..fc1ae1f --- /dev/null +++ b/test/_shutdown_ctxmgr.py @@ -0,0 +1,36 @@ +import asyncio +import logging +import sys + +from synchronicity import Synchronizer +from synchronicity.synchronizer import logger + +logger.addHandler(logging.StreamHandler(sys.stdout)) +logger.setLevel(logging.DEBUG) + + +async def run(): + try: + while True: + print("running") + await asyncio.sleep(0.3) + except asyncio.CancelledError: + print("cancelled") + raise + finally: + print("stopping") + await asyncio.sleep(0.1) + print("exiting") + + +s = Synchronizer() +wrapped_func = s.create_blocking(run) + +try: + with s: + print("calling wrapped func") + wrapped_func() +except KeyboardInterrupt: + pass + +print("eof") diff --git a/test/shutdown_test.py b/test/shutdown_test.py index fadff2c..3b37dfd 100644 --- a/test/shutdown_test.py +++ b/test/shutdown_test.py @@ -4,6 +4,16 @@ import sys +def assert_prints(p: subprocess.Popen, *messages: str): + for msg in messages: + line = p.stdout.readline() + if not line: + print("STDERR") + print(p.stderr.read()) + raise Exception("Unexpected empty line in output, see stderr:") + assert line == msg + "\n" + + def test_shutdown(): # We run it in a separate process so we can simulate interrupting it fn = os.path.join(os.path.dirname(__file__), "_shutdown.py") @@ -12,12 +22,58 @@ def test_shutdown(): stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={"PYTHONUNBUFFERED": "1"}, + encoding="utf8", + ) + assert_prints( + p, + "calling wrapped func", + "starting up synchronicity event loop", + *(["running"] * 3), # wait for 3 "running" messages before siginting the process ) - for i in range(3): # this number doesn't matter, it's a while loop - assert p.stdout.readline() == b"running\n" p.send_signal(signal.SIGINT) - assert p.stdout.readline() == b"cancelled\n" - assert p.stdout.readline() == b"stopping\n" - assert p.stdout.readline() == b"exiting\n" - stderr_content = p.stderr.read() - assert b"Traceback" not in stderr_content + assert_prints( + p, + "eof", + "start shutting down synchronicity event loop", + "cancelled", + "stopping", + "exiting", + "finished shutting down synchronicity event loop", + ) + out, err = p.communicate() + assert out == "" + assert err == "" + assert p.returncode == 0 + + +def test_shutdown_ctx_mgr(): + # We run it in a separate process so we can simulate interrupting it + fn = os.path.join(os.path.dirname(__file__), "_shutdown_ctxmgr.py") + p = subprocess.Popen( + [sys.executable, fn], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PYTHONUNBUFFERED": "1"}, + encoding="utf8", + ) + assert_prints( + p, + "starting up synchronicity event loop", # start up loop explicitly + "calling wrapped func", + *(["running"] * 3), + ) + p.send_signal(signal.SIGINT) + + assert_prints( + p, + "start shutting down synchronicity event loop", + "cancelled", + "stopping", + "exiting", + "finished shutting down synchronicity event loop", # shut down loop explicitly, + "eof", + ) + out, err = p.communicate() + assert out == "" + assert err == "" + assert p.returncode == 0