From b87c57176b0f8008fb34d355ccbd50e3eb34956d Mon Sep 17 00:00:00 2001 From: Elias Freider Date: Fri, 23 Aug 2024 08:38:06 +0000 Subject: [PATCH 1/2] Make synchronizer a context manager for managing event loop life cycle explicitly Can also be used lazily for usages outside of context manager --- synchronicity/synchronizer.py | 15 +++++++- test/_shutdown.py | 8 ++++ test/_shutdown_ctxmgr.py | 36 ++++++++++++++++++ test/shutdown_test.py | 70 +++++++++++++++++++++++++++++++---- 4 files changed, 121 insertions(+), 8 deletions(-) create mode 100644 test/_shutdown_ctxmgr.py diff --git a/synchronicity/synchronizer.py b/synchronicity/synchronizer.py index 8ea3aa1..04effed 100644 --- a/synchronicity/synchronizer.py +++ b/synchronicity/synchronizer.py @@ -20,6 +20,10 @@ from .exceptions import UserCodeException, unwrap_coro_exception, wrap_coro_exception from .interface import Interface +import logging +logger = logging.getLogger("synchronicity") + + _BUILTIN_ASYNC_METHODS = { "__aiter__": "__iter__", "__aenter__": "__enter__", @@ -136,13 +140,19 @@ def __init__( self.create_blocking(typing.Generic, "WrappedGeneric", __name__) - atexit.register(self._close_loop) + atexit.register(self._close_loop) # shut down at container exit if we haven't already _PICKLE_ATTRS = [ "_multiwrap_warning", "_async_leakage_warning", ] + def __enter__(self): + self._start_loop() + + def __exit__(self, *args, **kwargs): + self._close_loop() + def __getstate__(self): return dict([(attr, getattr(self, attr)) for attr in self._PICKLE_ATTRS]) @@ -155,6 +165,7 @@ def _start_loop(self): if self._loop and self._loop.is_running(): return self._loop + logger.debug("starting up synchronicity event loop") is_ready = threading.Event() def thread_inner(): @@ -182,9 +193,11 @@ def _close_loop(self): if self._thread is not None: if not self._loop.is_closed(): # This also serves the purpose of waking up an idle loop + logger.debug("start shutting down synchronicity event loop") self._loop.call_soon_threadsafe(self._stopping.set) self._thread.join() self._thread = None + logger.debug("finished shutting down synchronicity event loop") def __del__(self): self._close_loop() diff --git a/test/_shutdown.py b/test/_shutdown.py index aaae0a6..fa7a879 100644 --- a/test/_shutdown.py +++ b/test/_shutdown.py @@ -1,6 +1,12 @@ import asyncio +import logging +import sys from synchronicity import Synchronizer +from synchronicity.synchronizer import logger + +logger.addHandler(logging.StreamHandler(sys.stdout)) +logger.setLevel(logging.DEBUG) async def run(): @@ -19,7 +25,9 @@ async def run(): s = Synchronizer() +print("calling wrapped func") try: s.create_blocking(run)() except KeyboardInterrupt: pass +print("eof") diff --git a/test/_shutdown_ctxmgr.py b/test/_shutdown_ctxmgr.py new file mode 100644 index 0000000..13b0c01 --- /dev/null +++ b/test/_shutdown_ctxmgr.py @@ -0,0 +1,36 @@ +import asyncio +import logging +import sys + +from synchronicity import Synchronizer +from synchronicity.synchronizer import logger + +logger.addHandler(logging.StreamHandler(sys.stdout)) +logger.setLevel(logging.DEBUG) + + +async def run(): + try: + while True: + print("running") + await asyncio.sleep(0.3) + except asyncio.CancelledError: + print("cancelled") + raise + finally: + print("stopping") + await asyncio.sleep(0.1) + print("exiting") + + +s = Synchronizer() +wrapped_func = s.create_blocking(run) + +try: + with s: + print("calling wrapped func") + wrapped_func() +except KeyboardInterrupt: + pass + +print("eof") diff --git a/test/shutdown_test.py b/test/shutdown_test.py index fadff2c..05c2d42 100644 --- a/test/shutdown_test.py +++ b/test/shutdown_test.py @@ -2,6 +2,16 @@ import signal import subprocess import sys +from typing import List + +def assert_prints(p: subprocess.Popen, *messages: str): + for msg in messages: + line = p.stdout.readline() + if not line: + print("STDERR") + print(p.stderr.read()) + raise Exception("Unexpected empty line in output, see stderr:") + assert line == msg + "\n" def test_shutdown(): @@ -12,12 +22,58 @@ def test_shutdown(): stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={"PYTHONUNBUFFERED": "1"}, + encoding="utf8" + ) + assert_prints( + p, + "calling wrapped func", + "starting up synchronicity event loop", + *(["running"] * 3) # wait for 3 "running" messages before siginting the process ) - for i in range(3): # this number doesn't matter, it's a while loop - assert p.stdout.readline() == b"running\n" p.send_signal(signal.SIGINT) - assert p.stdout.readline() == b"cancelled\n" - assert p.stdout.readline() == b"stopping\n" - assert p.stdout.readline() == b"exiting\n" - stderr_content = p.stderr.read() - assert b"Traceback" not in stderr_content + assert_prints( + p, + "eof", + "start shutting down synchronicity event loop", + "cancelled", + "stopping", + "exiting", + "finished shutting down synchronicity event loop", + ) + out, err = p.communicate() + assert out == "" + assert err == "" + assert p.returncode == 0 + + +def test_shutdown_ctx_mgr(): + # We run it in a separate process so we can simulate interrupting it + fn = os.path.join(os.path.dirname(__file__), "_shutdown_ctxmgr.py") + p = subprocess.Popen( + [sys.executable, fn], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PYTHONUNBUFFERED": "1"}, + encoding="utf8" + ) + assert_prints( + p, + "starting up synchronicity event loop", # start up loop explicitly + "calling wrapped func", + *(["running"] * 3) + ) + p.send_signal(signal.SIGINT) + + assert_prints( + p, + "start shutting down synchronicity event loop", + "cancelled", + "stopping", + "exiting", + "finished shutting down synchronicity event loop", # shut down loop explicitly, + "eof", + ) + out, err = p.communicate() + assert out == "" + assert err == "" + assert p.returncode == 0 From b618bbeb1f9b8231c21d759c32c0bd41945d3a73 Mon Sep 17 00:00:00 2001 From: Elias Freider Date: Fri, 23 Aug 2024 08:40:57 +0000 Subject: [PATCH 2/2] lint --- synchronicity/synchronizer.py | 4 ++-- test/_shutdown_ctxmgr.py | 2 +- test/shutdown_test.py | 12 ++++++------ 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/synchronicity/synchronizer.py b/synchronicity/synchronizer.py index 04effed..133f5d5 100644 --- a/synchronicity/synchronizer.py +++ b/synchronicity/synchronizer.py @@ -4,6 +4,7 @@ import contextlib import functools import inspect +import logging import platform import threading import typing @@ -20,7 +21,6 @@ from .exceptions import UserCodeException, unwrap_coro_exception, wrap_coro_exception from .interface import Interface -import logging logger = logging.getLogger("synchronicity") @@ -149,7 +149,7 @@ def __init__( def __enter__(self): self._start_loop() - + def __exit__(self, *args, **kwargs): self._close_loop() diff --git a/test/_shutdown_ctxmgr.py b/test/_shutdown_ctxmgr.py index 13b0c01..fc1ae1f 100644 --- a/test/_shutdown_ctxmgr.py +++ b/test/_shutdown_ctxmgr.py @@ -28,7 +28,7 @@ async def run(): try: with s: - print("calling wrapped func") + print("calling wrapped func") wrapped_func() except KeyboardInterrupt: pass diff --git a/test/shutdown_test.py b/test/shutdown_test.py index 05c2d42..3b37dfd 100644 --- a/test/shutdown_test.py +++ b/test/shutdown_test.py @@ -2,7 +2,7 @@ import signal import subprocess import sys -from typing import List + def assert_prints(p: subprocess.Popen, *messages: str): for msg in messages: @@ -22,13 +22,13 @@ def test_shutdown(): stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={"PYTHONUNBUFFERED": "1"}, - encoding="utf8" + encoding="utf8", ) assert_prints( p, "calling wrapped func", "starting up synchronicity event loop", - *(["running"] * 3) # wait for 3 "running" messages before siginting the process + *(["running"] * 3), # wait for 3 "running" messages before siginting the process ) p.send_signal(signal.SIGINT) assert_prints( @@ -54,16 +54,16 @@ def test_shutdown_ctx_mgr(): stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={"PYTHONUNBUFFERED": "1"}, - encoding="utf8" + encoding="utf8", ) assert_prints( p, "starting up synchronicity event loop", # start up loop explicitly "calling wrapped func", - *(["running"] * 3) + *(["running"] * 3), ) p.send_signal(signal.SIGINT) - + assert_prints( p, "start shutting down synchronicity event loop",