From dec2b1f0f52fad7002420451995912028e45715f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Thu, 20 Mar 2025 17:50:22 -0400 Subject: [PATCH 01/24] Reapply "Port all tests to new `reg_addr` fixture name" This reverts-the-revert of commit bc13599e1f38cefb5f936bc22311aa62925747f6 which was needed to land pre `multihomed` feat branch history. --- tests/test_cancellation.py | 31 +++++---- tests/test_child_manages_service_nursery.py | 2 +- tests/test_debugger.py | 2 +- tests/test_discovery.py | 76 +++++++++++++-------- tests/test_infected_asyncio.py | 26 +++---- tests/test_legacy_one_way_streaming.py | 22 +++--- tests/test_local.py | 12 ++-- tests/test_multi_program.py | 10 +-- tests/test_pubsub.py | 8 +-- tests/test_rpc.py | 4 +- tests/test_spawning.py | 14 ++-- tests/test_task_broadcasting.py | 18 ++--- 12 files changed, 126 insertions(+), 99 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 14e4d0ae..b8c14af3 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -14,7 +14,7 @@ from tractor._testing import ( tractor_test, ) -from .conftest import no_windows +from conftest import no_windows def is_win(): @@ -45,7 +45,7 @@ async def do_nuthin(): ], ids=['no_args', 'unexpected_args'], ) -def test_remote_error(arb_addr, args_err): +def test_remote_error(reg_addr, args_err): ''' Verify an error raised in a subactor that is propagated to the parent nursery, contains the underlying boxed builtin @@ -57,7 +57,7 @@ def test_remote_error(arb_addr, args_err): async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as nursery: # on a remote type error caused by bad input args @@ -99,7 +99,7 @@ async def main(): assert exc.type == errtype -def test_multierror(arb_addr): +def test_multierror(reg_addr): ''' Verify we raise a ``BaseExceptionGroup`` out of a nursery where more then one actor errors. @@ -107,7 +107,7 @@ def test_multierror(arb_addr): ''' async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as nursery: await nursery.run_in_actor(assert_err, name='errorer1') @@ -132,14 +132,14 @@ async def main(): @pytest.mark.parametrize( 'num_subactors', range(25, 26), ) -def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): +def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay): """Verify we raise a ``BaseExceptionGroup`` out of a nursery where more then one actor errors and also with a delay before failure to test failure during an ongoing spawning. """ async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as nursery: for i in range(num_subactors): @@ -177,15 +177,20 @@ async def do_nothing(): @pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt]) -def test_cancel_single_subactor(arb_addr, mechanism): - """Ensure a ``ActorNursery.start_actor()`` spawned subactor +def test_cancel_single_subactor(reg_addr, mechanism): + ''' + Ensure a ``ActorNursery.start_actor()`` spawned subactor cancels when the nursery is cancelled. - """ + + ''' async def spawn_actor(): - """Spawn an actor that blocks indefinitely. - """ + ''' + Spawn an actor that blocks indefinitely then cancel via + either `ActorNursery.cancel()` or an exception raise. + + ''' async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as nursery: portal = await nursery.start_actor( diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py index 1dcbe031..350f939b 100644 --- a/tests/test_child_manages_service_nursery.py +++ b/tests/test_child_manages_service_nursery.py @@ -142,7 +142,7 @@ async def open_actor_local_nursery( ) def test_actor_managed_trio_nursery_task_error_cancels_aio( asyncio_mode: bool, - arb_addr + reg_addr: tuple, ): ''' Verify that a ``trio`` nursery created managed in a child actor diff --git a/tests/test_debugger.py b/tests/test_debugger.py index a10ecad9..923aa94d 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -83,7 +83,7 @@ def mk_cmd(ex_name: str) -> str: def spawn( start_method, testdir, - arb_addr, + reg_addr, ) -> 'pexpect.spawn': if start_method != 'trio': diff --git a/tests/test_discovery.py b/tests/test_discovery.py index b56c3a2e..cd9dc022 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -14,19 +14,19 @@ @tractor_test -async def test_reg_then_unreg(arb_addr): +async def test_reg_then_unreg(reg_addr): actor = tractor.current_actor() assert actor.is_arbiter assert len(actor._registry) == 1 # only self is registered async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as n: portal = await n.start_actor('actor', enable_modules=[__name__]) uid = portal.channel.uid - async with tractor.get_arbiter(*arb_addr) as aportal: + async with tractor.get_arbiter(*reg_addr) as aportal: # this local actor should be the arbiter assert actor is aportal.actor @@ -52,15 +52,27 @@ async def hi(): return the_line.format(tractor.current_actor().name) -async def say_hello(other_actor): +async def say_hello( + other_actor: str, + reg_addr: tuple[str, int], +): await trio.sleep(1) # wait for other actor to spawn - async with tractor.find_actor(other_actor) as portal: + async with tractor.find_actor( + other_actor, + registry_addrs=[reg_addr], + ) as portal: assert portal is not None return await portal.run(__name__, 'hi') -async def say_hello_use_wait(other_actor): - async with tractor.wait_for_actor(other_actor) as portal: +async def say_hello_use_wait( + other_actor: str, + reg_addr: tuple[str, int], +): + async with tractor.wait_for_actor( + other_actor, + registry_addr=reg_addr, + ) as portal: assert portal is not None result = await portal.run(__name__, 'hi') return result @@ -68,21 +80,29 @@ async def say_hello_use_wait(other_actor): @tractor_test @pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) -async def test_trynamic_trio(func, start_method, arb_addr): - """Main tractor entry point, the "master" process (for now - acts as the "director"). - """ +async def test_trynamic_trio( + func, + start_method, + reg_addr, +): + ''' + Root actor acting as the "director" and running one-shot-task-actors + for the directed subs. + + ''' async with tractor.open_nursery() as n: print("Alright... Action!") donny = await n.run_in_actor( func, other_actor='gretchen', + reg_addr=reg_addr, name='donny', ) gretchen = await n.run_in_actor( func, other_actor='donny', + reg_addr=reg_addr, name='gretchen', ) print(await gretchen.result()) @@ -130,7 +150,7 @@ async def unpack_reg(actor_or_portal): async def spawn_and_check_registry( - arb_addr: tuple, + reg_addr: tuple, use_signal: bool, remote_arbiter: bool = False, with_streaming: bool = False, @@ -138,9 +158,9 @@ async def spawn_and_check_registry( ) -> None: async with tractor.open_root_actor( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ): - async with tractor.get_arbiter(*arb_addr) as portal: + async with tractor.get_arbiter(*reg_addr) as portal: # runtime needs to be up to call this actor = tractor.current_actor() @@ -212,17 +232,19 @@ async def spawn_and_check_registry( def test_subactors_unregister_on_cancel( start_method, use_signal, - arb_addr, + reg_addr, with_streaming, ): - """Verify that cancelling a nursery results in all subactors + ''' + Verify that cancelling a nursery results in all subactors deregistering themselves with the arbiter. - """ + + ''' with pytest.raises(KeyboardInterrupt): trio.run( partial( spawn_and_check_registry, - arb_addr, + reg_addr, use_signal, remote_arbiter=False, with_streaming=with_streaming, @@ -236,7 +258,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( daemon, start_method, use_signal, - arb_addr, + reg_addr, with_streaming, ): """Verify that cancelling a nursery results in all subactors @@ -247,7 +269,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( trio.run( partial( spawn_and_check_registry, - arb_addr, + reg_addr, use_signal, remote_arbiter=True, with_streaming=with_streaming, @@ -261,7 +283,7 @@ async def streamer(agen): async def close_chans_before_nursery( - arb_addr: tuple, + reg_addr: tuple, use_signal: bool, remote_arbiter: bool = False, ) -> None: @@ -274,9 +296,9 @@ async def close_chans_before_nursery( entries_at_end = 1 async with tractor.open_root_actor( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ): - async with tractor.get_arbiter(*arb_addr) as aportal: + async with tractor.get_arbiter(*reg_addr) as aportal: try: get_reg = partial(unpack_reg, aportal) @@ -328,7 +350,7 @@ async def close_chans_before_nursery( def test_close_channel_explicit( start_method, use_signal, - arb_addr, + reg_addr, ): """Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also @@ -338,7 +360,7 @@ def test_close_channel_explicit( trio.run( partial( close_chans_before_nursery, - arb_addr, + reg_addr, use_signal, remote_arbiter=False, ), @@ -350,7 +372,7 @@ def test_close_channel_explicit_remote_arbiter( daemon, start_method, use_signal, - arb_addr, + reg_addr, ): """Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also @@ -360,7 +382,7 @@ def test_close_channel_explicit_remote_arbiter( trio.run( partial( close_chans_before_nursery, - arb_addr, + reg_addr, use_signal, remote_arbiter=True, ), diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index f9670225..568708a2 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -47,7 +47,7 @@ async def trio_cancels_single_aio_task(): await tractor.to_asyncio.run_task(sleep_forever) -def test_trio_cancels_aio_on_actor_side(arb_addr): +def test_trio_cancels_aio_on_actor_side(reg_addr): ''' Spawn an infected actor that is cancelled by the ``trio`` side task using std cancel scope apis. @@ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(arb_addr): ''' async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr + registry_addrs=[reg_addr] ) as n: await n.run_in_actor( trio_cancels_single_aio_task, @@ -94,7 +94,7 @@ async def asyncio_actor( raise -def test_aio_simple_error(arb_addr): +def test_aio_simple_error(reg_addr): ''' Verify a simple remote asyncio error propagates back through trio to the parent actor. @@ -103,7 +103,7 @@ def test_aio_simple_error(arb_addr): ''' async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr + registry_addrs=[reg_addr] ) as n: await n.run_in_actor( asyncio_actor, @@ -131,7 +131,7 @@ async def main(): assert err.type == AssertionError -def test_tractor_cancels_aio(arb_addr): +def test_tractor_cancels_aio(reg_addr): ''' Verify we can cancel a spawned asyncio task gracefully. @@ -150,7 +150,7 @@ async def main(): trio.run(main) -def test_trio_cancels_aio(arb_addr): +def test_trio_cancels_aio(reg_addr): ''' Much like the above test with ``tractor.Portal.cancel_actor()`` except we just use a standard ``trio`` cancellation api. @@ -206,7 +206,7 @@ async def trio_ctx( ids='parent_actor_cancels_child={}'.format ) def test_context_spawns_aio_task_that_errors( - arb_addr, + reg_addr, parent_cancels: bool, ): ''' @@ -288,7 +288,7 @@ async def aio_cancel(): await sleep_forever() -def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): +def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): async def main(): async with tractor.open_nursery() as n: @@ -436,7 +436,7 @@ async def consume( 'fan_out', [False, True], ids='fan_out_w_chan_subscribe={}'.format ) -def test_basic_interloop_channel_stream(arb_addr, fan_out): +def test_basic_interloop_channel_stream(reg_addr, fan_out): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -450,7 +450,7 @@ async def main(): # TODO: parametrize the above test and avoid the duplication here? -def test_trio_error_cancels_intertask_chan(arb_addr): +def test_trio_error_cancels_intertask_chan(reg_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -469,7 +469,7 @@ async def main(): assert exc.type == Exception -def test_trio_closes_early_and_channel_exits(arb_addr): +def test_trio_closes_early_and_channel_exits(reg_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -484,7 +484,7 @@ async def main(): trio.run(main) -def test_aio_errors_and_channel_propagates_and_closes(arb_addr): +def test_aio_errors_and_channel_propagates_and_closes(reg_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -561,7 +561,7 @@ async def aio_echo_server( ids='raise_error={}'.format, ) def test_echoserver_detailed_mechanics( - arb_addr, + reg_addr, raise_error_mid_stream, ): diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py index 61fff75c..6092bca7 100644 --- a/tests/test_legacy_one_way_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -58,7 +58,7 @@ async def context_stream( async def stream_from_single_subactor( - arb_addr, + reg_addr, start_method, stream_func, ): @@ -67,7 +67,7 @@ async def stream_from_single_subactor( # only one per host address, spawns an actor if None async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], start_method=start_method, ) as nursery: @@ -118,13 +118,13 @@ async def stream_from_single_subactor( @pytest.mark.parametrize( 'stream_func', [async_gen_stream, context_stream] ) -def test_stream_from_single_subactor(arb_addr, start_method, stream_func): +def test_stream_from_single_subactor(reg_addr, start_method, stream_func): """Verify streaming from a spawned async generator. """ trio.run( partial( stream_from_single_subactor, - arb_addr, + reg_addr, start_method, stream_func=stream_func, ), @@ -228,14 +228,14 @@ async def a_quadruple_example(): return result_stream -async def cancel_after(wait, arb_addr): - async with tractor.open_root_actor(arbiter_addr=arb_addr): +async def cancel_after(wait, reg_addr): + async with tractor.open_root_actor(registry_addrs=[reg_addr]): with trio.move_on_after(wait): return await a_quadruple_example() @pytest.fixture(scope='module') -def time_quad_ex(arb_addr, ci_env, spawn_backend): +def time_quad_ex(reg_addr, ci_env, spawn_backend): if spawn_backend == 'mp': """no idea but the mp *nix runs are flaking out here often... """ @@ -243,7 +243,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend): timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 start = time.time() - results = trio.run(cancel_after, timeout, arb_addr) + results = trio.run(cancel_after, timeout, reg_addr) diff = time.time() - start assert results return results, diff @@ -263,14 +263,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend): list(map(lambda i: i/10, range(3, 9))) ) def test_not_fast_enough_quad( - arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend + reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend ): """Verify we can cancel midway through the quad example and all actors cancel gracefully. """ results, diff = time_quad_ex delay = max(diff - cancel_delay, 0) - results = trio.run(cancel_after, delay, arb_addr) + results = trio.run(cancel_after, delay, reg_addr) system = platform.system() if system in ('Windows', 'Darwin') and results is not None: # In CI envoirments it seems later runs are quicker then the first @@ -283,7 +283,7 @@ def test_not_fast_enough_quad( @tractor_test async def test_respawn_consumer_task( - arb_addr, + reg_addr, spawn_backend, loglevel, ): diff --git a/tests/test_local.py b/tests/test_local.py index bb013043..a019d771 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -24,7 +24,7 @@ async def test_no_runtime(): @tractor_test -async def test_self_is_registered(arb_addr): +async def test_self_is_registered(reg_addr): "Verify waiting on the arbiter to register itself using the standard api." actor = tractor.current_actor() assert actor.is_arbiter @@ -34,20 +34,20 @@ async def test_self_is_registered(arb_addr): @tractor_test -async def test_self_is_registered_localportal(arb_addr): +async def test_self_is_registered_localportal(reg_addr): "Verify waiting on the arbiter to register itself using a local portal." actor = tractor.current_actor() assert actor.is_arbiter - async with tractor.get_arbiter(*arb_addr) as portal: + async with tractor.get_arbiter(*reg_addr) as portal: assert isinstance(portal, tractor._portal.LocalPortal) with trio.fail_after(0.2): sockaddr = await portal.run_from_ns( 'self', 'wait_for_actor', name='root') - assert sockaddr[0] == arb_addr + assert sockaddr[0] == reg_addr -def test_local_actor_async_func(arb_addr): +def test_local_actor_async_func(reg_addr): """Verify a simple async function in-process. """ nums = [] @@ -55,7 +55,7 @@ def test_local_actor_async_func(arb_addr): async def print_loop(): async with tractor.open_root_actor( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ): # arbiter is started in-proc if dne assert tractor.current_actor().is_arbiter diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index d3eadabf..92f4c52d 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -30,9 +30,9 @@ def test_abort_on_sigint(daemon): @tractor_test -async def test_cancel_remote_arbiter(daemon, arb_addr): +async def test_cancel_remote_arbiter(daemon, reg_addr): assert not tractor.current_actor().is_arbiter - async with tractor.get_arbiter(*arb_addr) as portal: + async with tractor.get_arbiter(*reg_addr) as portal: await portal.cancel_actor() time.sleep(0.1) @@ -41,16 +41,16 @@ async def test_cancel_remote_arbiter(daemon, arb_addr): # no arbiter socket should exist with pytest.raises(OSError): - async with tractor.get_arbiter(*arb_addr) as portal: + async with tractor.get_arbiter(*reg_addr) as portal: pass -def test_register_duplicate_name(daemon, arb_addr): +def test_register_duplicate_name(daemon, reg_addr): async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as n: assert not tractor.current_actor().is_arbiter diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 69f4c513..6d416f89 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -159,7 +159,7 @@ async def test_required_args(callwith_expecterror): ) def test_multi_actor_subs_arbiter_pub( loglevel, - arb_addr, + reg_addr, pub_actor, ): """Try out the neato @pub decorator system. @@ -169,7 +169,7 @@ def test_multi_actor_subs_arbiter_pub( async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], enable_modules=[__name__], ) as n: @@ -254,12 +254,12 @@ async def main(): def test_single_subactor_pub_multitask_subs( loglevel, - arb_addr, + reg_addr, ): async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], enable_modules=[__name__], ) as n: diff --git a/tests/test_rpc.py b/tests/test_rpc.py index b16f2f1d..71f3258b 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -52,7 +52,7 @@ async def short_sleep(): 'fail_on_syntax', ], ) -def test_rpc_errors(arb_addr, to_call, testdir): +def test_rpc_errors(reg_addr, to_call, testdir): """Test errors when making various RPC requests to an actor that either doesn't have the requested module exposed or doesn't define the named function. @@ -84,7 +84,7 @@ async def main(): # spawn a subactor which calls us back async with tractor.open_nursery( - arbiter_addr=arb_addr, + arbiter_addr=reg_addr, enable_modules=exposed_mods.copy(), ) as n: diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 3f4772e9..6a4b2988 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -16,14 +16,14 @@ async def spawn( is_arbiter: bool, data: dict, - arb_addr: tuple[str, int], + reg_addr: tuple[str, int], ): namespaces = [__name__] await trio.sleep(0.1) async with tractor.open_root_actor( - arbiter_addr=arb_addr, + arbiter_addr=reg_addr, ): actor = tractor.current_actor() @@ -41,7 +41,7 @@ async def spawn( is_arbiter=False, name='sub-actor', data=data, - arb_addr=arb_addr, + reg_addr=reg_addr, enable_modules=namespaces, ) @@ -55,12 +55,12 @@ async def spawn( return 10 -def test_local_arbiter_subactor_global_state(arb_addr): +def test_local_arbiter_subactor_global_state(reg_addr): result = trio.run( spawn, True, data_to_pass_down, - arb_addr, + reg_addr, ) assert result == 10 @@ -140,7 +140,7 @@ async def check_loglevel(level): def test_loglevel_propagated_to_subactor( start_method, capfd, - arb_addr, + reg_addr, ): if start_method == 'mp_forkserver': pytest.skip( @@ -152,7 +152,7 @@ async def main(): async with tractor.open_nursery( name='arbiter', start_method=start_method, - arbiter_addr=arb_addr, + arbiter_addr=reg_addr, ) as tn: await tn.run_in_actor( diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 5e18e10a..d7a29134 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -66,13 +66,13 @@ async def ensure_sequence( async def open_sequence_streamer( sequence: list[int], - arb_addr: tuple[str, int], + reg_addr: tuple[str, int], start_method: str, ) -> tractor.MsgStream: async with tractor.open_nursery( - arbiter_addr=arb_addr, + arbiter_addr=reg_addr, start_method=start_method, ) as tn: @@ -93,7 +93,7 @@ async def open_sequence_streamer( def test_stream_fan_out_to_local_subscriptions( - arb_addr, + reg_addr, start_method, ): @@ -103,7 +103,7 @@ async def main(): async with open_sequence_streamer( sequence, - arb_addr, + reg_addr, start_method, ) as stream: @@ -138,7 +138,7 @@ async def main(): ] ) def test_consumer_and_parent_maybe_lag( - arb_addr, + reg_addr, start_method, task_delays, ): @@ -150,7 +150,7 @@ async def main(): async with open_sequence_streamer( sequence, - arb_addr, + reg_addr, start_method, ) as stream: @@ -211,7 +211,7 @@ async def main(): def test_faster_task_to_recv_is_cancelled_by_slower( - arb_addr, + reg_addr, start_method, ): ''' @@ -225,7 +225,7 @@ async def main(): async with open_sequence_streamer( sequence, - arb_addr, + reg_addr, start_method, ) as stream: @@ -302,7 +302,7 @@ async def main(): def test_ensure_slow_consumers_lag_out( - arb_addr, + reg_addr, start_method, ): '''This is a pure local task test; no tractor From 7ce4bc489e9fb3684f7ba1a9d2cda2e4257b5139 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Wed, 27 Sep 2023 15:19:30 -0400 Subject: [PATCH 02/24] Init-support for "multi homed" transports Since we'd like to eventually allow a diverse set of transport (protocol) methods and stacks, and a multi-peer discovery system for distributed actor-tree applications, this reworks all runtime internals to support multi-homing for any given tree on a logical host. In other words any actor can now bind its transport server (currently only unsecured TCP + `msgspec`) to more then one address available in its (linux) network namespace. Further, registry actors (now dubbed "registars" instead of "arbiters") can also similarly bind to multiple network addresses and provide discovery services to remote actors via multiple addresses which can now be provided at runtime startup. Deats: - adjust `._runtime` internals to use a `list[tuple[str, int]]` (and thus pluralized) socket address sequence where applicable for transport server socket binds, now exposed via `Actor.accept_addrs`: - `Actor.__init__()` now takes a `registry_addrs: list`. - `Actor.is_arbiter` -> `.is_registrar`. - `._arb_addr` -> `._reg_addrs: list[tuple]`. - always reg and de-reg from all registrars in `async_main()`. - only set the global runtime var `'_root_mailbox'` to the loopback address since normally all in-tree processes should have access to it, right? - `._serve_forever()` task now takes `listen_sockaddrs: list[tuple]` - make `open_root_actor()` take a `registry_addrs: list[tuple[str, int]]` and defaults when not passed. - change `ActorNursery.start_..()` methods take `bind_addrs: list` and pass down through the spawning layer(s) via the parent-seed-msg. - generalize all `._discovery()` APIs to accept `registry_addrs`-like inputs and move all relevant subsystems to adopt the "registry" style naming instead of "arbiter": - make `find_actor()` support batched concurrent portal queries over all provided input addresses using `.trionics.gather_contexts()` Bo - syntax: move to using `async with <tuples>` 3.9+ style chained @acms. - a general modernization of the code to a python 3.9+ style. - start deprecation and change to "registry" naming / semantics: - `._discovery.get_arbiter()` -> `.get_registry()` --- tractor/_discovery.py | 192 ++++++++++++++++++------- tractor/_entry.py | 10 +- tractor/_ipc.py | 4 +- tractor/_portal.py | 7 +- tractor/_root.py | 147 ++++++++++++------- tractor/_runtime.py | 322 ++++++++++++++++++++++++++++-------------- tractor/_spawn.py | 21 ++- tractor/_supervise.py | 23 ++- 8 files changed, 489 insertions(+), 237 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 03775ac2..22ab88d1 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -15,16 +15,19 @@ # along with this program. If not, see <https://www.gnu.org/licenses/>. """ -Actor discovery API. +Discovery (protocols) API for automatic addressing and location +management of (service) actors. """ +from __future__ import annotations from typing import ( - Optional, - Union, AsyncGenerator, + TYPE_CHECKING, ) from contextlib import asynccontextmanager as acm +import warnings +from .trionics import gather_contexts from ._ipc import _connect_chan, Channel from ._portal import ( Portal, @@ -34,13 +37,19 @@ from ._state import current_actor, _runtime_vars -@acm -async def get_arbiter( +if TYPE_CHECKING: + from ._runtime import Actor + +@acm +async def get_registry( host: str, port: int, -) -> AsyncGenerator[Union[Portal, LocalPortal], None]: +) -> AsyncGenerator[ + Portal | LocalPortal | None, + None, +]: ''' Return a portal instance connected to a local or remote arbiter. @@ -51,16 +60,23 @@ async def get_arbiter( if not actor: raise RuntimeError("No actor instance has been defined yet?") - if actor.is_arbiter: + if actor.is_registrar: # we're already the arbiter # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor, Channel((host, port))) + yield LocalPortal( + actor, + Channel((host, port)) + ) else: - async with _connect_chan(host, port) as chan: + async with ( + _connect_chan(host, port) as chan, + open_portal(chan) as regstr_ptl, + ): + yield regstr_ptl - async with open_portal(chan) as arb_portal: - yield arb_portal +# TODO: deprecate and remove _arbiter form +get_arbiter = get_registry @acm @@ -68,51 +84,81 @@ async def get_root( **kwargs, ) -> AsyncGenerator[Portal, None]: + # TODO: rename mailbox to `_root_maddr` when we finally + # add and impl libp2p multi-addrs? host, port = _runtime_vars['_root_mailbox'] assert host is not None - async with _connect_chan(host, port) as chan: - async with open_portal(chan, **kwargs) as portal: - yield portal + async with ( + _connect_chan(host, port) as chan, + open_portal(chan, **kwargs) as portal, + ): + yield portal @acm async def query_actor( name: str, - arbiter_sockaddr: Optional[tuple[str, int]] = None, + arbiter_sockaddr: tuple[str, int] | None = None, + regaddr: tuple[str, int] | None = None, -) -> AsyncGenerator[tuple[str, int], None]: +) -> AsyncGenerator[ + tuple[str, int] | None, + None, +]: ''' - Simple address lookup for a given actor name. + Make a transport address lookup for an actor name to a specific + registrar. - Returns the (socket) address or ``None``. + Returns the (socket) address or ``None`` if no entry under that + name exists for the given registrar listening @ `regaddr`. ''' - actor = current_actor() - async with get_arbiter( - *arbiter_sockaddr or actor._arb_addr - ) as arb_portal: + actor: Actor = current_actor() + if ( + name == 'registrar' + and actor.is_registrar + ): + raise RuntimeError( + 'The current actor IS the registry!?' + ) - sockaddr = await arb_portal.run_from_ns( + if arbiter_sockaddr is not None: + warnings.warn( + '`tractor.query_actor(regaddr=<blah>)` is deprecated.\n' + 'Use `registry_addrs: list[tuple]` instead!', + DeprecationWarning, + stacklevel=2, + ) + regaddr: list[tuple[str, int]] = arbiter_sockaddr + + regstr: Portal + async with get_registry( + *(regaddr or actor._reg_addrs[0]) + ) as regstr: + + # TODO: return portals to all available actors - for now + # just the last one that registered + sockaddr: tuple[str, int] = await regstr.run_from_ns( 'self', 'find_actor', name=name, ) - - # TODO: return portals to all available actors - for now just - # the last one that registered - if name == 'arbiter' and actor.is_arbiter: - raise RuntimeError("The current actor is the arbiter") - - yield sockaddr if sockaddr else None + yield sockaddr @acm async def find_actor( name: str, - arbiter_sockaddr: tuple[str, int] | None = None + arbiter_sockaddr: tuple[str, int] | None = None, + registry_addrs: list[tuple[str, int]] | None = None, -) -> AsyncGenerator[Optional[Portal], None]: + only_first: bool = True, + +) -> AsyncGenerator[ + Portal | list[Portal] | None, + None, +]: ''' Ask the arbiter to find actor(s) by name. @@ -120,24 +166,54 @@ async def find_actor( known to the arbiter. ''' - async with query_actor( - name=name, - arbiter_sockaddr=arbiter_sockaddr, - ) as sockaddr: - - if sockaddr: - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: + if arbiter_sockaddr is not None: + warnings.warn( + '`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n' + 'Use `registry_addrs: list[tuple]` instead!', + DeprecationWarning, + stacklevel=2, + ) + registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr] + + @acm + async def maybe_open_portal_from_reg_addr( + addr: tuple[str, int], + ): + async with query_actor( + name=name, + regaddr=addr, + ) as sockaddr: + if sockaddr: + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield None + + async with gather_contexts( + mngrs=list( + maybe_open_portal_from_reg_addr(addr) + for addr in registry_addrs + ) + ) as maybe_portals: + print(f'Portalz: {maybe_portals}') + if not maybe_portals: yield None + return + + portals: list[Portal] = list(maybe_portals) + if only_first: + yield portals[0] + + else: + yield portals @acm async def wait_for_actor( name: str, arbiter_sockaddr: tuple[str, int] | None = None, - # registry_addr: tuple[str, int] | None = None, + registry_addr: tuple[str, int] | None = None, ) -> AsyncGenerator[Portal, None]: ''' @@ -146,17 +222,33 @@ async def wait_for_actor( A portal to the first registered actor is returned. ''' - actor = current_actor() - - async with get_arbiter( - *arbiter_sockaddr or actor._arb_addr, - ) as arb_portal: - sockaddrs = await arb_portal.run_from_ns( + actor: Actor = current_actor() + + if arbiter_sockaddr is not None: + warnings.warn( + '`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n' + 'Use `registry_addr: tuple` instead!', + DeprecationWarning, + stacklevel=2, + ) + registry_addr: list[tuple[str, int]] = [ + arbiter_sockaddr, + ] + + # TODO: use `.trionics.gather_contexts()` like + # above in `find_actor()` as well? + async with get_registry( + *(registry_addr or actor._reg_addrs[0]), # first if not passed + ) as reg_portal: + sockaddrs = await reg_portal.run_from_ns( 'self', 'wait_for_actor', name=name, ) - sockaddr = sockaddrs[-1] + + # get latest registered addr by default? + # TODO: offer multi-portal yields in multi-homed case? + sockaddr: tuple[str, int] = sockaddrs[-1] async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: diff --git a/tractor/_entry.py b/tractor/_entry.py index a59975ce..0ac0dc47 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -47,8 +47,8 @@ def _mp_main( - actor: Actor, # type: ignore - accept_addr: tuple[str, int], + actor: Actor, + accept_addrs: list[tuple[str, int]], forkserver_info: tuple[Any, Any, Any, Any, Any], start_method: SpawnMethodKey, parent_addr: tuple[str, int] | None = None, @@ -77,8 +77,8 @@ def _mp_main( log.debug(f"parent_addr is {parent_addr}") trio_main = partial( async_main, - actor, - accept_addr, + actor=actor, + accept_addrs=accept_addrs, parent_addr=parent_addr ) try: @@ -96,7 +96,7 @@ def _mp_main( def _trio_main( - actor: Actor, # type: ignore + actor: Actor, *, parent_addr: tuple[str, int] | None = None, infect_asyncio: bool = False, diff --git a/tractor/_ipc.py b/tractor/_ipc.py index e80a1c35..f57d3bd8 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -517,7 +517,9 @@ def connected(self) -> bool: @acm async def _connect_chan( - host: str, port: int + host: str, + port: int + ) -> typing.AsyncGenerator[Channel, None]: ''' Create and connect a channel with disconnect on context manager diff --git a/tractor/_portal.py b/tractor/_portal.py index 0ca44483..ac602dd5 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -461,7 +461,12 @@ class LocalPortal: actor: 'Actor' # type: ignore # noqa channel: Channel - async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: + async def run_from_ns( + self, + ns: str, + func_name: str, + **kwargs, + ) -> Any: ''' Run a requested local function from a namespace path and return it's result. diff --git a/tractor/_root.py b/tractor/_root.py index a1a11d3b..403907ec 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -59,10 +59,10 @@ async def open_root_actor( *, # defaults are above - arbiter_addr: tuple[str, int] | None = None, + registry_addrs: list[tuple[str, int]] | None = None, # defaults are above - registry_addr: tuple[str, int] | None = None, + arbiter_addr: tuple[str, int] | None = None, name: str | None = 'root', @@ -116,19 +116,19 @@ async def open_root_actor( if arbiter_addr is not None: warnings.warn( - '`arbiter_addr` is now deprecated and has been renamed to' - '`registry_addr`.\nUse that instead..', + '`arbiter_addr` is now deprecated\n' + 'Use `registry_addrs: list[tuple]` instead..', DeprecationWarning, stacklevel=2, ) + registry_addrs = [arbiter_addr] - registry_addr = (host, port) = ( - registry_addr - or arbiter_addr - or ( + registry_addrs: list[tuple[str, int]] = ( + registry_addrs + or [ # default on localhost _default_arbiter_host, _default_arbiter_port, - ) + ] ) loglevel = ( @@ -177,60 +177,105 @@ async def open_root_actor( '`stackscope` not installed for use in debug mode!' ) - try: - # make a temporary connection to see if an arbiter exists, - # if one can't be made quickly we assume none exists. - arbiter_found = False - - # TODO: this connect-and-bail forces us to have to carefully - # rewrap TCP 104-connection-reset errors as EOF so as to avoid - # propagating cancel-causing errors to the channel-msg loop - # machinery. Likely it would be better to eventually have - # a "discovery" protocol with basic handshake instead. - with trio.move_on_after(1): - async with _connect_chan(host, port): - arbiter_found = True - - except OSError: - # TODO: make this a "discovery" log level? - logger.warning(f"No actor registry found @ {host}:{port}") - - # create a local actor and start up its main routine/task - if arbiter_found: + # closed into below ping task-func + ponged_addrs: list[tuple[str, int]] = [] + + async def ping_tpt_socket( + addr: tuple[str, int], + timeout: float = 1, + ) -> None: + ''' + Attempt temporary connection to see if a registry is + listening at the requested address by a tranport layer + ping. + + If a connection can't be made quickly we assume none no + server is listening at that addr. + + ''' + try: + # TODO: this connect-and-bail forces us to have to + # carefully rewrap TCP 104-connection-reset errors as + # EOF so as to avoid propagating cancel-causing errors + # to the channel-msg loop machinery. Likely it would + # be better to eventually have a "discovery" protocol + # with basic handshake instead? + with trio.move_on_after(timeout): + async with _connect_chan(*addr): + ponged_addrs.append(addr) + + except OSError: + # TODO: make this a "discovery" log level? + logger.warning(f'No actor registry found @ {addr}') + + async with trio.open_nursery() as tn: + for addr in registry_addrs: + tn.start_soon(ping_tpt_socket, addr) + + trans_bind_addrs: list[tuple[str, int]] = [] + + # Create a new local root-actor instance which IS NOT THE + # REGISTRAR + if ponged_addrs: # we were able to connect to an arbiter - logger.info(f"Arbiter seems to exist @ {host}:{port}") + logger.info( + f'Registry(s) seem(s) to exist @ {ponged_addrs}' + ) actor = Actor( - name or 'anonymous', - arbiter_addr=registry_addr, + name=name or 'anonymous', + registry_addrs=ponged_addrs, loglevel=loglevel, enable_modules=enable_modules, ) - host, port = (host, 0) - + # DO NOT use the registry_addrs as the transport server + # addrs for this new non-registar, root-actor. + for host, port in ponged_addrs: + # NOTE: zero triggers dynamic OS port allocation + trans_bind_addrs.append((host, 0)) + + # Start this local actor as the "registrar", aka a regular + # actor who manages the local registry of "mailboxes" of + # other process-tree-local sub-actors. else: - # start this local actor as the arbiter (aka a regular actor who - # manages the local registry of "mailboxes") - # Note that if the current actor is the arbiter it is desirable - # for it to stay up indefinitely until a re-election process has - # taken place - which is not implemented yet FYI). + # NOTE that if the current actor IS THE REGISTAR, the + # following init steps are taken: + # - the tranport layer server is bound to each (host, port) + # pair defined in provided registry_addrs, or the default. + trans_bind_addrs = registry_addrs + + # - it is normally desirable for any registrar to stay up + # indefinitely until either all registered (child/sub) + # actors are terminated (via SC supervision) or, + # a re-election process has taken place. + # NOTE: all of ^ which is not implemented yet - see: + # https://github.com/goodboy/tractor/issues/216 + # https://github.com/goodboy/tractor/pull/348 + # https://github.com/goodboy/tractor/issues/296 actor = Arbiter( - name or 'arbiter', - arbiter_addr=registry_addr, + name or 'registrar', + registry_addrs=registry_addrs, loglevel=loglevel, enable_modules=enable_modules, ) + # Start up main task set via core actor-runtime nurseries. try: # assign process-local actor _state._current_actor = actor # start local channel-server and fake the portal API # NOTE: this won't block since we provide the nursery - logger.info(f"Starting local {actor} @ {host}:{port}") + ml_addrs_str: str = '\n'.join( + f'@{addr}' for addr in trans_bind_addrs + ) + logger.info( + f'Starting local {actor.uid} on the following transport addrs:\n' + f'{ml_addrs_str}' + ) # start the actor runtime in a new task async with trio.open_nursery() as nursery: @@ -243,7 +288,7 @@ async def open_root_actor( partial( async_main, actor, - accept_addr=(host, port), + accept_addrs=trans_bind_addrs, parent_addr=None ) ) @@ -255,7 +300,7 @@ async def open_root_actor( BaseExceptionGroup, ) as err: - entered = await _debug._maybe_enter_pm(err) + entered: bool = await _debug._maybe_enter_pm(err) if ( not entered and @@ -263,7 +308,8 @@ async def open_root_actor( ): logger.exception('Root actor crashed:\n') - # always re-raise + # ALWAYS re-raise any error bubbled up from the + # runtime! raise finally: @@ -284,7 +330,7 @@ async def open_root_actor( _state._current_actor = None _state._last_actor_terminated = actor - # restore breakpoint hook state + # restore built-in `breakpoint()` hook state sys.breakpointhook = builtin_bp_handler if orig_bp_path is not None: os.environ['PYTHONBREAKPOINT'] = orig_bp_path @@ -300,10 +346,9 @@ def run_daemon( # runtime kwargs name: str | None = 'root', - registry_addr: tuple[str, int] = ( - _default_arbiter_host, - _default_arbiter_port, - ), + registry_addrs: list[tuple[str, int]] = [ + (_default_arbiter_host, _default_arbiter_port) + ], start_method: str | None = None, debug_mode: bool = False, @@ -327,7 +372,7 @@ def run_daemon( async def _main(): async with open_root_actor( - registry_addr=registry_addr, + registry_addrs=registry_addrs, name=name, start_method=start_method, debug_mode=debug_mode, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 6b3d9461..d2a9e405 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -45,6 +45,7 @@ from itertools import chain import importlib import importlib.util +import os from pprint import pformat import signal import sys @@ -55,7 +56,7 @@ ) import uuid from types import ModuleType -import os +import warnings import trio from trio import ( @@ -77,8 +78,8 @@ ContextCancelled, TransportClosed, ) -from ._discovery import get_arbiter from .devx import _debug +from ._discovery import get_registry from ._portal import Portal from . import _state from . import _mp_fixup_main @@ -127,6 +128,11 @@ class Actor: # ugh, we need to get rid of this and replace with a "registry" sys # https://github.com/goodboy/tractor/issues/216 is_arbiter: bool = False + + @property + def is_registrar(self) -> bool: + return self.is_arbiter + msg_buffer_size: int = 2**6 # nursery placeholders filled in by `async_main()` after fork @@ -164,8 +170,12 @@ def __init__( enable_modules: list[str] = [], uid: str | None = None, loglevel: str | None = None, + registry_addrs: list[tuple[str, int]] | None = None, + spawn_method: str | None = None, + + # TODO: remove! arbiter_addr: tuple[str, int] | None = None, - spawn_method: str | None = None + ) -> None: ''' This constructor is called in the parent actor **before** the spawning @@ -189,27 +199,36 @@ def __init__( # always include debugging tools module enable_modules.append('tractor.devx._debug') - mods = {} + self.enable_modules: dict[str, str] = {} for name in enable_modules: - mod = importlib.import_module(name) - mods[name] = _get_mod_abspath(mod) + mod: ModuleType = importlib.import_module(name) + self.enable_modules[name] = _get_mod_abspath(mod) - self.enable_modules = mods self._mods: dict[str, ModuleType] = {} - self.loglevel = loglevel + self.loglevel: str = loglevel + + if arbiter_addr is not None: + warnings.warn( + '`Actor(arbiter_addr=<blah>)` is now deprecated.\n' + 'Use `registry_addrs: list[tuple]` instead.', + DeprecationWarning, + stacklevel=2, + ) + registry_addrs: list[tuple[str, int]] = [arbiter_addr] - self._arb_addr: tuple[str, int] | None = ( - str(arbiter_addr[0]), - int(arbiter_addr[1]) - ) if arbiter_addr else None + self._reg_addrs: list[tuple[str, int]] = ( + registry_addrs + or + None + ) # marked by the process spawning backend at startup # will be None for the parent most process started manually # by the user (currently called the "arbiter") - self._spawn_method = spawn_method + self._spawn_method: str = spawn_method self._peers: defaultdict = defaultdict(list) - self._peer_connected: dict = {} + self._peer_connected: dict[tuple[str, str], trio.Event] = {} self._no_more_peers = trio.Event() self._no_more_peers.set() self._ongoing_rpc_tasks = trio.Event() @@ -336,6 +355,12 @@ async def _stream_handler( self._no_more_peers = trio.Event() # unset by making new chan = Channel.from_stream(stream) their_uid: tuple[str, str]|None = chan.uid + if their_uid: + log.warning( + f'Re-connection from already known {their_uid}' + ) + else: + log.runtime(f'New connection to us @{chan.raddr}') con_msg: str = '' if their_uid: @@ -880,11 +905,11 @@ async def _from_parent( ) await chan.connect() + # TODO: move this into a `Channel.handshake()`? # Initial handshake: swap names. await self._do_handshake(chan) - accept_addr: tuple[str, int] | None = None - + accept_addrs: list[tuple[str, int]] | None = None if self._spawn_method == "trio": # Receive runtime state from our parent parent_data: dict[str, Any] @@ -897,10 +922,7 @@ async def _from_parent( # if "trace"/"util" mode is enabled? f'{pformat(parent_data)}\n' ) - accept_addr = ( - parent_data.pop('bind_host'), - parent_data.pop('bind_port'), - ) + accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') rvs = parent_data.pop('_runtime_vars') if rvs['_debug_mode']: @@ -919,17 +941,18 @@ async def _from_parent( for attr, value in parent_data.items(): - if attr == '_arb_addr': + if attr == '_reg_addrs': # XXX: ``msgspec`` doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - value = tuple(value) if value else None - self._arb_addr = value + self._reg_addrs = [ + tuple(val) for val in value + ] if value else None else: setattr(self, attr, value) - return chan, accept_addr + return chan, accept_addrs except OSError: # failed to connect log.warning( @@ -946,8 +969,8 @@ async def _serve_forever( handler_nursery: Nursery, *, # (host, port) to bind for channel server - accept_host: tuple[str, int] | None = None, - accept_port: int = 0, + listen_sockaddrs: list[tuple[str, int]] | None = None, + task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -958,30 +981,40 @@ async def _serve_forever( `.cancel_server()` is called. ''' + if listen_sockaddrs is None: + listen_sockaddrs = [(None, 0)] + self._server_down = trio.Event() try: async with trio.open_nursery() as server_n: - listeners: list[trio.abc.Listener] = await server_n.start( - partial( - trio.serve_tcp, - self._stream_handler, - # new connections will stay alive even if this server - # is cancelled - handler_nursery=handler_nursery, - port=accept_port, - host=accept_host, + + for host, port in listen_sockaddrs: + listeners: list[trio.abc.Listener] = await server_n.start( + partial( + trio.serve_tcp, + + handler=self._stream_handler, + port=port, + host=host, + + # NOTE: configured such that new + # connections will stay alive even if + # this server is cancelled! + handler_nursery=handler_nursery, + ) ) - ) - sockets: list[trio.socket] = [ - getattr(listener, 'socket', 'unknown socket') - for listener in listeners - ] - log.runtime( - 'Started TCP server(s)\n' - f'|_{sockets}\n' - ) - self._listeners.extend(listeners) + sockets: list[trio.socket] = [ + getattr(listener, 'socket', 'unknown socket') + for listener in listeners + ] + log.runtime( + 'Started TCP server(s)\n' + f'|_{sockets}\n' + ) + self._listeners.extend(listeners) + task_status.started(server_n) + finally: # signal the server is down since nursery above terminated self._server_down.set() @@ -1318,6 +1351,19 @@ def cancel_server(self) -> None: log.runtime("Shutting down channel server") self._server_n.cancel_scope.cancel() + @property + def accept_addrs(self) -> list[tuple[str, int]]: + ''' + All addresses to which the transport-channel server binds + and listens for new connections. + + ''' + # throws OSError on failure + return [ + listener.socket.getsockname() + for listener in self._listeners + ] # type: ignore + @property def accept_addr(self) -> tuple[str, int]: ''' @@ -1326,7 +1372,7 @@ def accept_addr(self) -> tuple[str, int]: ''' # throws OSError on failure - return self._listeners[0].socket.getsockname() # type: ignore + return self.accept_addrs[0] def get_parent(self) -> Portal: ''' @@ -1343,6 +1389,7 @@ def get_chans(self, uid: tuple[str, str]) -> list[Channel]: ''' return self._peers[uid] + # TODO: move to `Channel.handshake(uid)` async def _do_handshake( self, chan: Channel @@ -1379,7 +1426,7 @@ def is_infected_aio(self) -> bool: async def async_main( actor: Actor, - accept_addr: tuple[str, int] | None = None, + accept_addrs: tuple[str, int] | None = None, # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to @@ -1407,20 +1454,25 @@ async def async_main( # on our debugger lock state. _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) - registered_with_arbiter = False + is_registered: bool = False try: # establish primary connection with immediate parent - actor._parent_chan = None + actor._parent_chan: Channel | None = None if parent_addr is not None: - actor._parent_chan, accept_addr_rent = await actor._from_parent( - parent_addr) + ( + actor._parent_chan, + set_accept_addr_says_rent, + ) = await actor._from_parent(parent_addr) - # either it's passed in because we're not a child - # or because we're running in mp mode - if accept_addr_rent is not None: - accept_addr = accept_addr_rent + # either it's passed in because we're not a child or + # because we're running in mp mode + if ( + set_accept_addr_says_rent + and set_accept_addr_says_rent is not None + ): + accept_addrs = set_accept_addr_says_rent # The "root" nursery ensures the channel with the immediate # parent is kept alive as a resilient service until @@ -1460,34 +1512,58 @@ async def async_main( # - subactor: the bind address is sent by our parent # over our established channel # - root actor: the ``accept_addr`` passed to this method - assert accept_addr - host, port = accept_addr + assert accept_addrs actor._server_n = await service_nursery.start( partial( actor._serve_forever, service_nursery, - accept_host=host, - accept_port=port + listen_sockaddrs=accept_addrs, ) ) - accept_addr = actor.accept_addr + accept_addrs: list[tuple[str, int]] = actor.accept_addrs + + # NOTE: only set the loopback addr for the + # process-tree-global "root" mailbox since + # all sub-actors should be able to speak to + # their root actor over that channel. if _state._runtime_vars['_is_root']: - _state._runtime_vars['_root_mailbox'] = accept_addr + for addr in accept_addrs: + host, _ = addr + # TODO: generic 'lo' detector predicate + if '127.0.0.1' in host: + _state._runtime_vars['_root_mailbox'] = addr # Register with the arbiter if we're told its addr - log.runtime(f"Registering {actor} for role `{actor.name}`") - assert isinstance(actor._arb_addr, tuple) - - async with get_arbiter(*actor._arb_addr) as arb_portal: - await arb_portal.run_from_ns( - 'self', - 'register_actor', - uid=actor.uid, - sockaddr=accept_addr, - ) + log.runtime( + f'Registering `{actor.name}` ->\n' + f'{pformat(accept_addrs)}' + ) + + # TODO: ideally we don't fan out to all registrars + # if addresses point to the same actor.. + # So we need a way to detect that? maybe iterate + # only on unique actor uids? + for addr in actor._reg_addrs: + assert isinstance(addr, tuple) + assert addr[1] # non-zero after bind + + async with get_registry(*addr) as reg_portal: + for accept_addr in accept_addrs: + + if not accept_addr[1]: + await _debug.pause() + + assert accept_addr[1] - registered_with_arbiter = True + await reg_portal.run_from_ns( + 'self', + 'register_actor', + uid=actor.uid, + sockaddr=accept_addr, + ) + + is_registered: bool = True # init steps complete task_status.started() @@ -1520,18 +1596,18 @@ async def async_main( log.runtime("Closing all actor lifetime contexts") actor.lifetime_stack.close() - if not registered_with_arbiter: + if not is_registered: # TODO: I guess we could try to connect back # to the parent through a channel and engage a debugger # once we have that all working with std streams locking? log.exception( f"Actor errored and failed to register with arbiter " - f"@ {actor._arb_addr}?") + f"@ {actor._reg_addrs[0]}?") log.error( "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" - "\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n" - "\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n" + "\tIf this is a sub-actor likely its parent will keep running " + "\tcorrectly if this error is caught and ignored.." ) if actor._parent_chan: @@ -1571,27 +1647,33 @@ async def async_main( # Unregister actor from the registry-sys / registrar. if ( - registered_with_arbiter - and not actor.is_arbiter + is_registered + and not actor.is_registrar ): - failed = False - assert isinstance(actor._arb_addr, tuple) - with trio.move_on_after(0.5) as cs: - cs.shield = True - try: - async with get_arbiter(*actor._arb_addr) as arb_portal: - await arb_portal.run_from_ns( - 'self', - 'unregister_actor', - uid=actor.uid - ) - except OSError: + failed: bool = False + for addr in actor._reg_addrs: + assert isinstance(addr, tuple) + with trio.move_on_after(0.5) as cs: + cs.shield = True + try: + async with get_registry( + *addr, + ) as reg_portal: + await reg_portal.run_from_ns( + 'self', + 'unregister_actor', + uid=actor.uid + ) + except OSError: + failed = True + if cs.cancelled_caught: failed = True - if cs.cancelled_caught: - failed = True - if failed: - log.warning( - f"Failed to unregister {actor.name} from arbiter") + + if failed: + log.warning( + f'Failed to unregister {actor.name} from ' + f'registar @ {addr}' + ) # Ensure all peers (actors connected to us as clients) are finished if not actor._no_more_peers.is_set(): @@ -1610,18 +1692,36 @@ async def async_main( # TODO: rename to `Registry` and move to `._discovery`! class Arbiter(Actor): ''' - A special actor who knows all the other actors and always has - access to a top level nursery. - - The arbiter is by default the first actor spawned on each host - and is responsible for keeping track of all other actors for - coordination purposes. If a new main process is launched and an - arbiter is already running that arbiter will be used. + A special registrar actor who can contact all other actors + within its immediate process tree and possibly keeps a registry + of others meant to be discoverable in a distributed + application. Normally the registrar is also the "root actor" + and thus always has access to the top-most-level actor + (process) nursery. + + By default, the registrar is always initialized when and if no + other registrar socket addrs have been specified to runtime + init entry-points (such as `open_root_actor()` or + `open_nursery()`). Any time a new main process is launched (and + thus thus a new root actor created) and, no existing registrar + can be contacted at the provided `registry_addr`, then a new + one is always created; however, if one can be reached it is + used. + + Normally a distributed app requires at least registrar per + logical host where for that given "host space" (aka localhost + IPC domain of addresses) it is responsible for making all other + host (local address) bound actors *discoverable* to external + actor trees running on remote hosts. ''' is_arbiter = True - def __init__(self, *args, **kwargs) -> None: + def __init__( + self, + *args, + **kwargs, + ) -> None: self._registry: dict[ tuple[str, str], @@ -1663,7 +1763,10 @@ async def get_registry( # unpacker since we have tuples as keys (not this makes the # arbiter suscetible to hashdos): # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 - return {'.'.join(key): val for key, val in self._registry.items()} + return { + '.'.join(key): val + for key, val in self._registry.items() + } async def wait_for_actor( self, @@ -1706,8 +1809,15 @@ async def register_actor( sockaddr: tuple[str, int] ) -> None: - uid = name, _ = (str(uid[0]), str(uid[1])) - self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1])) + uid = name, hash = (str(uid[0]), str(uid[1])) + addr = (host, port) = ( + str(sockaddr[0]), + int(sockaddr[1]), + ) + if port == 0: + await _debug.pause() + assert port # should never be 0-dynamic-os-alloc + self._registry[uid] = addr # pop and signal all waiter events events = self._waiters.pop(name, []) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 78c38c84..001a0f10 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -365,7 +365,7 @@ async def new_proc( errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: tuple[str, int], + bind_addrs: list[tuple[str, int]], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child @@ -387,7 +387,7 @@ async def new_proc( actor_nursery, subactor, errors, - bind_addr, + bind_addrs, parent_addr, _runtime_vars, # run time vars infect_asyncio=infect_asyncio, @@ -402,7 +402,7 @@ async def trio_proc( errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: tuple[str, int], + bind_addrs: list[tuple[str, int]], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child *, @@ -491,12 +491,11 @@ async def trio_proc( # send additional init params await chan.send({ - "_parent_main_data": subactor._parent_main_data, - "enable_modules": subactor.enable_modules, - "_arb_addr": subactor._arb_addr, - "bind_host": bind_addr[0], - "bind_port": bind_addr[1], - "_runtime_vars": _runtime_vars, + '_parent_main_data': subactor._parent_main_data, + 'enable_modules': subactor.enable_modules, + '_reg_addrs': subactor._reg_addrs, + 'bind_addrs': bind_addrs, + '_runtime_vars': _runtime_vars, }) # track subactor in current nursery @@ -602,7 +601,7 @@ async def mp_proc( subactor: Actor, errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: tuple[str, int], + bind_addrs: list[tuple[str, int]], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child *, @@ -660,7 +659,7 @@ async def mp_proc( target=_mp_main, args=( subactor, - bind_addr, + bind_addrs, fs_info, _spawn_method, parent_addr, diff --git a/tractor/_supervise.py b/tractor/_supervise.py index c8c2336d..615ba692 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -22,10 +22,7 @@ from functools import partial import inspect from pprint import pformat -from typing import ( - Optional, - TYPE_CHECKING, -) +from typing import TYPE_CHECKING import typing import warnings @@ -97,7 +94,7 @@ def __init__( tuple[ Actor, trio.Process | mp.Process, - Optional[Portal], + Portal | None, ] ] = {} # portals spawned with ``run_in_actor()`` are @@ -121,12 +118,12 @@ async def start_actor( self, name: str, *, - bind_addr: tuple[str, int] = _default_bind_addr, + bind_addrs: list[tuple[str, int]] = [_default_bind_addr], rpc_module_paths: list[str] | None = None, enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor nursery: trio.Nursery | None = None, - debug_mode: Optional[bool] | None = None, + debug_mode: bool | None = None, infect_asyncio: bool = False, ) -> Portal: ''' @@ -161,7 +158,9 @@ async def start_actor( # modules allowed to invoked funcs from enable_modules=enable_modules, loglevel=loglevel, - arbiter_addr=current_actor()._arb_addr, + + # verbatim relay this actor's registrar addresses + registry_addrs=current_actor()._reg_addrs, ) parent_addr = self._actor.accept_addr assert parent_addr @@ -178,7 +177,7 @@ async def start_actor( self, subactor, self.errors, - bind_addr, + bind_addrs, parent_addr, _rtv, # run time vars infect_asyncio=infect_asyncio, @@ -191,8 +190,8 @@ async def run_in_actor( fn: typing.Callable, *, - name: Optional[str] = None, - bind_addr: tuple[str, int] = _default_bind_addr, + name: str | None = None, + bind_addrs: tuple[str, int] = [_default_bind_addr], rpc_module_paths: list[str] | None = None, enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor @@ -221,7 +220,7 @@ async def run_in_actor( enable_modules=[mod_path] + ( enable_modules or rpc_module_paths or [] ), - bind_addr=bind_addr, + bind_addrs=bind_addrs, loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery, From ef4c4be0bbaab2abb4596d3e9947b9a00b04b7d4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Fri, 29 Sep 2023 14:11:31 -0400 Subject: [PATCH 03/24] Add libp2p style "multi-address" parser from `piker` Details are in the module docs; this is a first draft with lotsa room for refinement and extension. --- tractor/_multiaddr.py | 142 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 tractor/_multiaddr.py diff --git a/tractor/_multiaddr.py b/tractor/_multiaddr.py new file mode 100644 index 00000000..f6b37a35 --- /dev/null +++ b/tractor/_multiaddr.py @@ -0,0 +1,142 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +''' +Multiaddress parser and utils according the spec(s) defined by +`libp2p` and used in dependent project such as `ipfs`: + +- https://docs.libp2p.io/concepts/fundamentals/addressing/ +- https://github.com/libp2p/specs/blob/master/addressing/README.md + +''' +from typing import Iterator + +from bidict import bidict + +# TODO: see if we can leverage libp2p ecosys projects instead of +# rolling our own (parser) impls of the above addressing specs: +# - https://github.com/libp2p/py-libp2p +# - https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses +# prots: bidict[int, str] = bidict({ +prots: bidict[int, str] = { + 'ipv4': 3, + 'ipv6': 3, + 'wg': 3, + + 'tcp': 4, + 'udp': 4, + + # TODO: support the next-gen shite Bo + # 'quic': 4, + # 'ssh': 7, # via rsyscall bootstrapping +} + +prot_params: dict[str, tuple[str]] = { + 'ipv4': ('addr',), + 'ipv6': ('addr',), + 'wg': ('addr', 'port', 'pubkey'), + + 'tcp': ('port',), + 'udp': ('port',), + + # 'quic': ('port',), + # 'ssh': ('port',), +} + + +def iter_prot_layers( + multiaddr: str, +) -> Iterator[ + tuple[ + int, + list[str] + ] +]: + ''' + Unpack a libp2p style "multiaddress" into multiple "segments" + for each "layer" of the protocoll stack (in OSI terms). + + ''' + tokens: list[str] = multiaddr.split('/') + root, tokens = tokens[0], tokens[1:] + assert not root # there is a root '/' on LHS + itokens = iter(tokens) + + prot: str | None = None + params: list[str] = [] + for token in itokens: + # every prot path should start with a known + # key-str. + if token in prots: + if prot is None: + prot: str = token + else: + yield prot, params + prot = token + + params = [] + + elif token not in prots: + params.append(token) + + else: + yield prot, params + + +def parse_addr( + multiaddr: str, +) -> dict[str, str | int | dict]: + ''' + Parse a libp2p style "multiaddress" into it's distinct protocol + segments where each segment: + + `../<protocol>/<param0>/<param1>/../<paramN>` + + is loaded into a layers `dict[str, dict[str, Any]` which holds + each prot segment of the path as a separate entry sortable by + it's approx OSI "layer number". + + Any `paramN` in the path must be distinctly defined in order + according to the (global) `prot_params` table in this module. + + ''' + layers: dict[str, str | int | dict] = {} + for ( + prot_key, + params, + ) in iter_prot_layers(multiaddr): + + layer: int = prots[prot_key] # OSI layer used for sorting + ep: dict[str, int | str] = {'layer': layer} + layers[prot_key] = ep + + # TODO; validation and resolving of names: + # - each param via a validator provided as part of the + # prot_params def? (also see `"port"` case below..) + # - do a resolv step that will check addrs against + # any loaded network.resolv: dict[str, str] + rparams: list = list(reversed(params)) + for key in prot_params[prot_key]: + val: str | int = rparams.pop() + + # TODO: UGHH, dunno what we should do for validation + # here, put it in the params spec somehow? + if key == 'port': + val = int(val) + + ep[key] = val + + return layers From 4db377c01dd2fd15c6783cf9e2fed420a4254903 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Fri, 29 Sep 2023 14:49:18 -0400 Subject: [PATCH 04/24] Rename to `parse_maddr()` and fill out doc strings --- tractor/_multiaddr.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/tractor/_multiaddr.py b/tractor/_multiaddr.py index f6b37a35..d0f562c0 100644 --- a/tractor/_multiaddr.py +++ b/tractor/_multiaddr.py @@ -96,21 +96,30 @@ def iter_prot_layers( yield prot, params -def parse_addr( +def parse_maddr( multiaddr: str, ) -> dict[str, str | int | dict]: ''' Parse a libp2p style "multiaddress" into it's distinct protocol - segments where each segment: + segments where each segment is of the form: `../<protocol>/<param0>/<param1>/../<paramN>` - is loaded into a layers `dict[str, dict[str, Any]` which holds - each prot segment of the path as a separate entry sortable by - it's approx OSI "layer number". + and is loaded into a (order preserving) `layers: dict[str, + dict[str, Any]` which holds each protocol-layer-segment of the + original `str` path as a separate entry according to its approx + OSI "layer number". - Any `paramN` in the path must be distinctly defined in order - according to the (global) `prot_params` table in this module. + Any `paramN` in the path must be distinctly defined by a str-token in the + (module global) `prot_params` table. + + For eg. for wireguard which requires an address, port number and publickey + the protocol params are specified as the entry: + + 'wg': ('addr', 'port', 'pubkey'), + + and are thus parsed from a maddr in that order: + `'/wg/1.1.1.1/51820/<pubkey>'` ''' layers: dict[str, str | int | dict] = {} From 72467491379c1945995b3d264435f742b28502d1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 3 Oct 2023 10:54:46 -0400 Subject: [PATCH 05/24] Add post-mortem catch around failed transport addr binds to aid with runtime debugging --- tractor/_root.py | 12 ++++++++---- tractor/_runtime.py | 21 +++++++++++++++------ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index 403907ec..a7b4d4d2 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -280,10 +280,14 @@ async def ping_tpt_socket( # start the actor runtime in a new task async with trio.open_nursery() as nursery: - # ``_runtime.async_main()`` creates an internal nursery and - # thus blocks here until the entire underlying actor tree has - # terminated thereby conducting structured concurrency. - + # ``_runtime.async_main()`` creates an internal nursery + # and blocks here until any underlying actor(-process) + # tree has terminated thereby conducting so called + # "end-to-end" structured concurrency throughout an + # entire hierarchical python sub-process set; all + # "actor runtime" primitives are SC-compat and thus all + # transitively spawned actors/processes must be as + # well. await nursery.start( partial( async_main, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index d2a9e405..ed1e5f85 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -1514,13 +1514,22 @@ async def async_main( # - root actor: the ``accept_addr`` passed to this method assert accept_addrs - actor._server_n = await service_nursery.start( - partial( - actor._serve_forever, - service_nursery, - listen_sockaddrs=accept_addrs, + try: + actor._server_n = await service_nursery.start( + partial( + actor._serve_forever, + service_nursery, + listen_sockaddrs=accept_addrs, + ) ) - ) + except OSError as oserr: + # NOTE: always allow runtime hackers to debug + # tranport address bind errors - normally it's + # something silly like the wrong socket-address + # passed via a config or CLI Bo + entered_debug = await _debug._maybe_enter_pm(oserr) + raise + accept_addrs: list[tuple[str, int]] = actor.accept_addrs # NOTE: only set the loopback addr for the From 61d82d47c2e36fbaad8fc916a5059a5ea8a971e7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Sat, 7 Oct 2023 18:52:37 -0400 Subject: [PATCH 06/24] Oof, default reg addrs needs to be in `list[tuple]` form.. --- tractor/_root.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index a7b4d4d2..661a0b87 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -125,10 +125,10 @@ async def open_root_actor( registry_addrs: list[tuple[str, int]] = ( registry_addrs - or [ # default on localhost + or [( # default on localhost _default_arbiter_host, _default_arbiter_port, - ] + )] ) loglevel = ( From 2e17b084b24adc2ce9f04aabb224e18e25c4cef8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Wed, 18 Oct 2023 13:20:29 -0400 Subject: [PATCH 07/24] Always set default reg addr in `find_actor()` if not defined --- tractor/_discovery.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 22ab88d1..0f9f88e5 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -22,6 +22,7 @@ from __future__ import annotations from typing import ( AsyncGenerator, + AsyncContextManager, TYPE_CHECKING, ) from contextlib import asynccontextmanager as acm @@ -190,11 +191,19 @@ async def maybe_open_portal_from_reg_addr( else: yield None + if not registry_addrs: + from ._root import _default_lo_addrs + registry_addrs = _default_lo_addrs + + maybe_portals: list[ + AsyncContextManager[tuple[str, int]] + ] = list( + maybe_open_portal_from_reg_addr(addr) + for addr in registry_addrs + ) + async with gather_contexts( - mngrs=list( - maybe_open_portal_from_reg_addr(addr) - for addr in registry_addrs - ) + mngrs=maybe_portals, ) as maybe_portals: print(f'Portalz: {maybe_portals}') if not maybe_portals: @@ -206,6 +215,9 @@ async def maybe_open_portal_from_reg_addr( yield portals[0] else: + # TODO: currently this may return multiple portals + # given there are multi-homed or multiple registrars.. + # SO, we probably need de-duplication logic? yield portals From 0246c824b93edee2e28b31dc85191dee3e26f95a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Wed, 18 Oct 2023 14:12:58 -0400 Subject: [PATCH 08/24] ._root: set a `_default_lo_addrs` and apply it when not provided by caller --- tractor/_root.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index 661a0b87..fb9138c8 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -47,8 +47,14 @@ # set at startup and after forks -_default_arbiter_host: str = '127.0.0.1' -_default_arbiter_port: int = 1616 +_default_lo_host: str = '127.0.0.1' +_default_port: int = 1616 + +# default registry always on localhost +_default_lo_addrs: list[tuple[str, int]] = [( + _default_lo_host, + _default_port, +)] logger = log.get_logger('tractor') @@ -125,10 +131,8 @@ async def open_root_actor( registry_addrs: list[tuple[str, int]] = ( registry_addrs - or [( # default on localhost - _default_arbiter_host, - _default_arbiter_port, - )] + or + _default_lo_addrs ) loglevel = ( @@ -350,9 +354,7 @@ def run_daemon( # runtime kwargs name: str | None = 'root', - registry_addrs: list[tuple[str, int]] = [ - (_default_arbiter_host, _default_arbiter_port) - ], + registry_addrs: list[tuple[str, int]] = _default_lo_addrs, start_method: str | None = None, debug_mode: bool = False, From 94c89fd4255d6f6cecda5fc1413c1821593be0e1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Wed, 18 Oct 2023 15:22:54 -0400 Subject: [PATCH 09/24] Facepalm, `wait_for_actor()` dun take an addr `list`.. --- tractor/_discovery.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 0f9f88e5..1fa2a885 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -243,9 +243,7 @@ async def wait_for_actor( DeprecationWarning, stacklevel=2, ) - registry_addr: list[tuple[str, int]] = [ - arbiter_sockaddr, - ] + registry_addr: tuple[str, int] = arbiter_sockaddr # TODO: use `.trionics.gather_contexts()` like # above in `find_actor()` as well? From 6d671f69b82cd0d47b84812d4a0b27529035b125 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Wed, 18 Oct 2023 15:35:35 -0400 Subject: [PATCH 10/24] Rename fixture `arb_addr` -> `reg_addr` and set the session value globally as `._root._default_lo_addrs` --- tests/conftest.py | 43 +++++++++++++++++++++++++++---------------- tractor/_spawn.py | 4 ++++ 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 01811b56..5ce84425 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -114,12 +114,18 @@ def ci_env() -> bool: '127.0.0.1', random.randint(1000, 9999), ) -_arb_addr = _reg_addr @pytest.fixture(scope='session') -def arb_addr(): - return _arb_addr +def reg_addr() -> tuple[str, int]: + + # globally override the runtime to the per-test-session-dynamic + # addr so that all tests never conflict with any other actor + # tree using the default. + from tractor import _root + _root._default_lo_addrs = [_reg_addr] + + return _reg_addr def pytest_generate_tests(metafunc): @@ -161,30 +167,35 @@ def sig_prog(proc, sig): def daemon( loglevel: str, testdir, - arb_addr: tuple[str, int], + reg_addr: tuple[str, int], ): ''' - Run a daemon actor as a "remote arbiter". + Run a daemon root actor as a separate actor-process tree and + "remote registrar" for discovery-protocol related tests. ''' if loglevel in ('trace', 'debug'): - # too much logging will lock up the subproc (smh) - loglevel = 'info' - - cmdargs = [ - sys.executable, '-c', - "import tractor; tractor.run_daemon([], registry_addr={}, loglevel={})" - .format( - arb_addr, - "'{}'".format(loglevel) if loglevel else None) + # XXX: too much logging will lock up the subproc (smh) + loglevel: str = 'info' + + code: str = ( + "import tractor; " + "tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})" + ).format( + reg_addrs=str([reg_addr]), + ll="'{}'".format(loglevel) if loglevel else None, + ) + cmd: list[str] = [ + sys.executable, + '-c', code, ] - kwargs = dict() + kwargs = {} if platform.system() == 'Windows': # without this, tests hang on windows forever kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP proc = testdir.popen( - cmdargs, + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs, diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 001a0f10..cb8eef5b 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -220,6 +220,10 @@ async def hard_kill( # whilst also hacking on it XD # terminate_after: int = 99999, + # NOTE: for mucking with `.pause()`-ing inside the runtime + # whilst also hacking on it XD + # terminate_after: int = 99999, + ) -> None: ''' Un-gracefully terminate an OS level `trio.Process` after timeout. From f834b35aa9a33d0c74df2d3daf0bdfc356401549 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Wed, 18 Oct 2023 19:08:35 -0400 Subject: [PATCH 11/24] Ensure `registry_addrs` is always set to something --- tractor/_root.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index fb9138c8..3089b802 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -47,12 +47,12 @@ # set at startup and after forks -_default_lo_host: str = '127.0.0.1' +_default_host: str = '127.0.0.1' _default_port: int = 1616 # default registry always on localhost _default_lo_addrs: list[tuple[str, int]] = [( - _default_lo_host, + _default_host, _default_port, )] @@ -134,6 +134,7 @@ async def open_root_actor( or _default_lo_addrs ) + assert registry_addrs loglevel = ( loglevel From 4868bf225c8ef066e1dfc1006f0770def1bf703a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Wed, 18 Oct 2023 19:10:04 -0400 Subject: [PATCH 12/24] Always dynamically re-read the `._root._default_lo_addrs` value in `find_actor()` --- tractor/_discovery.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 1fa2a885..070321b6 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -192,8 +192,11 @@ async def maybe_open_portal_from_reg_addr( yield None if not registry_addrs: - from ._root import _default_lo_addrs - registry_addrs = _default_lo_addrs + # XXX NOTE: make sure to dynamically read the value on + # every call since something may change it globally (eg. + # like in our discovery test suite)! + from . import _root + registry_addrs = _root._default_lo_addrs maybe_portals: list[ AsyncContextManager[tuple[str, int]] From 51bd38976f8dfc56e8fd21be6e49fc50c4c192cb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Thu, 19 Oct 2023 12:05:44 -0400 Subject: [PATCH 13/24] Expose per-actor registry addrs via `.reg_addrs` Since it's handy to be able to debug the *writing* of this instance var (particularly when checking state passed down to a child in `Actor._from_parent()`), rename and wrap the underlying `Actor._reg_addrs` as a settable `@property` and add validation to the `.setter` for sanity - actor discovery is a critical functionality. Other tweaks: - fix `.cancel_soon()` to pass expected argument.. - update internal runtime error message to be simpler and link to GH issues. - use new `Actor.reg_addrs` throughout core. --- tractor/_runtime.py | 94 +++++++++++++++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 25 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index ed1e5f85..9889c53d 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -216,12 +216,6 @@ def __init__( ) registry_addrs: list[tuple[str, int]] = [arbiter_addr] - self._reg_addrs: list[tuple[str, int]] = ( - registry_addrs - or - None - ) - # marked by the process spawning backend at startup # will be None for the parent most process started manually # by the user (currently called the "arbiter") @@ -258,6 +252,44 @@ def __init__( ActorNursery | None, ] = {} # type: ignore # noqa + # when provided, init the registry addresses property from + # input via the validator. + self._reg_addrs: list[tuple[str, int]] = [] + if registry_addrs: + self.reg_addrs: list[tuple[str, int]] = registry_addrs + + @property + def reg_addrs(self) -> list[tuple[str, int]]: + ''' + List of (socket) addresses for all known (and contactable) + registry actors. + + ''' + return self._reg_addrs + + @reg_addrs.setter + def reg_addrs( + self, + addrs: list[tuple[str, int]], + ) -> None: + if not addrs: + log.warning( + 'Empty registry address list is invalid:\n' + f'{addrs}' + ) + return + + # always sanity check the input list since it's critical + # that addrs are correct for discovery sys operation. + for addr in addrs: + if not isinstance(addr, tuple): + raise ValueError( + 'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n' + f'Got {addrs}' + ) + + self._reg_addrs = addrs + async def wait_for_peer( self, uid: tuple[str, str] ) -> tuple[trio.Event, Channel]: @@ -542,16 +574,19 @@ async def _stream_handler( if disconnected: # if the transport died and this actor is still - # registered within a local nursery, we report that the - # IPC layer may have failed unexpectedly since it may be - # the cause of other downstream errors. + # registered within a local nursery, we report + # that the IPC layer may have failed + # unexpectedly since it may be the cause of + # other downstream errors. entry = local_nursery._children.get(uid) if entry: proc: trio.Process _, proc, _ = entry - poll = getattr(proc, 'poll', None) - if poll and poll() is None: + if ( + (poll := getattr(proc, 'poll', None)) + and poll() is None + ): log.cancel( f'Peer IPC broke but subproc is alive?\n\n' @@ -940,14 +975,18 @@ async def _from_parent( _state._runtime_vars.update(rvs) for attr, value in parent_data.items(): - - if attr == '_reg_addrs': + if ( + attr == 'reg_addrs' + and value + ): # XXX: ``msgspec`` doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - self._reg_addrs = [ - tuple(val) for val in value - ] if value else None + # TODO: we don't really NEED these as + # tuples so we can probably drop this + # casting since apparently in python lists + # are "more efficient"? + self.reg_addrs = [tuple(val) for val in value] else: setattr(self, attr, value) @@ -1553,9 +1592,12 @@ async def async_main( # if addresses point to the same actor.. # So we need a way to detect that? maybe iterate # only on unique actor uids? - for addr in actor._reg_addrs: - assert isinstance(addr, tuple) - assert addr[1] # non-zero after bind + for addr in actor.reg_addrs: + try: + assert isinstance(addr, tuple) + assert addr[1] # non-zero after bind + except AssertionError: + await _debug.pause() async with get_registry(*addr) as reg_portal: for accept_addr in accept_addrs: @@ -1611,12 +1653,14 @@ async def async_main( # once we have that all working with std streams locking? log.exception( f"Actor errored and failed to register with arbiter " - f"@ {actor._reg_addrs[0]}?") + f"@ {actor.reg_addrs[0]}?") log.error( - "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" - "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" - "\tIf this is a sub-actor likely its parent will keep running " - "\tcorrectly if this error is caught and ignored.." + "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n" + "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n" + "\tIf this is a sub-actor hopefully its parent will keep running " + "correctly presuming this error was safely ignored..\n\n" + "\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: " + "https://github.com/goodboy/tractor/issues\n" ) if actor._parent_chan: @@ -1660,7 +1704,7 @@ async def async_main( and not actor.is_registrar ): failed: bool = False - for addr in actor._reg_addrs: + for addr in actor.reg_addrs: assert isinstance(addr, tuple) with trio.move_on_after(0.5) as cs: cs.shield = True From 8b0b4abb3ce1110266eb0cfaf74db70546adc966 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Thu, 19 Oct 2023 12:40:37 -0400 Subject: [PATCH 14/24] Change remaining internals to use `Actor.reg_addrs` --- tractor/_discovery.py | 32 +++++++++++++++++++------------- tractor/_root.py | 9 ++++++--- tractor/_spawn.py | 2 +- tractor/_supervise.py | 2 +- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 070321b6..b5f47165 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -133,14 +133,12 @@ async def query_actor( ) regaddr: list[tuple[str, int]] = arbiter_sockaddr - regstr: Portal - async with get_registry( - *(regaddr or actor._reg_addrs[0]) - ) as regstr: - + reg_portal: Portal + regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0] + async with get_registry(*regaddr) as reg_portal: # TODO: return portals to all available actors - for now # just the last one that registered - sockaddr: tuple[str, int] = await regstr.run_from_ns( + sockaddr: tuple[str, int] = await reg_portal.run_from_ns( 'self', 'find_actor', name=name, @@ -155,6 +153,7 @@ async def find_actor( registry_addrs: list[tuple[str, int]] | None = None, only_first: bool = True, + raise_on_none: bool = False, ) -> AsyncGenerator[ Portal | list[Portal] | None, @@ -207,13 +206,20 @@ async def maybe_open_portal_from_reg_addr( async with gather_contexts( mngrs=maybe_portals, - ) as maybe_portals: - print(f'Portalz: {maybe_portals}') - if not maybe_portals: + ) as portals: + # log.runtime( + # 'Gathered portals:\n' + # f'{portals}' + # ) + if not portals: + if raise_on_none: + raise RuntimeError( + f'No {name} found registered @ {registry_addrs}' + ) yield None return - portals: list[Portal] = list(maybe_portals) + portals: list[Portal] = list(portals) if only_first: yield portals[0] @@ -250,9 +256,9 @@ async def wait_for_actor( # TODO: use `.trionics.gather_contexts()` like # above in `find_actor()` as well? - async with get_registry( - *(registry_addr or actor._reg_addrs[0]), # first if not passed - ) as reg_portal: + reg_portal: Portal + regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0] + async with get_registry(*regaddr) as reg_portal: sockaddrs = await reg_portal.run_from_ns( 'self', 'wait_for_actor', diff --git a/tractor/_root.py b/tractor/_root.py index 3089b802..e2590e60 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -86,7 +86,7 @@ async def open_root_actor( enable_modules: list | None = None, rpc_module_paths: list | None = None, -) -> typing.Any: +) -> Actor: ''' Runtime init entry point for ``tractor``. @@ -131,7 +131,7 @@ async def open_root_actor( registry_addrs: list[tuple[str, int]] = ( registry_addrs - or + or _default_lo_addrs ) assert registry_addrs @@ -215,7 +215,10 @@ async def ping_tpt_socket( async with trio.open_nursery() as tn: for addr in registry_addrs: - tn.start_soon(ping_tpt_socket, addr) + tn.start_soon( + ping_tpt_socket, + tuple(addr), # TODO: just drop this requirement? + ) trans_bind_addrs: list[tuple[str, int]] = [] diff --git a/tractor/_spawn.py b/tractor/_spawn.py index cb8eef5b..aa0e9bf1 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -497,7 +497,7 @@ async def trio_proc( await chan.send({ '_parent_main_data': subactor._parent_main_data, 'enable_modules': subactor.enable_modules, - '_reg_addrs': subactor._reg_addrs, + 'reg_addrs': subactor.reg_addrs, 'bind_addrs': bind_addrs, '_runtime_vars': _runtime_vars, }) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 615ba692..733dd53c 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -160,7 +160,7 @@ async def start_actor( loglevel=loglevel, # verbatim relay this actor's registrar addresses - registry_addrs=current_actor()._reg_addrs, + registry_addrs=current_actor().reg_addrs, ) parent_addr = self._actor.accept_addr assert parent_addr From 5f0bfeae5700bdda2f1338ff243b34c8ebc5bccb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 6 Nov 2023 15:43:43 -0500 Subject: [PATCH 15/24] Test with `any(portals)` since `gather_contexts()` will return `list[None | tuple]` --- tractor/_discovery.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index b5f47165..e5bc8dbe 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -211,10 +211,14 @@ async def maybe_open_portal_from_reg_addr( # 'Gathered portals:\n' # f'{portals}' # ) - if not portals: + # NOTE: `gather_contexts()` will return a + # `tuple[None, None, ..., None]` if no contact + # can be made with any regstrar at any of the + # N provided addrs! + if not any(portals): if raise_on_none: raise RuntimeError( - f'No {name} found registered @ {registry_addrs}' + f'No actor "{name}" found registered @ {registry_addrs}' ) yield None return From 2d541fdd9bba0bb721cdbee336f49adf40642bcc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 6 Nov 2023 15:44:21 -0500 Subject: [PATCH 16/24] Fix doc string "its" typo.. --- tractor/_multiaddr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_multiaddr.py b/tractor/_multiaddr.py index d0f562c0..e8713b40 100644 --- a/tractor/_multiaddr.py +++ b/tractor/_multiaddr.py @@ -100,7 +100,7 @@ def parse_maddr( multiaddr: str, ) -> dict[str, str | int | dict]: ''' - Parse a libp2p style "multiaddress" into it's distinct protocol + Parse a libp2p style "multiaddress" into its distinct protocol segments where each segment is of the form: `../<protocol>/<param0>/<param1>/../<paramN>` From ebf9909cc45d3d13b5d30e61024308b1d418e30c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 7 Nov 2023 16:45:22 -0500 Subject: [PATCH 17/24] Add `open_root_actor(ensure_registry: bool)` Allows forcing the opened actor to either obtain the passed registry addrs or raise a runtime error. --- tractor/_root.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tractor/_root.py b/tractor/_root.py index e2590e60..a58de8c7 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -86,6 +86,10 @@ async def open_root_actor( enable_modules: list | None = None, rpc_module_paths: list | None = None, + # NOTE: allow caller to ensure that only one registry exists + # and that this call creates it. + ensure_registry: bool = False, + ) -> Actor: ''' Runtime init entry point for ``tractor``. @@ -226,6 +230,12 @@ async def ping_tpt_socket( # REGISTRAR if ponged_addrs: + if ensure_registry: + raise RuntimeError( + f'Failed to open `{name}`@{ponged_addrs}: ' + 'registry socket(s) already bound' + ) + # we were able to connect to an arbiter logger.info( f'Registry(s) seem(s) to exist @ {ponged_addrs}' From 15a4a2a51ef23de6aeb8ff75eda37ca11eacd3de Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 11 Dec 2023 19:37:45 -0500 Subject: [PATCH 18/24] `.discovery.get_arbiter()`: add warning around this now deprecated usage --- tractor/_discovery.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index e5bc8dbe..8cccc505 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -76,8 +76,18 @@ async def get_registry( yield regstr_ptl -# TODO: deprecate and remove _arbiter form -get_arbiter = get_registry + +# TODO: deprecate and this remove _arbiter form! +@acm +async def get_arbiter(*args, **kwargs): + warnings.warn( + '`tractor.get_arbiter()` is now deprecated!\n' + 'Use `.get_registry()` instead!', + DeprecationWarning, + stacklevel=2, + ) + async with get_registry(*args, **kwargs) as to_yield: + yield to_yield @acm From dbd79d8bebe8d0e0e4bec5139565824664f13f66 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 2 Jan 2024 09:08:39 -0500 Subject: [PATCH 19/24] Log chan-server-startup failures via `.exception()` --- tractor/_runtime.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 9889c53d..ea23fbdf 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -1010,7 +1010,7 @@ async def _serve_forever( # (host, port) to bind for channel server listen_sockaddrs: list[tuple[str, int]] | None = None, - task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: ''' Start the IPC transport server, begin listening for new connections. @@ -1566,7 +1566,9 @@ async def async_main( # tranport address bind errors - normally it's # something silly like the wrong socket-address # passed via a config or CLI Bo - entered_debug = await _debug._maybe_enter_pm(oserr) + entered_debug: bool = await _debug._maybe_enter_pm(oserr) + if not entered_debug: + log.exception('Failed to init IPC channel server !?\n') raise accept_addrs: list[tuple[str, int]] = actor.accept_addrs From f947bdf80cc494aef40dfcad645eec3adcb85137 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 2 Jan 2024 10:25:17 -0500 Subject: [PATCH 20/24] Use `import <name> as <name>,` style over `__all__` in pkg mod --- tractor/__init__.py | 88 ++++++++++++++++----------------------------- 1 file changed, 31 insertions(+), 57 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 5c16bc4e..31f59598 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -18,74 +18,48 @@ tractor: structured concurrent ``trio``-"actors". """ -from ._clustering import open_actor_cluster + +from ._clustering import ( + open_actor_cluster as open_actor_cluster, +) from ._context import ( - Context, # the type - context, # a func-decorator + Context as Context, # the type + context as context, # a func-decorator ) from ._streaming import ( - MsgStream, - stream, + MsgStream as MsgStream, + stream as stream, ) from ._discovery import ( - get_arbiter, - find_actor, - wait_for_actor, - query_actor, + get_arbiter as get_arbiter, + find_actor as find_actor, + wait_for_actor as wait_for_actor, + query_actor as query_actor, +) +from ._supervise import ( + open_nursery as open_nursery, + ActorNursery as ActorNursery, ) -from ._supervise import open_nursery from ._state import ( - current_actor, - is_root_process, + current_actor as current_actor, + is_root_process as is_root_process, ) from ._exceptions import ( - RemoteActorError, - ModuleNotExposed, - ContextCancelled, + RemoteActorError as RemoteActorError, + ModuleNotExposed as ModuleNotExposed, + ContextCancelled as ContextCancelled, ) from .devx import ( - breakpoint, - pause, - pause_from_sync, - post_mortem, + breakpoint as breakpoint, + pause as pause, + pause_from_sync as pause_from_sync, + post_mortem as post_mortem, ) -from . import msg +from . import msg as msg from ._root import ( - run_daemon, - open_root_actor, + run_daemon as run_daemon, + open_root_actor as open_root_actor, ) -from ._ipc import Channel -from ._portal import Portal -from ._runtime import Actor - - -__all__ = [ - 'Actor', - 'BaseExceptionGroup', - 'Channel', - 'Context', - 'ContextCancelled', - 'ModuleNotExposed', - 'MsgStream', - 'Portal', - 'RemoteActorError', - 'breakpoint', - 'context', - 'current_actor', - 'find_actor', - 'query_actor', - 'get_arbiter', - 'is_root_process', - 'msg', - 'open_actor_cluster', - 'open_nursery', - 'open_root_actor', - 'pause', - 'post_mortem', - 'pause_from_sync', - 'query_actor', - 'run_daemon', - 'stream', - 'to_asyncio', - 'wait_for_actor', -] +from ._ipc import Channel as Channel +from ._portal import Portal as Portal +from ._runtime import Actor as Actor From 14f34c111ac40b9560edbf4b448f1342454a2785 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 2 Jan 2024 18:43:43 -0500 Subject: [PATCH 21/24] `_root`: drop unused `typing` import --- tractor/_root.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tractor/_root.py b/tractor/_root.py index a58de8c7..f91eda5a 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -25,7 +25,6 @@ import signal import sys import os -import typing import warnings From 9082efbe6852066579148e8facffaba5f3abef1f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Fri, 8 Mar 2024 15:34:20 -0500 Subject: [PATCH 22/24] Add a `._state._runtime_vars['_registry_addrs']` Such that it's set to whatever `Actor.reg_addrs: list[tuple]` is during the actor's init-after-spawn guaranteeing each actor has at least the registry infos from its parent. Ensure we read this if defined over `_root._default_lo_addrs` in `._discovery` routines, namely `.find_actor()` since it's the one API normally used without expecting the runtime's `current_actor()` to be up. Update the latest inter-peer cancellation test to use the `reg_addr` fixture (and thus test this new runtime-vars value via `find_actor()` usage) since it was failing if run *after* the infected `asyncio` suite due to registry contact failure. --- tests/test_inter_peer_cancellation.py | 2 ++ tractor/_discovery.py | 11 +++++++++-- tractor/_runtime.py | 9 +++++---- tractor/_state.py | 3 ++- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index c3d9e4fd..e3c8a7dd 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -939,6 +939,7 @@ async def tell_little_bro( def test_peer_spawns_and_cancels_service_subactor( debug_mode: bool, raise_client_error: str, + reg_addr: tuple[str, int], ): # NOTE: this tests for the modden `mod wks open piker` bug # discovered as part of implementing workspace ctx @@ -956,6 +957,7 @@ async def main(): async with tractor.open_nursery( # NOTE: to halt the peer tasks on ctxc, uncomment this. debug_mode=debug_mode, + registry_addrs=[reg_addr], ) as an: server: Portal = await an.start_actor( (server_name := 'spawn_server'), diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 8cccc505..de79edc0 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -35,7 +35,10 @@ open_portal, LocalPortal, ) -from ._state import current_actor, _runtime_vars +from ._state import ( + current_actor, + _runtime_vars, +) if TYPE_CHECKING: @@ -205,7 +208,11 @@ async def maybe_open_portal_from_reg_addr( # every call since something may change it globally (eg. # like in our discovery test suite)! from . import _root - registry_addrs = _root._default_lo_addrs + registry_addrs = ( + _runtime_vars['_registry_addrs'] + or + _root._default_lo_addrs + ) maybe_portals: list[ AsyncContextManager[tuple[str, int]] diff --git a/tractor/_runtime.py b/tractor/_runtime.py index ea23fbdf..66a5381c 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -168,10 +168,10 @@ def __init__( name: str, *, enable_modules: list[str] = [], - uid: str | None = None, - loglevel: str | None = None, - registry_addrs: list[tuple[str, int]] | None = None, - spawn_method: str | None = None, + uid: str|None = None, + loglevel: str|None = None, + registry_addrs: list[tuple[str, int]]|None = None, + spawn_method: str|None = None, # TODO: remove! arbiter_addr: tuple[str, int] | None = None, @@ -257,6 +257,7 @@ def __init__( self._reg_addrs: list[tuple[str, int]] = [] if registry_addrs: self.reg_addrs: list[tuple[str, int]] = registry_addrs + _state._runtime_vars['_registry_addrs'] = registry_addrs @property def reg_addrs(self) -> list[tuple[str, int]]: diff --git a/tractor/_state.py b/tractor/_state.py index f3917436..9e4e9473 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -33,7 +33,8 @@ _runtime_vars: dict[str, Any] = { '_debug_mode': False, '_is_root': False, - '_root_mailbox': (None, None) + '_root_mailbox': (None, None), + '_registry_addrs': [], } From 5ffdda762a26897131a5b85d10d58e4edf66c74a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 11 Mar 2024 10:33:06 -0400 Subject: [PATCH 23/24] More spaceless union type annots --- tractor/_discovery.py | 4 ++-- tractor/_root.py | 20 ++++++++++++-------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index de79edc0..99a4dd68 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -162,8 +162,8 @@ async def query_actor( @acm async def find_actor( name: str, - arbiter_sockaddr: tuple[str, int] | None = None, - registry_addrs: list[tuple[str, int]] | None = None, + arbiter_sockaddr: tuple[str, int]|None = None, + registry_addrs: list[tuple[str, int]]|None = None, only_first: bool = True, raise_on_none: bool = False, diff --git a/tractor/_root.py b/tractor/_root.py index f91eda5a..e1a7fb6c 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -64,26 +64,26 @@ async def open_root_actor( *, # defaults are above - registry_addrs: list[tuple[str, int]] | None = None, + registry_addrs: list[tuple[str, int]]|None = None, # defaults are above - arbiter_addr: tuple[str, int] | None = None, + arbiter_addr: tuple[str, int]|None = None, - name: str | None = 'root', + name: str|None = 'root', # either the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # OR `trio` (the new default). - start_method: _spawn.SpawnMethodKey | None = None, + start_method: _spawn.SpawnMethodKey|None = None, # enables the multi-process debugger support debug_mode: bool = False, # internal logging - loglevel: str | None = None, + loglevel: str|None = None, - enable_modules: list | None = None, - rpc_module_paths: list | None = None, + enable_modules: list|None = None, + rpc_module_paths: list|None = None, # NOTE: allow caller to ensure that only one registry exists # and that this call creates it. @@ -109,7 +109,11 @@ async def open_root_actor( _state._runtime_vars['_is_root'] = True # caps based rpc list - enable_modules = enable_modules or [] + enable_modules = ( + enable_modules + or + [] + ) if rpc_module_paths: warnings.warn( From 2b124447c822f3cb01a8499716d0c3e717dabe4f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Thu, 22 Feb 2024 20:37:12 -0500 Subject: [PATCH 24/24] Unmask `pytest.ini` log-capture lines (again) --- pytest.ini | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 pytest.ini diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..6a7e51fb --- /dev/null +++ b/pytest.ini @@ -0,0 +1,8 @@ +# vim: ft=ini +# pytest.ini for tractor + +[pytest] +# don't show frickin captured logs AGAIN in the report.. +addopts = --show-capture='no' +log_cli = false +; minversion = 6.0