From 317cb1c6621139e0e633ace5c94ccd7096bec9a1 Mon Sep 17 00:00:00 2001 From: Zac Hatfield-Dodds Date: Tue, 19 Nov 2024 17:43:27 -0800 Subject: [PATCH 01/23] Update _channel.py --- src/trio/_channel.py | 117 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 116 insertions(+), 1 deletion(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 6410d9120c..d325bb6be0 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -1,6 +1,9 @@ from __future__ import annotations from collections import OrderedDict, deque +from collections.abc import AsyncGenerator, Callable +from contextlib import AbstractAsyncContextManager, asynccontextmanager +from functools import wraps from math import inf from typing import ( TYPE_CHECKING, @@ -19,7 +22,23 @@ if TYPE_CHECKING: from types import TracebackType - from typing_extensions import Self + from typing_extensions import ParamSpec, Self + + P = ParamSpec("P") + +try: + from contextlib import aclosing # new in Python 3.10 +except ImportError: + + class aclosing: + def __init__(self, aiter): + self._aiter = aiter + + async def __aenter__(self): + return self._aiter + + async def __aexit__(self, *args): + await self._aiter.aclose() def _open_memory_channel( @@ -440,3 +459,99 @@ async def aclose(self) -> None: See `MemoryReceiveChannel.close`.""" self.close() await trio.lowlevel.checkpoint() + + +def background_with_channel(max_buffer_size: float = 0) -> Callable[ + [ + Callable[P, AsyncGenerator[T, None]], + ], + Callable[P, AbstractAsyncContextManager[trio.MemoryReceiveChannel[T]]], +]: + """Decorate an async generator function to make it cancellation-safe. + + The `yield` keyword offers a very convenient way to write iterators... + which makes it really unfortunate that async generators are so difficult + to call correctly. Yielding from the inside of a cancel scope or a nursery + to the outside `violates structured concurrency `_ + with consequences explainined in :pep:`789`. Even then, resource cleanup + errors remain common (:pep:`533`) unless you wrap every call in + :func:`~contextlib.aclosing`. + + This decorator gives you the best of both worlds: with careful exception + handling and a background task we preserve structured concurrency by + offering only the safe interface, and you can still write your iterables + with the convenience of `yield`. For example: + + @background_with_channel() + async def my_async_iterable(arg, *, kwarg=True): + while ...: + item = await ... + yield item + + async with my_async_iterable(...) as recv_chan: + async for item in recv_chan: + ... + + While the combined async-with-async-for can be inconvenient at first, + the context manager is indispensible for both correctness and for prompt + cleanup of resources. + """ + # Perhaps a future PEP will adopt `async with for` syntax, like + # https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for + + def decorator( + fn: Callable[P, AsyncGenerator[T, None]], + ) -> Callable[P, AbstractAsyncContextManager[trio.MemoryReceiveChannel[T]]]: + @asynccontextmanager + @wraps(fn) + async def context_manager( + *args: P.args, **kwargs: P.kwargs + ) -> AsyncGenerator[trio.MemoryReceiveChannel[T], None]: + send_chan, recv_chan = trio.open_memory_channel[T](max_buffer_size) + async with trio.open_nursery() as nursery: + ait = fn(*args, **kwargs) + nursery.start_soon(_move_elems_to_channel, ait, send_chan) + async with recv_chan: + yield recv_chan + # Return promptly, without waiting for `await anext(ait)` + nursery.cancel_scope.cancel() + + return context_manager + + async def _move_elems_to_channel( + aiterable: AsyncGenerator[T, None], + send_chan: trio.MemorySendChannel[T], + ) -> None: + async with send_chan, aclosing(aiterable) as agen: + # Outer loop manually advances the aiterable; we can't use async-for because + # we're going to use `.asend(err)` to forward errors back to the generator. + while True: + # Get the next value from `agen`; return if exhausted + try: + value: T = await agen.__anext__() + except StopAsyncIteration: + return + # Inner loop ensures that we send values which are yielded after + # catching an exception which we sent back to the generator. + while True: + try: + # Send the value to the channel + await send_chan.send(value) + break + except trio.BrokenResourceError: + # Closing the corresponding receive channel should cause + # a clean shutdown of the generator. + return + except trio.Cancelled: + raise + except BaseException as error_from_send: + # Forward any other errors to the generator. Exit cleanly + # if exhausted; otherwise it was handled in there and we + # can continue the inner loop with this value. + try: + value = await agen.athrow(error_from_send) + except StopAsyncIteration: + return + # Phew. Context managers all cleaned up, we're done here. + + return decorator From b0b8b02bdc5ce0292804df0bd5ab51c225382428 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Fri, 31 Jan 2025 13:38:04 +0100 Subject: [PATCH 02/23] add tests, and some types --- src/trio/__init__.py | 1 + src/trio/_channel.py | 33 ++++++++----- src/trio/_tests/test_channel.py | 84 +++++++++++++++++++++++++++++++-- 3 files changed, 103 insertions(+), 15 deletions(-) diff --git a/src/trio/__init__.py b/src/trio/__init__.py index 0b675ce473..20751852c9 100644 --- a/src/trio/__init__.py +++ b/src/trio/__init__.py @@ -28,6 +28,7 @@ MemoryChannelStatistics as MemoryChannelStatistics, MemoryReceiveChannel as MemoryReceiveChannel, MemorySendChannel as MemorySendChannel, + background_with_channel as background_with_channel, open_memory_channel as open_memory_channel, ) from ._core import ( diff --git a/src/trio/_channel.py b/src/trio/_channel.py index d325bb6be0..cd7ce1fa81 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -1,13 +1,15 @@ from __future__ import annotations +import sys from collections import OrderedDict, deque -from collections.abc import AsyncGenerator, Callable from contextlib import AbstractAsyncContextManager, asynccontextmanager from functools import wraps from math import inf from typing import ( TYPE_CHECKING, Generic, + Protocol, + TypeVar, ) import attrs @@ -20,24 +22,30 @@ from ._util import NoPublicConstructor, final, generic_function if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Awaitable, Callable from types import TracebackType from typing_extensions import ParamSpec, Self P = ParamSpec("P") -try: +if sys.version_info >= (3, 10): from contextlib import aclosing # new in Python 3.10 -except ImportError: +else: + + class _SupportsAclose(Protocol): + def aclose(self) -> Awaitable[object]: ... + + _SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose) - class aclosing: - def __init__(self, aiter): - self._aiter = aiter + class aclosing(AbstractAsyncContextManager[_SupportsAcloseT, None]): + def __init__(self, thing: _SupportsAcloseT) -> None: + self._aiter = thing - async def __aenter__(self): + async def __aenter__(self) -> _SupportsAcloseT: return self._aiter - async def __aexit__(self, *args): + async def __aexit__(self, *exc_info: object) -> None: await self._aiter.aclose() @@ -471,14 +479,14 @@ def background_with_channel(max_buffer_size: float = 0) -> Callable[ The `yield` keyword offers a very convenient way to write iterators... which makes it really unfortunate that async generators are so difficult - to call correctly. Yielding from the inside of a cancel scope or a nursery + to call correctly. Yielding from the inside of a cancel scope or a nursery to the outside `violates structured concurrency `_ - with consequences explainined in :pep:`789`. Even then, resource cleanup + with consequences explained in :pep:`789`. Even then, resource cleanup errors remain common (:pep:`533`) unless you wrap every call in :func:`~contextlib.aclosing`. This decorator gives you the best of both worlds: with careful exception - handling and a background task we preserve structured concurrency by + handling and a background task we preserve structured concurrency by offering only the safe interface, and you can still write your iterables with the convenience of `yield`. For example: @@ -493,7 +501,7 @@ async def my_async_iterable(arg, *, kwarg=True): ... While the combined async-with-async-for can be inconvenient at first, - the context manager is indispensible for both correctness and for prompt + the context manager is indispensable for both correctness and for prompt cleanup of resources. """ # Perhaps a future PEP will adopt `async with for` syntax, like @@ -545,6 +553,7 @@ async def _move_elems_to_channel( except trio.Cancelled: raise except BaseException as error_from_send: + # TODO: add test case ... but idk how # Forward any other errors to the generator. Exit cleanly # if exhausted; otherwise it was handled in there and we # can continue the inner loop with this value. diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index 104b17640f..58839213a6 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -1,13 +1,16 @@ from __future__ import annotations -from typing import Union +from typing import TYPE_CHECKING, Union import pytest import trio -from trio import EndOfChannel, open_memory_channel +from trio import EndOfChannel, background_with_channel, open_memory_channel -from ..testing import assert_checkpoints, wait_all_tasks_blocked +from ..testing import RaisesGroup, assert_checkpoints, wait_all_tasks_blocked + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator async def test_channel() -> None: @@ -411,3 +414,78 @@ async def do_send(s: trio.MemorySendChannel[int], v: int) -> None: assert await r.receive() == 1 with pytest.raises(trio.WouldBlock): r.receive_nowait() + + +async def test_background_with_channel() -> None: + @background_with_channel() + async def agen() -> AsyncGenerator[int]: + yield 1 + await trio.sleep_forever() # simulate deadlock + yield 2 + + async with agen() as recv_chan: + async for x in recv_chan: + assert x == 1 + break # exit, cleanup should be quick + # comment `nursery.cancel_scope.cancel()` and it hangs + + +async def test_background_with_channel_exhaust() -> None: + @background_with_channel() + async def agen() -> AsyncGenerator[int]: + yield 1 + + async with agen() as recv_chan: + async for x in recv_chan: + assert x == 1 + + +async def test_background_with_channel_broken_resource() -> None: + @background_with_channel() + async def agen() -> AsyncGenerator[int]: + yield 1 + yield 2 + + async with agen() as recv_chan: + assert await recv_chan.__anext__() == 1 + + # close the receiving channel + await recv_chan.aclose() + + # trying to get the next element errors + with pytest.raises(trio.ClosedResourceError): + await recv_chan.__anext__() + + # but we don't get an error on exit of the cm + + +async def test_background_with_channel_cancelled() -> None: + with trio.CancelScope() as cs: + + @background_with_channel() + async def agen() -> AsyncGenerator[int]: + yield 1 + yield 1 + + async with agen(): + cs.cancel() + + +async def test_background_with_channel_waitwhat() -> None: + @background_with_channel() + async def agen() -> AsyncGenerator[int]: + yield 1 + # this exception sometimes disappear, and I don't know why + # gc? trio randomness? + # idk if it's gonna show up in CI, but I have like a 50% shot of failing + # when running the test case by itself + raise ValueError("oae") + + with RaisesGroup(ValueError): + async with agen() as recv_chan: + async for x in recv_chan: + assert x == 1 + + +# TODO: I'm also failing to figure out how to test max_buffer_size +# and/or what changing it even achieves From 8584cff459e648fd36a36b57c0b2a763fda133bd Mon Sep 17 00:00:00 2001 From: A5rocks Date: Tue, 11 Feb 2025 15:57:45 +0900 Subject: [PATCH 03/23] Fix race condition --- src/trio/_channel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index cd7ce1fa81..d01ddd04dd 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -519,7 +519,7 @@ async def context_manager( async with trio.open_nursery() as nursery: ait = fn(*args, **kwargs) nursery.start_soon(_move_elems_to_channel, ait, send_chan) - async with recv_chan: + async with send_chan, recv_chan: # keep the channel open yield recv_chan # Return promptly, without waiting for `await anext(ait)` nursery.cancel_scope.cancel() @@ -530,7 +530,7 @@ async def _move_elems_to_channel( aiterable: AsyncGenerator[T, None], send_chan: trio.MemorySendChannel[T], ) -> None: - async with send_chan, aclosing(aiterable) as agen: + async with aclosing(aiterable) as agen: # Outer loop manually advances the aiterable; we can't use async-for because # we're going to use `.asend(err)` to forward errors back to the generator. while True: From 274755fdbeeb900ff4a95187f38bea890bb78ce2 Mon Sep 17 00:00:00 2001 From: jakkdl <11260241+jakkdl@users.noreply.github.com> Date: Thu, 13 Feb 2025 17:24:55 +0100 Subject: [PATCH 04/23] fix race condition + exception eating, make sure we always clean up, add buffer_size tests, remove unused code --- src/trio/_channel.py | 42 +++++++++++------------------ src/trio/_tests/test_channel.py | 47 +++++++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index d01ddd04dd..7ac85c8d55 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -469,7 +469,7 @@ async def aclose(self) -> None: await trio.lowlevel.checkpoint() -def background_with_channel(max_buffer_size: float = 0) -> Callable[ +def background_with_channel(max_buffer_size: float = 1) -> Callable[ [ Callable[P, AsyncGenerator[T, None]], ], @@ -518,8 +518,10 @@ async def context_manager( send_chan, recv_chan = trio.open_memory_channel[T](max_buffer_size) async with trio.open_nursery() as nursery: ait = fn(*args, **kwargs) - nursery.start_soon(_move_elems_to_channel, ait, send_chan) - async with send_chan, recv_chan: # keep the channel open + # nursery.start to make sure that we will clean up send_chan & ait + await nursery.start(_move_elems_to_channel, ait, send_chan) + # async with recv_chan could eat exceptions, so use sync cm + with recv_chan: yield recv_chan # Return promptly, without waiting for `await anext(ait)` nursery.cancel_scope.cancel() @@ -529,38 +531,24 @@ async def context_manager( async def _move_elems_to_channel( aiterable: AsyncGenerator[T, None], send_chan: trio.MemorySendChannel[T], + task_status: trio.TaskStatus, ) -> None: - async with aclosing(aiterable) as agen: - # Outer loop manually advances the aiterable; we can't use async-for because - # we're going to use `.asend(err)` to forward errors back to the generator. - while True: - # Get the next value from `agen`; return if exhausted - try: - value: T = await agen.__anext__() - except StopAsyncIteration: - return - # Inner loop ensures that we send values which are yielded after - # catching an exception which we sent back to the generator. - while True: + # `async with send_chan` will eat exceptions, + # see https://github.com/python-trio/trio/issues/1559 + with send_chan: + async with aclosing(aiterable) as agen: + task_status.started() + # Outer loop manually advances the aiterable; we can't use async-for because + # we're going to use `.asend(err)` to forward errors back to the generator. + async for value in agen: + # Get the next value from `agen`; return if exhausted try: # Send the value to the channel await send_chan.send(value) - break except trio.BrokenResourceError: # Closing the corresponding receive channel should cause # a clean shutdown of the generator. return - except trio.Cancelled: - raise - except BaseException as error_from_send: - # TODO: add test case ... but idk how - # Forward any other errors to the generator. Exit cleanly - # if exhausted; otherwise it was handled in there and we - # can continue the inner loop with this value. - try: - value = await agen.athrow(error_from_send) - except StopAsyncIteration: - return # Phew. Context managers all cleaned up, we're done here. return decorator diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index 58839213a6..fbd5ff55a4 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -427,7 +427,6 @@ async def agen() -> AsyncGenerator[int]: async for x in recv_chan: assert x == 1 break # exit, cleanup should be quick - # comment `nursery.cancel_scope.cancel()` and it hangs async def test_background_with_channel_exhaust() -> None: @@ -471,14 +470,12 @@ async def agen() -> AsyncGenerator[int]: cs.cancel() -async def test_background_with_channel_waitwhat() -> None: +async def test_background_with_channel_no_race() -> None: + # this previously led to a race condition due to + # https://github.com/python-trio/trio/issues/1559 @background_with_channel() async def agen() -> AsyncGenerator[int]: yield 1 - # this exception sometimes disappear, and I don't know why - # gc? trio randomness? - # idk if it's gonna show up in CI, but I have like a 50% shot of failing - # when running the test case by itself raise ValueError("oae") with RaisesGroup(ValueError): @@ -487,5 +484,39 @@ async def agen() -> AsyncGenerator[int]: assert x == 1 -# TODO: I'm also failing to figure out how to test max_buffer_size -# and/or what changing it even achieves +async def test_background_with_channel_buffer_size_too_small( + autojump_clock: trio.testing.MockClock, +) -> None: + @background_with_channel(0) + async def agen() -> AsyncGenerator[int]: + yield 1 + yield 2 + raise AssertionError( + "buffer size 0 means we shouldn't be asked for another value" + ) + await trio.sleep_forever() + + with trio.move_on_after(5): + async with agen() as recv_chan: + async for x in recv_chan: + assert x == 1 + await trio.sleep_forever() + + +async def test_background_with_channel_buffer_size_just_right( + autojump_clock: trio.testing.MockClock, +) -> None: + event = trio.Event() + + @background_with_channel(2) + async def agen() -> AsyncGenerator[int]: + yield 1 + yield 2 + event.set() + + async with agen() as recv_chan: + await event.wait() + assert await recv_chan.__anext__() == 1 + assert await recv_chan.__anext__() == 2 + with pytest.raises(StopAsyncIteration): + await recv_chan.__anext__() From 75429734a749a420465c690b7839acc6cc2305c3 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Thu, 13 Feb 2025 17:48:05 +0100 Subject: [PATCH 05/23] restore prev default, fix codecov --- src/trio/_channel.py | 2 +- src/trio/_tests/test_channel.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 7ac85c8d55..62e4accbc9 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -469,7 +469,7 @@ async def aclose(self) -> None: await trio.lowlevel.checkpoint() -def background_with_channel(max_buffer_size: float = 1) -> Callable[ +def background_with_channel(max_buffer_size: float = 0) -> Callable[ [ Callable[P, AsyncGenerator[T, None]], ], diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index fbd5ff55a4..e7cb7fbac3 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -421,7 +421,6 @@ async def test_background_with_channel() -> None: async def agen() -> AsyncGenerator[int]: yield 1 await trio.sleep_forever() # simulate deadlock - yield 2 async with agen() as recv_chan: async for x in recv_chan: @@ -493,8 +492,7 @@ async def agen() -> AsyncGenerator[int]: yield 2 raise AssertionError( "buffer size 0 means we shouldn't be asked for another value" - ) - await trio.sleep_forever() + ) # pragma: no cover with trio.move_on_after(5): async with agen() as recv_chan: From 86d3b0fbdc43d2d05c26e5ad219e322928bd49ba Mon Sep 17 00:00:00 2001 From: John Litborn <11260241+jakkdl@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:20:19 +0100 Subject: [PATCH 06/23] Update src/trio/_channel.py add buffer_size note to docstring Co-authored-by: Zac Hatfield-Dodds --- src/trio/_channel.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 62e4accbc9..0ea20ffbb2 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -503,6 +503,20 @@ async def my_async_iterable(arg, *, kwarg=True): While the combined async-with-async-for can be inconvenient at first, the context manager is indispensable for both correctness and for prompt cleanup of resources. + + ... note:: + + With 'raw' async generators, code in the generator will never run + concurrently with that in the body of the ``async for`` loop - the + generator is resumed to compute each element on request. + Even with ``max_buffer_size=0``, a ``@background_with_channel()`` + function will 'precompute' each element in a background task, and + store it in the channel until requested by the loop. + + This is rarely a problem, so we've avoided the performance cost + of exactly replicating the behavior of raw generators. If you + concurent execution would cause problems, we recommend using a + :class:`trio.Lock` around the critical sections. """ # Perhaps a future PEP will adopt `async with for` syntax, like # https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for From 2d11ea2d8245f5704d8cf988b84e9e0bd2a79087 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:20:32 +0000 Subject: [PATCH 07/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/trio/_channel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 0ea20ffbb2..a78c62c725 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -510,12 +510,12 @@ async def my_async_iterable(arg, *, kwarg=True): concurrently with that in the body of the ``async for`` loop - the generator is resumed to compute each element on request. Even with ``max_buffer_size=0``, a ``@background_with_channel()`` - function will 'precompute' each element in a background task, and + function will 'precompute' each element in a background task, and store it in the channel until requested by the loop. - This is rarely a problem, so we've avoided the performance cost + This is rarely a problem, so we've avoided the performance cost of exactly replicating the behavior of raw generators. If you - concurent execution would cause problems, we recommend using a + concurrent execution would cause problems, we recommend using a :class:`trio.Lock` around the critical sections. """ # Perhaps a future PEP will adopt `async with for` syntax, like From a5734f6bf6ca1d0afec141d15661f5e815486ee9 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Fri, 14 Feb 2025 13:22:04 +0100 Subject: [PATCH 08/23] clean up comments, add some others, and remove unnecessary ait/agen distinction --- src/trio/_channel.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index a78c62c725..1983f51749 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -531,31 +531,31 @@ async def context_manager( ) -> AsyncGenerator[trio.MemoryReceiveChannel[T], None]: send_chan, recv_chan = trio.open_memory_channel[T](max_buffer_size) async with trio.open_nursery() as nursery: - ait = fn(*args, **kwargs) - # nursery.start to make sure that we will clean up send_chan & ait - await nursery.start(_move_elems_to_channel, ait, send_chan) - # async with recv_chan could eat exceptions, so use sync cm + agen = fn(*args, **kwargs) + # `nursery.start` to make sure that we will clean up send_chan & ait + # If this errors we don't close `recv_chan`, but the caller + # never gets access to it, so that's not a problem. + await nursery.start(_move_elems_to_channel, agen, send_chan) + # `async with recv_chan` could eat exceptions, so use sync cm with recv_chan: yield recv_chan - # Return promptly, without waiting for `await anext(ait)` + # Return promptly, without waiting for the generator to yield the + # next value nursery.cancel_scope.cancel() return context_manager async def _move_elems_to_channel( - aiterable: AsyncGenerator[T, None], + agen: AsyncGenerator[T, None], send_chan: trio.MemorySendChannel[T], task_status: trio.TaskStatus, ) -> None: # `async with send_chan` will eat exceptions, # see https://github.com/python-trio/trio/issues/1559 with send_chan: - async with aclosing(aiterable) as agen: + async with aclosing(agen): task_status.started() - # Outer loop manually advances the aiterable; we can't use async-for because - # we're going to use `.asend(err)` to forward errors back to the generator. async for value in agen: - # Get the next value from `agen`; return if exhausted try: # Send the value to the channel await send_chan.send(value) @@ -563,6 +563,5 @@ async def _move_elems_to_channel( # Closing the corresponding receive channel should cause # a clean shutdown of the generator. return - # Phew. Context managers all cleaned up, we're done here. return decorator From 0b461d2c32ae512f70bb55695715b0012dd718c2 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Fri, 14 Feb 2025 15:19:09 +0100 Subject: [PATCH 09/23] add newsfragment, docs. building docs is failing locally on AbstractAsyncContextManager, I have no clue why --- docs/source/reference-core.rst | 6 +++++- docs/source/reference-io.rst | 9 +++++++++ newsfragments/3197.feature.rst | 1 + src/trio/_channel.py | 4 ++-- 4 files changed, 17 insertions(+), 3 deletions(-) create mode 100644 newsfragments/3197.feature.rst diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index a21a6dec34..2e9371da2a 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1607,7 +1607,11 @@ the numbers 0 through 9 with a 1-second delay before each one: trio.run(use_it) -Trio supports async generators, with some caveats described in this section. +Trio supports async generators, but there's several caveats and it's very +hard to handle them properly. Therefore Trio bundles a helper, +`trio.background_with_channel` that does it for you. + +The details on those problems are described in the following sections. Finalization ~~~~~~~~~~~~ diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index 665f62dd0b..02587f9fb5 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -181,6 +181,15 @@ Abstract base classes .. currentmodule:: trio +Converting Async Generators to use streams +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Using async generators is very handy, but also very treacherous. See +:ref:`async-generators`. Therefore Trio provides a handy helper that +solves that! + +.. autofunction:: trio.background_with_channel + Generic stream tools ~~~~~~~~~~~~~~~~~~~~ diff --git a/newsfragments/3197.feature.rst b/newsfragments/3197.feature.rst new file mode 100644 index 0000000000..5abdf0d6c9 --- /dev/null +++ b/newsfragments/3197.feature.rst @@ -0,0 +1 @@ +Add :func:`@trio.background_with_channel `, a wrapper that can be used to make async generators safe. See :ref:`async-generators`, `ASYNC900 `_, :pep:`789`, and :pep:`533`. diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 1983f51749..224960e516 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -477,7 +477,7 @@ def background_with_channel(max_buffer_size: float = 0) -> Callable[ ]: """Decorate an async generator function to make it cancellation-safe. - The `yield` keyword offers a very convenient way to write iterators... + The ``yield`` keyword offers a very convenient way to write iterators... which makes it really unfortunate that async generators are so difficult to call correctly. Yielding from the inside of a cancel scope or a nursery to the outside `violates structured concurrency `_ @@ -488,7 +488,7 @@ def background_with_channel(max_buffer_size: float = 0) -> Callable[ This decorator gives you the best of both worlds: with careful exception handling and a background task we preserve structured concurrency by offering only the safe interface, and you can still write your iterables - with the convenience of `yield`. For example: + with the convenience of ``yield``. For example:: @background_with_channel() async def my_async_iterable(arg, *, kwarg=True): From b86eb54643e105dac688806887aae1ef3c5e4fcf Mon Sep 17 00:00:00 2001 From: jakkdl Date: Fri, 14 Feb 2025 15:27:34 +0100 Subject: [PATCH 10/23] fix minor docstring errors --- src/trio/_channel.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 224960e516..03f76f47d8 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -504,17 +504,18 @@ async def my_async_iterable(arg, *, kwarg=True): the context manager is indispensable for both correctness and for prompt cleanup of resources. - ... note:: + .. note:: With 'raw' async generators, code in the generator will never run concurrently with that in the body of the ``async for`` loop - the generator is resumed to compute each element on request. Even with ``max_buffer_size=0``, a ``@background_with_channel()`` function will 'precompute' each element in a background task, and - store it in the channel until requested by the loop. + send it to the internal channel, where it will wait until requested + by the loop. This is rarely a problem, so we've avoided the performance cost - of exactly replicating the behavior of raw generators. If you + of exactly replicating the behavior of raw generators. If concurrent execution would cause problems, we recommend using a :class:`trio.Lock` around the critical sections. """ From 1670674499491aa1dce878e92bc580dec424e387 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Mon, 17 Feb 2025 17:08:51 +0100 Subject: [PATCH 11/23] docs&newsfragment fixes after review, remove aclosing --- docs/source/reference-core.rst | 14 ++++++-------- docs/source/reference-io.rst | 9 --------- newsfragments/3197.feature.rst | 2 +- src/trio/_channel.py | 35 ++++++++-------------------------- 4 files changed, 15 insertions(+), 45 deletions(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 2e9371da2a..bbeaea87b7 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1611,7 +1611,10 @@ Trio supports async generators, but there's several caveats and it's very hard to handle them properly. Therefore Trio bundles a helper, `trio.background_with_channel` that does it for you. -The details on those problems are described in the following sections. + +.. autofunction:: trio.background_with_channel + +The details behind the problems are described in the following sections. Finalization ~~~~~~~~~~~~ @@ -1741,7 +1744,8 @@ so sometimes you'll get an unhelpful `TrioInternalError`. (And sometimes it will seem to work, which is probably the worst outcome of all, since then you might not notice the issue until you perform some minor refactoring of the generator or the code that's iterating it, or -just get unlucky. There is a `proposed Python enhancement +just get unlucky. There is a draft :pep:`798` with accompanying +`discussion thread `__ that would at least make it fail consistently.) @@ -1757,12 +1761,6 @@ the generator is suspended, what should the background tasks do? There's no good way to suspend them, but if they keep running and throw an exception, where can that exception be reraised? -If you have an async generator that wants to ``yield`` from within a nursery -or cancel scope, your best bet is to refactor it to be a separate task -that communicates over memory channels. The ``trio_util`` package offers a -`decorator that does this for you transparently -`__. - For more discussion, see Trio issues `264 `__ (especially `this comment diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index 02587f9fb5..665f62dd0b 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -181,15 +181,6 @@ Abstract base classes .. currentmodule:: trio -Converting Async Generators to use streams -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Using async generators is very handy, but also very treacherous. See -:ref:`async-generators`. Therefore Trio provides a handy helper that -solves that! - -.. autofunction:: trio.background_with_channel - Generic stream tools ~~~~~~~~~~~~~~~~~~~~ diff --git a/newsfragments/3197.feature.rst b/newsfragments/3197.feature.rst index 5abdf0d6c9..c306494e3c 100644 --- a/newsfragments/3197.feature.rst +++ b/newsfragments/3197.feature.rst @@ -1 +1 @@ -Add :func:`@trio.background_with_channel `, a wrapper that can be used to make async generators safe. See :ref:`async-generators`, `ASYNC900 `_, :pep:`789`, and :pep:`533`. +Add :func:`@trio.background_with_channel `, a wrapper that can be used to make async generators safe. This will be the suggested fix for `ASYNC900 `_. diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 03f76f47d8..672efb4835 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -1,6 +1,5 @@ from __future__ import annotations -import sys from collections import OrderedDict, deque from contextlib import AbstractAsyncContextManager, asynccontextmanager from functools import wraps @@ -8,8 +7,6 @@ from typing import ( TYPE_CHECKING, Generic, - Protocol, - TypeVar, ) import attrs @@ -22,32 +19,13 @@ from ._util import NoPublicConstructor, final, generic_function if TYPE_CHECKING: - from collections.abc import AsyncGenerator, Awaitable, Callable + from collections.abc import AsyncGenerator, Callable from types import TracebackType from typing_extensions import ParamSpec, Self P = ParamSpec("P") -if sys.version_info >= (3, 10): - from contextlib import aclosing # new in Python 3.10 -else: - - class _SupportsAclose(Protocol): - def aclose(self) -> Awaitable[object]: ... - - _SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose) - - class aclosing(AbstractAsyncContextManager[_SupportsAcloseT, None]): - def __init__(self, thing: _SupportsAcloseT) -> None: - self._aiter = thing - - async def __aenter__(self) -> _SupportsAcloseT: - return self._aiter - - async def __aexit__(self, *exc_info: object) -> None: - await self._aiter.aclose() - def _open_memory_channel( max_buffer_size: int | float, # noqa: PYI041 @@ -533,15 +511,15 @@ async def context_manager( send_chan, recv_chan = trio.open_memory_channel[T](max_buffer_size) async with trio.open_nursery() as nursery: agen = fn(*args, **kwargs) - # `nursery.start` to make sure that we will clean up send_chan & ait + # `nursery.start` to make sure that we will clean up send_chan & agen # If this errors we don't close `recv_chan`, but the caller # never gets access to it, so that's not a problem. await nursery.start(_move_elems_to_channel, agen, send_chan) # `async with recv_chan` could eat exceptions, so use sync cm with recv_chan: yield recv_chan - # Return promptly, without waiting for the generator to yield the - # next value + # User has exited context manager, cancel to immediately close the + # abandoned generator if it's still alive. nursery.cancel_scope.cancel() return context_manager @@ -554,7 +532,7 @@ async def _move_elems_to_channel( # `async with send_chan` will eat exceptions, # see https://github.com/python-trio/trio/issues/1559 with send_chan: - async with aclosing(agen): + try: task_status.started() async for value in agen: try: @@ -564,5 +542,8 @@ async def _move_elems_to_channel( # Closing the corresponding receive channel should cause # a clean shutdown of the generator. return + finally: + # replace try-finally with contextlib.aclosing once python39 is dropped + await agen.aclose() return decorator From 7acf3a0ef3980aa4df7f07f13a17a1c0f12ecbda Mon Sep 17 00:00:00 2001 From: Spencer Brown Date: Tue, 18 Feb 2025 10:00:39 +1000 Subject: [PATCH 12/23] Fix sphinx type hint resolution --- src/trio/_channel.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 672efb4835..ab5e6f47c4 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -1,6 +1,8 @@ from __future__ import annotations +import sys from collections import OrderedDict, deque +from collections.abc import AsyncGenerator, Callable # noqa: TC003 # Needed for Sphinx from contextlib import AbstractAsyncContextManager, asynccontextmanager from functools import wraps from math import inf @@ -19,12 +21,19 @@ from ._util import NoPublicConstructor, final, generic_function if TYPE_CHECKING: - from collections.abc import AsyncGenerator, Callable from types import TracebackType from typing_extensions import ParamSpec, Self P = ParamSpec("P") +elif "sphinx" in sys.modules: + # P needs to exist for Sphinx to parse the type hints successfully. + try: + from typing_extensions import ParamSpec + except ImportError: + P = ... # This is valid in Callable, though not correct + else: + P = ParamSpec("P") def _open_memory_channel( From efe2d00bd01b6a96dfe3fd9dd8e9450839d880d1 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Mon, 24 Feb 2025 13:07:57 +0100 Subject: [PATCH 13/23] fix coverage. Would be great to have tox+coverage now... :eyes: --- pyproject.toml | 1 + src/trio/_tests/test_channel.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ffc93e170f..5a03ee300e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -330,6 +330,7 @@ exclude_also = [ "@overload", 'class .*\bProtocol\b.*\):', "raise NotImplementedError", + '.*if "sphinx" in sys.modules:', 'TODO: test this line' ] partial_branches = [ diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index e7cb7fbac3..8261ad750e 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -463,7 +463,9 @@ async def test_background_with_channel_cancelled() -> None: @background_with_channel() async def agen() -> AsyncGenerator[int]: yield 1 - yield 1 + raise AssertionError( # pragma: no cover + "cancel before consumption means generator should not be iteratod" + ) async with agen(): cs.cancel() @@ -496,7 +498,7 @@ async def agen() -> AsyncGenerator[int]: with trio.move_on_after(5): async with agen() as recv_chan: - async for x in recv_chan: + async for x in recv_chan: # pragma: no branch assert x == 1 await trio.sleep_forever() From f78f641f35cb209eacda2632e1d98d0e75132bae Mon Sep 17 00:00:00 2001 From: jakkdl Date: Tue, 25 Feb 2025 13:38:19 +0100 Subject: [PATCH 14/23] fix interleaved execution on non-0 buffer size --- src/trio/_channel.py | 83 ++++++++++++++++++++++--------- src/trio/_tests/test_channel.py | 86 ++++++++++++++++++++++++++++++++- 2 files changed, 144 insertions(+), 25 deletions(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index ab5e6f47c4..46029007f5 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -456,11 +456,40 @@ async def aclose(self) -> None: await trio.lowlevel.checkpoint() -def background_with_channel(max_buffer_size: float = 0) -> Callable[ +class RecvChanWrapper(ReceiveChannel[T]): + def __init__( + self, recv_chan: MemoryReceiveChannel[T], send_semaphore: trio.Semaphore | None + ) -> None: + self.recv_chan = recv_chan + self.send_semaphore = send_semaphore + + # TODO: should this allow clones? + + async def receive(self) -> T: + if self.send_semaphore is not None: + self.send_semaphore.release() + return await self.recv_chan.receive() + + async def aclose(self) -> None: + await self.recv_chan.aclose() + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + self.recv_chan.close() + + +def background_with_channel(max_buffer_size: int | None = 0) -> Callable[ [ Callable[P, AsyncGenerator[T, None]], ], - Callable[P, AbstractAsyncContextManager[trio.MemoryReceiveChannel[T]]], + Callable[P, AbstractAsyncContextManager[trio.abc.ReceiveChannel[T]]], ]: """Decorate an async generator function to make it cancellation-safe. @@ -491,42 +520,42 @@ async def my_async_iterable(arg, *, kwarg=True): the context manager is indispensable for both correctness and for prompt cleanup of resources. - .. note:: - - With 'raw' async generators, code in the generator will never run - concurrently with that in the body of the ``async for`` loop - the - generator is resumed to compute each element on request. - Even with ``max_buffer_size=0``, a ``@background_with_channel()`` - function will 'precompute' each element in a background task, and - send it to the internal channel, where it will wait until requested - by the loop. - - This is rarely a problem, so we've avoided the performance cost - of exactly replicating the behavior of raw generators. If - concurrent execution would cause problems, we recommend using a - :class:`trio.Lock` around the critical sections. + If you specify ``max_buffer_size>0`` the async generator will run concurrently + with your iterator, until the buffer is full. """ # Perhaps a future PEP will adopt `async with for` syntax, like # https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for + if not isinstance(max_buffer_size, int) and max_buffer_size is not None: + raise TypeError( + "`max_buffer_size` must be int or None, not {type(max_buffer_size)}. " + "Did you forget the parentheses in `@background_with_channel()`?" + ) + def decorator( fn: Callable[P, AsyncGenerator[T, None]], - ) -> Callable[P, AbstractAsyncContextManager[trio.MemoryReceiveChannel[T]]]: + ) -> Callable[P, AbstractAsyncContextManager[trio._channel.RecvChanWrapper[T]]]: @asynccontextmanager @wraps(fn) async def context_manager( *args: P.args, **kwargs: P.kwargs - ) -> AsyncGenerator[trio.MemoryReceiveChannel[T], None]: - send_chan, recv_chan = trio.open_memory_channel[T](max_buffer_size) + ) -> AsyncGenerator[trio._channel.RecvChanWrapper[T], None]: + max_buf_size_float = inf if max_buffer_size is None else max_buffer_size + send_chan, recv_chan = trio.open_memory_channel[T](max_buf_size_float) async with trio.open_nursery() as nursery: agen = fn(*args, **kwargs) + send_semaphore = ( + None if max_buffer_size is None else trio.Semaphore(max_buffer_size) + ) # `nursery.start` to make sure that we will clean up send_chan & agen # If this errors we don't close `recv_chan`, but the caller # never gets access to it, so that's not a problem. - await nursery.start(_move_elems_to_channel, agen, send_chan) + await nursery.start( + _move_elems_to_channel, agen, send_chan, send_semaphore + ) # `async with recv_chan` could eat exceptions, so use sync cm - with recv_chan: - yield recv_chan + with RecvChanWrapper(recv_chan, send_semaphore) as wrapped_recv_chan: + yield wrapped_recv_chan # User has exited context manager, cancel to immediately close the # abandoned generator if it's still alive. nursery.cancel_scope.cancel() @@ -536,6 +565,7 @@ async def context_manager( async def _move_elems_to_channel( agen: AsyncGenerator[T, None], send_chan: trio.MemorySendChannel[T], + send_semaphore: trio.Semaphore | None, task_status: trio.TaskStatus, ) -> None: # `async with send_chan` will eat exceptions, @@ -543,7 +573,14 @@ async def _move_elems_to_channel( with send_chan: try: task_status.started() - async for value in agen: + while True: + # wait for send_chan to be unblocked + if send_semaphore is not None: + await send_semaphore.acquire() + try: + value = await agen.__anext__() + except StopAsyncIteration: + return try: # Send the value to the channel await send_chan.send(value) diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index 8261ad750e..8bed4e75d2 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -7,7 +7,7 @@ import trio from trio import EndOfChannel, background_with_channel, open_memory_channel -from ..testing import RaisesGroup, assert_checkpoints, wait_all_tasks_blocked +from ..testing import Matcher, RaisesGroup, assert_checkpoints, wait_all_tasks_blocked if TYPE_CHECKING: from collections.abc import AsyncGenerator @@ -511,8 +511,8 @@ async def test_background_with_channel_buffer_size_just_right( @background_with_channel(2) async def agen() -> AsyncGenerator[int]: yield 1 - yield 2 event.set() + yield 2 async with agen() as recv_chan: await event.wait() @@ -520,3 +520,85 @@ async def agen() -> AsyncGenerator[int]: assert await recv_chan.__anext__() == 2 with pytest.raises(StopAsyncIteration): await recv_chan.__anext__() + + +async def test_background_with_channel_no_interleave() -> None: + @background_with_channel() + async def agen() -> AsyncGenerator[int]: + yield 1 + raise AssertionError + + async with agen() as recv_chan: + assert await recv_chan.__anext__() == 1 + await trio.lowlevel.checkpoint() + + +async def test_background_with_channel_multiple_errors() -> None: + event = trio.Event() + + @background_with_channel(1) + async def agen() -> AsyncGenerator[int]: + yield 1 + event.set() + raise ValueError("agen") + + with RaisesGroup( + Matcher(ValueError, match="^agen$"), + Matcher(TypeError, match="^iterator$"), + ): + async with agen() as recv_chan: + async for i in recv_chan: + assert i == 1 + await event.wait() + raise TypeError("iterator") + + +async def test_background_with_channel_genexit_finally() -> None: + events: list[str] = [] + + @background_with_channel() + async def agen(stuff: list[str]) -> AsyncGenerator[int]: + try: + yield 1 + except BaseException as e: + stuff.append(repr(e)) + raise + finally: + stuff.append("finally") + raise ValueError("agen") + + with RaisesGroup( + Matcher(ValueError, match="^agen$"), + Matcher(TypeError, match="^iterator$"), + ): + async with agen(events) as recv_chan: + async for i in recv_chan: + assert i == 1 + raise TypeError("iterator") + + assert events == ["GeneratorExit()", "finally"] + + +async def test_background_with_channel_nested_loop() -> None: + @background_with_channel() + async def agen() -> AsyncGenerator[int]: + for i in range(2): + yield i + + ii = 0 + async with agen() as recv_chan1: + async for i in recv_chan1: + async with agen() as recv_chan: + jj = 0 + async for j in recv_chan: + assert (i, j) == (ii, jj) + jj += 1 + ii += 1 + + +async def test_background_with_channel_no_parens() -> None: + with pytest.raises(TypeError, match="must be int or None"): + + @background_with_channel # type: ignore[arg-type] + async def agen() -> AsyncGenerator[None]: + yield # pragma: no cover From 5bfb0c5694384fd664d0d8a1f4bdedd97a0823b9 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Tue, 25 Feb 2025 13:43:01 +0100 Subject: [PATCH 15/23] specify strict_exception_groups, clarify drop-in replacement status --- src/trio/_channel.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 46029007f5..5e491cfdee 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -493,6 +493,11 @@ def background_with_channel(max_buffer_size: int | None = 0) -> Callable[ ]: """Decorate an async generator function to make it cancellation-safe. + This is mostly a drop-in replacement, except for the fact that it will + wrap errors in exception groups due to the internal nursery. Although when + using it without a buffer it should be exceedingly rare to get multiple + exceptions. + The ``yield`` keyword offers a very convenient way to write iterators... which makes it really unfortunate that async generators are so difficult to call correctly. Yielding from the inside of a cancel scope or a nursery @@ -542,7 +547,7 @@ async def context_manager( ) -> AsyncGenerator[trio._channel.RecvChanWrapper[T], None]: max_buf_size_float = inf if max_buffer_size is None else max_buffer_size send_chan, recv_chan = trio.open_memory_channel[T](max_buf_size_float) - async with trio.open_nursery() as nursery: + async with trio.open_nursery(strict_exception_groups=True) as nursery: agen = fn(*args, **kwargs) send_semaphore = ( None if max_buffer_size is None else trio.Semaphore(max_buffer_size) From 54800e2d08351f26b108285b52538d070948021e Mon Sep 17 00:00:00 2001 From: John Litborn <11260241+jakkdl@users.noreply.github.com> Date: Tue, 25 Feb 2025 15:02:30 +0100 Subject: [PATCH 16/23] Apply suggestions from code review Co-authored-by: A5rocks --- docs/source/reference-core.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index bbeaea87b7..bfcb6c57e4 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1744,7 +1744,7 @@ so sometimes you'll get an unhelpful `TrioInternalError`. (And sometimes it will seem to work, which is probably the worst outcome of all, since then you might not notice the issue until you perform some minor refactoring of the generator or the code that's iterating it, or -just get unlucky. There is a draft :pep:`798` with accompanying +just get unlucky. There is a draft :pep:`789` with accompanying `discussion thread `__ that would at least make it fail consistently.) From 0e34b8516b43362e83db25775304609253d2dfe7 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Mon, 3 Mar 2025 13:43:36 +0100 Subject: [PATCH 17/23] codecov, fix tests after functionality change --- src/trio/_tests/test_channel.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index 8bed4e75d2..ef54540944 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -417,7 +417,7 @@ async def do_send(s: trio.MemorySendChannel[int], v: int) -> None: async def test_background_with_channel() -> None: - @background_with_channel() + @background_with_channel(1) async def agen() -> AsyncGenerator[int]: yield 1 await trio.sleep_forever() # simulate deadlock @@ -461,11 +461,11 @@ async def test_background_with_channel_cancelled() -> None: with trio.CancelScope() as cs: @background_with_channel() - async def agen() -> AsyncGenerator[int]: - yield 1 - raise AssertionError( # pragma: no cover - "cancel before consumption means generator should not be iteratod" + async def agen() -> AsyncGenerator[None]: # pragma: no cover + raise AssertionError( + "cancel before consumption means generator should not be iterated" ) + yield # indicate that we're an iterator async with agen(): cs.cancel() @@ -491,7 +491,6 @@ async def test_background_with_channel_buffer_size_too_small( @background_with_channel(0) async def agen() -> AsyncGenerator[int]: yield 1 - yield 2 raise AssertionError( "buffer size 0 means we shouldn't be asked for another value" ) # pragma: no cover @@ -526,7 +525,7 @@ async def test_background_with_channel_no_interleave() -> None: @background_with_channel() async def agen() -> AsyncGenerator[int]: yield 1 - raise AssertionError + raise AssertionError # pragma: no cover async with agen() as recv_chan: assert await recv_chan.__anext__() == 1 @@ -547,7 +546,7 @@ async def agen() -> AsyncGenerator[int]: Matcher(TypeError, match="^iterator$"), ): async with agen() as recv_chan: - async for i in recv_chan: + async for i in recv_chan: # pragma: no branch assert i == 1 await event.wait() raise TypeError("iterator") @@ -572,7 +571,7 @@ async def agen(stuff: list[str]) -> AsyncGenerator[int]: Matcher(TypeError, match="^iterator$"), ): async with agen(events) as recv_chan: - async for i in recv_chan: + async for i in recv_chan: # pragma: no branch assert i == 1 raise TypeError("iterator") @@ -602,3 +601,18 @@ async def test_background_with_channel_no_parens() -> None: @background_with_channel # type: ignore[arg-type] async def agen() -> AsyncGenerator[None]: yield # pragma: no cover + + +async def test_background_with_channel_inf_buffer() -> None: + event = trio.Event() + + # agen immediately starts yielding numbers + # into the buffer upon entering the cm + @background_with_channel(None) + async def agen() -> AsyncGenerator[int]: + for i in range(10): + yield i + event.set() + + async with agen() as _: + await event.wait() From 1b8ce0a1f00c97f8ba55232b4159ed7e41f7b3e8 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Mon, 3 Mar 2025 15:04:57 +0100 Subject: [PATCH 18/23] codecov --- src/trio/_tests/test_channel.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index ef54540944..d351887924 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -614,5 +614,9 @@ async def agen() -> AsyncGenerator[int]: yield i event.set() - async with agen() as _: + async with agen() as recv_chan: await event.wait() + j = 0 + async for i in recv_chan: + assert i == j + j += 1 From 9f8a2abbff93b69994992a51e1b3121d356ad178 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Mon, 3 Mar 2025 15:31:16 +0100 Subject: [PATCH 19/23] :100: plx --- src/trio/_tests/test_channel.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index d351887924..7b5abf8e3e 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -613,6 +613,8 @@ async def agen() -> AsyncGenerator[int]: for i in range(10): yield i event.set() + # keep agen alive to receive values + await trio.sleep_forever() async with agen() as recv_chan: await event.wait() @@ -620,3 +622,5 @@ async def agen() -> AsyncGenerator[int]: async for i in recv_chan: assert i == j j += 1 + if j == 10: + break From bfa981c1eba79d159ca9cceff4dc1a3017f87096 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Tue, 4 Mar 2025 12:34:19 +0100 Subject: [PATCH 20/23] okay now actually 100% coverage --- src/trio/_tests/test_channel.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index 7b5abf8e3e..54f94e9770 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -471,6 +471,23 @@ async def agen() -> AsyncGenerator[None]: # pragma: no cover cs.cancel() +async def test_background_with_channel_recv_closed( + autojump_clock: trio.testing.MockClock, +) -> None: + event = trio.Event() + + @background_with_channel(1) + async def agen() -> AsyncGenerator[int]: + await event.wait() + yield 1 + + async with agen() as recv_chan: + await recv_chan.aclose() + event.set() + # wait for agen to try sending on the closed channel + await trio.sleep(1) + + async def test_background_with_channel_no_race() -> None: # this previously led to a race condition due to # https://github.com/python-trio/trio/issues/1559 From 26ed1c6c277982a592aba62aad28afd8ae494506 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Wed, 12 Mar 2025 11:35:01 +0100 Subject: [PATCH 21/23] do everything but unwrapping the exception from inside the group --- src/trio/_channel.py | 91 ++++++++++++--------------------- src/trio/_tests/test_channel.py | 90 ++++---------------------------- 2 files changed, 43 insertions(+), 138 deletions(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 5e491cfdee..d2d980bd16 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -458,16 +458,16 @@ async def aclose(self) -> None: class RecvChanWrapper(ReceiveChannel[T]): def __init__( - self, recv_chan: MemoryReceiveChannel[T], send_semaphore: trio.Semaphore | None + self, recv_chan: MemoryReceiveChannel[T], send_semaphore: trio.Semaphore ) -> None: self.recv_chan = recv_chan self.send_semaphore = send_semaphore - # TODO: should this allow clones? + # TODO: should this allow clones? We'd signal that by inheriting from + # MemoryReceiveChannel. async def receive(self) -> T: - if self.send_semaphore is not None: - self.send_semaphore.release() + self.send_semaphore.release() return await self.recv_chan.receive() async def aclose(self) -> None: @@ -485,12 +485,9 @@ def __exit__( self.recv_chan.close() -def background_with_channel(max_buffer_size: int | None = 0) -> Callable[ - [ - Callable[P, AsyncGenerator[T, None]], - ], - Callable[P, AbstractAsyncContextManager[trio.abc.ReceiveChannel[T]]], -]: +def background_with_channel( + fn: Callable[P, AsyncGenerator[T, None]], +) -> Callable[P, AbstractAsyncContextManager[ReceiveChannel[T]]]: """Decorate an async generator function to make it cancellation-safe. This is mostly a drop-in replacement, except for the fact that it will @@ -511,7 +508,7 @@ def background_with_channel(max_buffer_size: int | None = 0) -> Callable[ offering only the safe interface, and you can still write your iterables with the convenience of ``yield``. For example:: - @background_with_channel() + @background_with_channel async def my_async_iterable(arg, *, kwarg=True): while ...: item = await ... @@ -531,46 +528,30 @@ async def my_async_iterable(arg, *, kwarg=True): # Perhaps a future PEP will adopt `async with for` syntax, like # https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for - if not isinstance(max_buffer_size, int) and max_buffer_size is not None: - raise TypeError( - "`max_buffer_size` must be int or None, not {type(max_buffer_size)}. " - "Did you forget the parentheses in `@background_with_channel()`?" - ) - - def decorator( - fn: Callable[P, AsyncGenerator[T, None]], - ) -> Callable[P, AbstractAsyncContextManager[trio._channel.RecvChanWrapper[T]]]: - @asynccontextmanager - @wraps(fn) - async def context_manager( - *args: P.args, **kwargs: P.kwargs - ) -> AsyncGenerator[trio._channel.RecvChanWrapper[T], None]: - max_buf_size_float = inf if max_buffer_size is None else max_buffer_size - send_chan, recv_chan = trio.open_memory_channel[T](max_buf_size_float) - async with trio.open_nursery(strict_exception_groups=True) as nursery: - agen = fn(*args, **kwargs) - send_semaphore = ( - None if max_buffer_size is None else trio.Semaphore(max_buffer_size) - ) - # `nursery.start` to make sure that we will clean up send_chan & agen - # If this errors we don't close `recv_chan`, but the caller - # never gets access to it, so that's not a problem. - await nursery.start( - _move_elems_to_channel, agen, send_chan, send_semaphore - ) - # `async with recv_chan` could eat exceptions, so use sync cm - with RecvChanWrapper(recv_chan, send_semaphore) as wrapped_recv_chan: - yield wrapped_recv_chan - # User has exited context manager, cancel to immediately close the - # abandoned generator if it's still alive. - nursery.cancel_scope.cancel() - - return context_manager + @asynccontextmanager + @wraps(fn) + async def context_manager( + *args: P.args, **kwargs: P.kwargs + ) -> AsyncGenerator[trio._channel.RecvChanWrapper[T], None]: + send_chan, recv_chan = trio.open_memory_channel[T](0) + async with trio.open_nursery(strict_exception_groups=True) as nursery: + agen = fn(*args, **kwargs) + send_semaphore = trio.Semaphore(0) + # `nursery.start` to make sure that we will clean up send_chan & agen + # If this errors we don't close `recv_chan`, but the caller + # never gets access to it, so that's not a problem. + await nursery.start(_move_elems_to_channel, agen, send_chan, send_semaphore) + # `async with recv_chan` could eat exceptions, so use sync cm + with RecvChanWrapper(recv_chan, send_semaphore) as wrapped_recv_chan: + yield wrapped_recv_chan + # User has exited context manager, cancel to immediately close the + # abandoned generator if it's still alive. + nursery.cancel_scope.cancel() async def _move_elems_to_channel( agen: AsyncGenerator[T, None], send_chan: trio.MemorySendChannel[T], - send_semaphore: trio.Semaphore | None, + send_semaphore: trio.Semaphore, task_status: trio.TaskStatus, ) -> None: # `async with send_chan` will eat exceptions, @@ -579,22 +560,16 @@ async def _move_elems_to_channel( try: task_status.started() while True: - # wait for send_chan to be unblocked - if send_semaphore is not None: - await send_semaphore.acquire() + # wait for receiver to call next on the aiter + await send_semaphore.acquire() try: value = await agen.__anext__() except StopAsyncIteration: return - try: - # Send the value to the channel - await send_chan.send(value) - except trio.BrokenResourceError: - # Closing the corresponding receive channel should cause - # a clean shutdown of the generator. - return + # Send the value to the channel + await send_chan.send(value) finally: # replace try-finally with contextlib.aclosing once python39 is dropped await agen.aclose() - return decorator + return context_manager diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index 54f94e9770..58abed0764 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -417,7 +417,7 @@ async def do_send(s: trio.MemorySendChannel[int], v: int) -> None: async def test_background_with_channel() -> None: - @background_with_channel(1) + @background_with_channel async def agen() -> AsyncGenerator[int]: yield 1 await trio.sleep_forever() # simulate deadlock @@ -429,7 +429,7 @@ async def agen() -> AsyncGenerator[int]: async def test_background_with_channel_exhaust() -> None: - @background_with_channel() + @background_with_channel async def agen() -> AsyncGenerator[int]: yield 1 @@ -439,7 +439,7 @@ async def agen() -> AsyncGenerator[int]: async def test_background_with_channel_broken_resource() -> None: - @background_with_channel() + @background_with_channel async def agen() -> AsyncGenerator[int]: yield 1 yield 2 @@ -460,7 +460,7 @@ async def agen() -> AsyncGenerator[int]: async def test_background_with_channel_cancelled() -> None: with trio.CancelScope() as cs: - @background_with_channel() + @background_with_channel async def agen() -> AsyncGenerator[None]: # pragma: no cover raise AssertionError( "cancel before consumption means generator should not be iterated" @@ -476,7 +476,7 @@ async def test_background_with_channel_recv_closed( ) -> None: event = trio.Event() - @background_with_channel(1) + @background_with_channel async def agen() -> AsyncGenerator[int]: await event.wait() yield 1 @@ -491,7 +491,7 @@ async def agen() -> AsyncGenerator[int]: async def test_background_with_channel_no_race() -> None: # this previously led to a race condition due to # https://github.com/python-trio/trio/issues/1559 - @background_with_channel() + @background_with_channel async def agen() -> AsyncGenerator[int]: yield 1 raise ValueError("oae") @@ -505,7 +505,7 @@ async def agen() -> AsyncGenerator[int]: async def test_background_with_channel_buffer_size_too_small( autojump_clock: trio.testing.MockClock, ) -> None: - @background_with_channel(0) + @background_with_channel async def agen() -> AsyncGenerator[int]: yield 1 raise AssertionError( @@ -519,27 +519,8 @@ async def agen() -> AsyncGenerator[int]: await trio.sleep_forever() -async def test_background_with_channel_buffer_size_just_right( - autojump_clock: trio.testing.MockClock, -) -> None: - event = trio.Event() - - @background_with_channel(2) - async def agen() -> AsyncGenerator[int]: - yield 1 - event.set() - yield 2 - - async with agen() as recv_chan: - await event.wait() - assert await recv_chan.__anext__() == 1 - assert await recv_chan.__anext__() == 2 - with pytest.raises(StopAsyncIteration): - await recv_chan.__anext__() - - async def test_background_with_channel_no_interleave() -> None: - @background_with_channel() + @background_with_channel async def agen() -> AsyncGenerator[int]: yield 1 raise AssertionError # pragma: no cover @@ -549,30 +530,10 @@ async def agen() -> AsyncGenerator[int]: await trio.lowlevel.checkpoint() -async def test_background_with_channel_multiple_errors() -> None: - event = trio.Event() - - @background_with_channel(1) - async def agen() -> AsyncGenerator[int]: - yield 1 - event.set() - raise ValueError("agen") - - with RaisesGroup( - Matcher(ValueError, match="^agen$"), - Matcher(TypeError, match="^iterator$"), - ): - async with agen() as recv_chan: - async for i in recv_chan: # pragma: no branch - assert i == 1 - await event.wait() - raise TypeError("iterator") - - async def test_background_with_channel_genexit_finally() -> None: events: list[str] = [] - @background_with_channel() + @background_with_channel async def agen(stuff: list[str]) -> AsyncGenerator[int]: try: yield 1 @@ -596,7 +557,7 @@ async def agen(stuff: list[str]) -> AsyncGenerator[int]: async def test_background_with_channel_nested_loop() -> None: - @background_with_channel() + @background_with_channel async def agen() -> AsyncGenerator[int]: for i in range(2): yield i @@ -610,34 +571,3 @@ async def agen() -> AsyncGenerator[int]: assert (i, j) == (ii, jj) jj += 1 ii += 1 - - -async def test_background_with_channel_no_parens() -> None: - with pytest.raises(TypeError, match="must be int or None"): - - @background_with_channel # type: ignore[arg-type] - async def agen() -> AsyncGenerator[None]: - yield # pragma: no cover - - -async def test_background_with_channel_inf_buffer() -> None: - event = trio.Event() - - # agen immediately starts yielding numbers - # into the buffer upon entering the cm - @background_with_channel(None) - async def agen() -> AsyncGenerator[int]: - for i in range(10): - yield i - event.set() - # keep agen alive to receive values - await trio.sleep_forever() - - async with agen() as recv_chan: - await event.wait() - j = 0 - async for i in recv_chan: - assert i == j - j += 1 - if j == 10: - break From e7556b4ef778a6b89bcb93d0c664799dda0e7e83 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Wed, 9 Apr 2025 15:32:00 +0200 Subject: [PATCH 22/23] unwrap exceptiongroup, add test --- src/trio/_channel.py | 46 +++++++++++++++++++++++---------- src/trio/_tests/test_channel.py | 40 ++++++++++++++++------------ 2 files changed, 55 insertions(+), 31 deletions(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index d2d980bd16..3cfcf988e1 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -18,7 +18,15 @@ from ._abc import ReceiveChannel, ReceiveType, SendChannel, SendType, T from ._core import Abort, RaiseCancelT, Task, enable_ki_protection -from ._util import NoPublicConstructor, final, generic_function +from ._util import ( + NoPublicConstructor, + final, + generic_function, + raise_single_exception_from_group, +) + +if sys.version_info < (3, 11): + from exceptiongroup import BaseExceptionGroup if TYPE_CHECKING: from types import TracebackType @@ -534,19 +542,29 @@ async def context_manager( *args: P.args, **kwargs: P.kwargs ) -> AsyncGenerator[trio._channel.RecvChanWrapper[T], None]: send_chan, recv_chan = trio.open_memory_channel[T](0) - async with trio.open_nursery(strict_exception_groups=True) as nursery: - agen = fn(*args, **kwargs) - send_semaphore = trio.Semaphore(0) - # `nursery.start` to make sure that we will clean up send_chan & agen - # If this errors we don't close `recv_chan`, but the caller - # never gets access to it, so that's not a problem. - await nursery.start(_move_elems_to_channel, agen, send_chan, send_semaphore) - # `async with recv_chan` could eat exceptions, so use sync cm - with RecvChanWrapper(recv_chan, send_semaphore) as wrapped_recv_chan: - yield wrapped_recv_chan - # User has exited context manager, cancel to immediately close the - # abandoned generator if it's still alive. - nursery.cancel_scope.cancel() + try: + async with trio.open_nursery(strict_exception_groups=True) as nursery: + agen = fn(*args, **kwargs) + send_semaphore = trio.Semaphore(0) + # `nursery.start` to make sure that we will clean up send_chan & agen + # If this errors we don't close `recv_chan`, but the caller + # never gets access to it, so that's not a problem. + await nursery.start( + _move_elems_to_channel, agen, send_chan, send_semaphore + ) + # `async with recv_chan` could eat exceptions, so use sync cm + with RecvChanWrapper(recv_chan, send_semaphore) as wrapped_recv_chan: + yield wrapped_recv_chan + # User has exited context manager, cancel to immediately close the + # abandoned generator if it's still alive. + nursery.cancel_scope.cancel() + except BaseExceptionGroup as eg: + try: + raise_single_exception_from_group(eg) + except AssertionError: + raise RuntimeError( + "Encountered exception during cleanup of generator object, as well as exception in the contextmanager body" + ) from eg async def _move_elems_to_channel( agen: AsyncGenerator[T, None], diff --git a/src/trio/_tests/test_channel.py b/src/trio/_tests/test_channel.py index 58abed0764..dcf7ad6725 100644 --- a/src/trio/_tests/test_channel.py +++ b/src/trio/_tests/test_channel.py @@ -416,18 +416,6 @@ async def do_send(s: trio.MemorySendChannel[int], v: int) -> None: r.receive_nowait() -async def test_background_with_channel() -> None: - @background_with_channel - async def agen() -> AsyncGenerator[int]: - yield 1 - await trio.sleep_forever() # simulate deadlock - - async with agen() as recv_chan: - async for x in recv_chan: - assert x == 1 - break # exit, cleanup should be quick - - async def test_background_with_channel_exhaust() -> None: @background_with_channel async def agen() -> AsyncGenerator[int]: @@ -496,7 +484,7 @@ async def agen() -> AsyncGenerator[int]: yield 1 raise ValueError("oae") - with RaisesGroup(ValueError): + with pytest.raises(ValueError, match=r"^oae$"): async with agen() as recv_chan: async for x in recv_chan: assert x == 1 @@ -544,16 +532,20 @@ async def agen(stuff: list[str]) -> AsyncGenerator[int]: stuff.append("finally") raise ValueError("agen") - with RaisesGroup( - Matcher(ValueError, match="^agen$"), - Matcher(TypeError, match="^iterator$"), - ): + with pytest.raises( + RuntimeError, + match=r"^Encountered exception during cleanup of generator object, as well as exception in the contextmanager body.$", + ) as excinfo: async with agen(events) as recv_chan: async for i in recv_chan: # pragma: no branch assert i == 1 raise TypeError("iterator") assert events == ["GeneratorExit()", "finally"] + RaisesGroup( + Matcher(ValueError, match="^agen$"), + Matcher(TypeError, match="^iterator$"), + ).matches(excinfo.value.__cause__) async def test_background_with_channel_nested_loop() -> None: @@ -571,3 +563,17 @@ async def agen() -> AsyncGenerator[int]: assert (i, j) == (ii, jj) jj += 1 ii += 1 + + +async def test_doesnt_leak_cancellation() -> None: + @background_with_channel + async def agenfn() -> AsyncGenerator[None]: + with trio.CancelScope() as cscope: + cscope.cancel() + yield + + with pytest.raises(AssertionError): + async with agenfn() as recv_chan: + async for _ in recv_chan: + pass + raise AssertionError("should be reachable") From da8a415bde71ee11eeab69801e2a05315de76c13 Mon Sep 17 00:00:00 2001 From: jakkdl Date: Wed, 9 Apr 2025 15:42:15 +0200 Subject: [PATCH 23/23] . --- src/trio/_channel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trio/_channel.py b/src/trio/_channel.py index 3cfcf988e1..8b13b95275 100644 --- a/src/trio/_channel.py +++ b/src/trio/_channel.py @@ -563,7 +563,7 @@ async def context_manager( raise_single_exception_from_group(eg) except AssertionError: raise RuntimeError( - "Encountered exception during cleanup of generator object, as well as exception in the contextmanager body" + "Encountered exception during cleanup of generator object, as well as exception in the contextmanager body." ) from eg async def _move_elems_to_channel(