Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ipc failure while streaming #346

Merged
merged 23 commits into from
Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4f97718
Handle broken mem chan on `Actor._push_result()`
goodboy Nov 10, 2022
de04bbb
Don't raise on a broken IPC-context when sending stop msg
goodboy Nov 14, 2022
a4874a3
Always set the `parent_exit: trio.Event` on exit
goodboy Nov 10, 2022
d27c081
Ensure arbiter sockaddr type before usage
goodboy Dec 13, 2022
97d5f72
Fix uid2nursery lookup table type annot
goodboy Oct 17, 2022
158569a
Add WIP example of silent IPC breaks while streaming
goodboy Jan 26, 2023
ddf3d0d
Show tracebacks for un-shipped/propagated errors
goodboy Jan 26, 2023
df01294
Show more functiony syntax in ctx-cancelled log msgs
goodboy Jan 8, 2023
7394a18
Name one-way streaming (con generators) what it is
goodboy Jan 27, 2023
36a83cb
Refine example to drop IPC mid-stream
goodboy Jan 27, 2023
fb9ff45
Move example to a new `advanced_faults` egs subset dir
goodboy Jan 27, 2023
4f8586a
Wrap ex in new test, change dir helpers to use `pathlib.Path`
goodboy Jan 27, 2023
1d92f25
Adjust other examples tests to expect `pathlib` objects
goodboy Jan 27, 2023
7fddb44
Handle `mp` spawn method cases in test suite
goodboy Jan 27, 2023
3a0817f
Skip `advanced_faults/` subset in docs examples tests
goodboy Jan 27, 2023
6c35ba2
Add IPC breakage on both parent and child side
goodboy Jan 28, 2023
e34823a
Add parent vs. child cancels first cases
goodboy Jan 28, 2023
3967c0e
Add a simplified zombie lord specific process reaping test
goodboy Jan 28, 2023
556f462
Tweak warning msg for still-alive-after-cancelled actor
goodboy Jan 28, 2023
aa4871b
Call `MsgStream.aclose()` in `Context.open_stream.__aexit__()`
goodboy Jan 28, 2023
195d2f0
Add nooz
goodboy Jan 28, 2023
af6c325
Bump up legacy streaming timeout a smidgen
goodboy Jan 28, 2023
13c9ead
Move result log msg up and drop else block
goodboy Jan 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions examples/advanced_faults/ipc_failure_during_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
'''
Complex edge case where during real-time streaming the IPC transport
channels are wiped out (purposely in this example though it could have
been an outage) and we want to ensure that despite being in debug mode
(or not) the user can send SIGINT once they notice the hang and the
actor tree will eventually be cancelled without leaving any zombies.

'''
import trio
from tractor import (
open_nursery,
context,
Context,
MsgStream,
)


async def break_channel_silently_then_error(
    stream: MsgStream,
):
    '''
    Echo a received msg back over ``stream``, then silently break the
    underlying IPC channel and raise.

    '''
    async for echoed in stream:
        await stream.send(echoed)

        # XXX: close the channel right after an error is raised,
        # purposely breaking the IPC transport to make sure the parent
        # doesn't get stuck in debug or hang on the connection join.
        # This more or less simulates an infinite msg-receive hang on
        # the other end.
        chan = stream._ctx.chan
        await chan.send(None)

        # deliberate failure: this task is meant to error out
        assert 0


async def close_stream_and_error(
    stream: MsgStream,
):
    '''
    Echo a received msg back over ``stream``, wipe out the underlying
    IPC channel, close the stream and then raise.

    '''
    async for echoed in stream:
        await stream.send(echoed)

        # wipe out channel right before raising
        chan = stream._ctx.chan
        await chan.send(None)
        await stream.aclose()

        # deliberate failure: this task is meant to error out
        assert 0


@context
async def recv_and_spawn_net_killers(

    ctx: Context,
    break_ipc_after: bool | int = False,

) -> None:
    '''
    Receive stream msgs and spawn some IPC killers mid-stream.

    Every received msg is printed and echoed back to the sender. When
    ``break_ipc_after`` is truthy, each received value greater than it
    prints a banner and starts both IPC-killer tasks
    (``break_channel_silently_then_error()`` and
    ``close_stream_and_error()``) in a local ``trio`` nursery.

    '''
    await ctx.started()
    async with (
        ctx.open_stream() as stream,
        trio.open_nursery() as n,
    ):
        async for i in stream:
            print(f'child echoing {i}')
            await stream.send(i)
            if (
                break_ipc_after
                and i > break_ipc_after
            ):
                # NOTE: the banner must be passed to ``print()``; as
                # a bare string expression it is a no-op and nothing
                # is ever shown (the parent side prints its banner the
                # same way).
                print(
                    '#################################\n'
                    'Simulating child-side IPC BREAK!\n'
                    '#################################'
                )
                n.start_soon(break_channel_silently_then_error, stream)
                n.start_soon(close_stream_and_error, stream)


async def main(
    debug_mode: bool = False,
    start_method: str = 'trio',

    # by default we break the parent IPC first (if configured to break
    # at all), but this can be changed so the child does first (even if
    # both are set to break).
    break_parent_ipc_after: int | bool = False,
    break_child_ipc_after: int | bool = False,

) -> None:
    '''
    Spawn a child actor, stream ints to it and read back its echoes,
    optionally breaking the IPC channel from the parent and/or child
    side mid-stream, then raise ``KeyboardInterrupt`` to emulate
    a user hitting ctl-c.

    '''
    async with open_nursery(
        start_method=start_method,

        # NOTE: even if the debugger is used we shouldn't get
        # a hang since it never engages due to broken IPC
        debug_mode=debug_mode,
        loglevel='warning',

    ) as actor_nursery:

        child_portal = await actor_nursery.start_actor(
            'chitty_hijo',
            enable_modules=[__name__],
        )

        async with (
            child_portal.open_context(
                recv_and_spawn_net_killers,
                break_ipc_after=break_child_ipc_after,
            ) as (ctx, _first),
            ctx.open_stream() as stream,
        ):
            for i in range(1000):

                parent_should_break = (
                    break_parent_ipc_after
                    and i > break_parent_ipc_after
                )
                if parent_should_break:
                    print(
                        '#################################\n'
                        'Simulating parent-side IPC BREAK!\n'
                        '#################################'
                    )
                    await stream._ctx.chan.send(None)

                # it actually breaks right here in the
                # mp_spawn/forkserver backends and thus the zombie
                # reaper never even kicks in?
                print(f'parent sending {i}')
                await stream.send(i)

                with trio.move_on_after(2) as timeout_scope:

                    # NOTE: in the parent side IPC failure case this
                    # will raise an ``EndOfChannel`` after the child
                    # is killed and sends a stop msg back to its
                    # caller/this-parent.
                    echo = await stream.receive()

                    print(f"I'm a happy user and echoed to me is {echo}")

                if timeout_scope.cancelled_caught:
                    # pretend to be a user seeing no streaming action
                    # thinking it's a hang, and then hitting ctl-c..
                    print("YOO i'm a user anddd thingz hangin..")

            # all sends are done: act like an impatient user
            print(
                "YOO i'm mad send side dun but thingz hangin..\n"
                'MASHING CTlR-C Ctl-c..'
            )
            raise KeyboardInterrupt


if __name__ == '__main__':
    # script entry point: run `main()` under `trio` using its default
    # args (neither side is configured to break IPC).
    trio.run(main)
15 changes: 15 additions & 0 deletions nooz/346.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Fixes to ensure IPC (channel) breakage doesn't result in hung actor
trees; the zombie reaping and general supervision machinery will always
clean up and terminate.

This includes not only the (mostly minor) fixes to solve these cases but
also a new extensive test suite in `test_advanced_faults.py` with an
accompanying highly configurable example module-script in
`examples/advanced_faults/ipc_failure_during_stream.py`. Tests ensure we
never get hangs or zombies despite operating in debug mode and attempt to
simulate all possible IPC transport failure cases for a local-host actor
tree.

Further we simplify `Context.open_stream.__aexit__()` to just call
`MsgStream.aclose()` directly, more or less avoiding a pure duplicate
code path.
26 changes: 17 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import random
import signal
import platform
import pathlib
import time
import inspect
from functools import partial, wraps
Expand Down Expand Up @@ -113,14 +114,21 @@ async def _main():
)


def repodir():
    """Return the absolute path (as a ``str``) of the directory one
    level above the one containing this file, with symlinks in this
    file's own path resolved first.
    """
    this_file = os.path.realpath(__file__)
    containing_dir = os.path.dirname(this_file)
    return os.path.abspath(os.path.dirname(containing_dir))
def repodir() -> pathlib.Path:
    '''
    Return the absolute path to the repo directory as
    a `pathlib.Path`.

    '''
    # this file lives in tests/, so the repo dir is one level above
    # the directory containing it.
    tests_dir = pathlib.Path(__file__).parent
    return tests_dir.parent.absolute()


def examples_dir() -> pathlib.Path:
    '''
    Return the `examples` sub-directory of the repo directory as
    a `pathlib.Path`.

    '''
    repo_root = repodir()
    return repo_root / 'examples'


def pytest_addoption(parser):
Expand Down Expand Up @@ -151,7 +159,7 @@ def loglevel(request):


@pytest.fixture(scope='session')
def spawn_backend(request):
def spawn_backend(request) -> str:
    # hand back the `spawn_backend` option value stored on the pytest
    # config of the requesting test session.
    return request.config.option.spawn_backend


Expand Down
Loading