diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 00000000..6a7e51fb
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,8 @@
+# vim: ft=ini
+# pytest.ini for tractor
+
+[pytest]
+# don't show frickin captured logs AGAIN in the report..
+addopts = --show-capture='no'
+log_cli = false
+; minversion = 6.0
diff --git a/tests/conftest.py b/tests/conftest.py
index 01811b56..5ce84425 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -114,12 +114,18 @@ def ci_env() -> bool:
'127.0.0.1',
random.randint(1000, 9999),
)
-_arb_addr = _reg_addr
@pytest.fixture(scope='session')
-def arb_addr():
- return _arb_addr
+def reg_addr() -> tuple[str, int]:
+ '''
+ Session-scoped fixture returning the module-level `_reg_addr`
+ (host, port) pair to be used as the registry address by tests.
+
+ Side effect: `tractor._root._default_lo_addrs` is overwritten
+ with a one-entry list holding that same address.
+
+ '''
+
+ # globally override the runtime to the per-test-session-dynamic
+ # addr so that all tests never conflict with any other actor
+ # tree using the default.
+ from tractor import _root
+ _root._default_lo_addrs = [_reg_addr]
+
+ return _reg_addr
def pytest_generate_tests(metafunc):
@@ -161,30 +167,35 @@ def sig_prog(proc, sig):
def daemon(
loglevel: str,
testdir,
- arb_addr: tuple[str, int],
+ reg_addr: tuple[str, int],
):
'''
- Run a daemon actor as a "remote arbiter".
+ Run a daemon root actor as a separate actor-process tree and
+ "remote registrar" for discovery-protocol related tests.
'''
if loglevel in ('trace', 'debug'):
- # too much logging will lock up the subproc (smh)
- loglevel = 'info'
-
- cmdargs = [
- sys.executable, '-c',
- "import tractor; tractor.run_daemon([], registry_addr={}, loglevel={})"
- .format(
- arb_addr,
- "'{}'".format(loglevel) if loglevel else None)
+ # XXX: too much logging will lock up the subproc (smh)
+ loglevel: str = 'info'
+
+ code: str = (
+ "import tractor; "
+ "tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
+ ).format(
+ reg_addrs=str([reg_addr]),
+ ll="'{}'".format(loglevel) if loglevel else None,
+ )
+ cmd: list[str] = [
+ sys.executable,
+ '-c', code,
]
- kwargs = dict()
+ kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = testdir.popen(
- cmdargs,
+ cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py
index 14e4d0ae..b8c14af3 100644
--- a/tests/test_cancellation.py
+++ b/tests/test_cancellation.py
@@ -14,7 +14,7 @@
from tractor._testing import (
tractor_test,
)
-from .conftest import no_windows
+from conftest import no_windows
def is_win():
@@ -45,7 +45,7 @@ async def do_nuthin():
],
ids=['no_args', 'unexpected_args'],
)
-def test_remote_error(arb_addr, args_err):
+def test_remote_error(reg_addr, args_err):
'''
Verify an error raised in a subactor that is propagated
to the parent nursery, contains the underlying boxed builtin
@@ -57,7 +57,7 @@ def test_remote_error(arb_addr, args_err):
async def main():
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
) as nursery:
# on a remote type error caused by bad input args
@@ -99,7 +99,7 @@ async def main():
assert exc.type == errtype
-def test_multierror(arb_addr):
+def test_multierror(reg_addr):
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors.
@@ -107,7 +107,7 @@ def test_multierror(arb_addr):
'''
async def main():
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
) as nursery:
await nursery.run_in_actor(assert_err, name='errorer1')
@@ -132,14 +132,14 @@ async def main():
@pytest.mark.parametrize(
'num_subactors', range(25, 26),
)
-def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
+def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
"""Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
"""
async def main():
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
) as nursery:
for i in range(num_subactors):
@@ -177,15 +177,20 @@ async def do_nothing():
@pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt])
-def test_cancel_single_subactor(arb_addr, mechanism):
- """Ensure a ``ActorNursery.start_actor()`` spawned subactor
+def test_cancel_single_subactor(reg_addr, mechanism):
+ '''
+ Ensure a ``ActorNursery.start_actor()`` spawned subactor
cancels when the nursery is cancelled.
- """
+
+ '''
async def spawn_actor():
- """Spawn an actor that blocks indefinitely.
- """
+ '''
+ Spawn an actor that blocks indefinitely then cancel via
+ either `ActorNursery.cancel()` or an exception raise.
+
+ '''
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
) as nursery:
portal = await nursery.start_actor(
diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py
index 1dcbe031..350f939b 100644
--- a/tests/test_child_manages_service_nursery.py
+++ b/tests/test_child_manages_service_nursery.py
@@ -142,7 +142,7 @@ async def open_actor_local_nursery(
)
def test_actor_managed_trio_nursery_task_error_cancels_aio(
asyncio_mode: bool,
- arb_addr
+ reg_addr: tuple,
):
'''
Verify that a ``trio`` nursery created managed in a child actor
diff --git a/tests/test_debugger.py b/tests/test_debugger.py
index a10ecad9..923aa94d 100644
--- a/tests/test_debugger.py
+++ b/tests/test_debugger.py
@@ -83,7 +83,7 @@ def mk_cmd(ex_name: str) -> str:
def spawn(
start_method,
testdir,
- arb_addr,
+ reg_addr,
) -> 'pexpect.spawn':
if start_method != 'trio':
diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index b56c3a2e..cd9dc022 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -14,19 +14,19 @@
@tractor_test
-async def test_reg_then_unreg(arb_addr):
+async def test_reg_then_unreg(reg_addr):
actor = tractor.current_actor()
assert actor.is_arbiter
assert len(actor._registry) == 1 # only self is registered
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
) as n:
portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid
- async with tractor.get_arbiter(*arb_addr) as aportal:
+ async with tractor.get_arbiter(*reg_addr) as aportal:
# this local actor should be the arbiter
assert actor is aportal.actor
@@ -52,15 +52,27 @@ async def hi():
return the_line.format(tractor.current_actor().name)
-async def say_hello(other_actor):
+async def say_hello(
+ other_actor: str,
+ reg_addr: tuple[str, int],
+):
+ '''
+ Sleep one second, look up `other_actor` through
+ `tractor.find_actor()` using `reg_addr` as the only registry
+ address, then return the result of running this module's `hi`
+ through the resulting portal.
+
+ Fails with `AssertionError` when no portal is delivered.
+
+ '''
await trio.sleep(1) # wait for other actor to spawn
- async with tractor.find_actor(other_actor) as portal:
+ async with tractor.find_actor(
+ other_actor,
+ registry_addrs=[reg_addr],
+ ) as portal:
assert portal is not None
return await portal.run(__name__, 'hi')
-async def say_hello_use_wait(other_actor):
- async with tractor.wait_for_actor(other_actor) as portal:
+async def say_hello_use_wait(
+ other_actor: str,
+ reg_addr: tuple[str, int],
+):
+ '''
+ Same exchange as `say_hello()` but the peer is obtained with
+ `tractor.wait_for_actor()` (passing `reg_addr` as its
+ `registry_addr`) instead of a fixed sleep followed by a lookup.
+
+ Returns the result of running this module's `hi` via the portal.
+
+ '''
+ async with tractor.wait_for_actor(
+ other_actor,
+ registry_addr=reg_addr,
+ ) as portal:
assert portal is not None
result = await portal.run(__name__, 'hi')
return result
@@ -68,21 +80,29 @@ async def say_hello_use_wait(other_actor):
@tractor_test
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
-async def test_trynamic_trio(func, start_method, arb_addr):
- """Main tractor entry point, the "master" process (for now
- acts as the "director").
- """
+async def test_trynamic_trio(
+ func,
+ start_method,
+ reg_addr,
+):
+ '''
+ Root actor acting as the "director" and running one-shot-task-actors
+ for the directed subs.
+
+ '''
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.run_in_actor(
func,
other_actor='gretchen',
+ reg_addr=reg_addr,
name='donny',
)
gretchen = await n.run_in_actor(
func,
other_actor='donny',
+ reg_addr=reg_addr,
name='gretchen',
)
print(await gretchen.result())
@@ -130,7 +150,7 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry(
- arb_addr: tuple,
+ reg_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
with_streaming: bool = False,
@@ -138,9 +158,9 @@ async def spawn_and_check_registry(
) -> None:
async with tractor.open_root_actor(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
):
- async with tractor.get_arbiter(*arb_addr) as portal:
+ async with tractor.get_arbiter(*reg_addr) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
@@ -212,17 +232,19 @@ async def spawn_and_check_registry(
def test_subactors_unregister_on_cancel(
start_method,
use_signal,
- arb_addr,
+ reg_addr,
with_streaming,
):
- """Verify that cancelling a nursery results in all subactors
+ '''
+ Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
- """
+
+ '''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
spawn_and_check_registry,
- arb_addr,
+ reg_addr,
use_signal,
remote_arbiter=False,
with_streaming=with_streaming,
@@ -236,7 +258,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
start_method,
use_signal,
- arb_addr,
+ reg_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
@@ -247,7 +269,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
trio.run(
partial(
spawn_and_check_registry,
- arb_addr,
+ reg_addr,
use_signal,
remote_arbiter=True,
with_streaming=with_streaming,
@@ -261,7 +283,7 @@ async def streamer(agen):
async def close_chans_before_nursery(
- arb_addr: tuple,
+ reg_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
) -> None:
@@ -274,9 +296,9 @@ async def close_chans_before_nursery(
entries_at_end = 1
async with tractor.open_root_actor(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
):
- async with tractor.get_arbiter(*arb_addr) as aportal:
+ async with tractor.get_arbiter(*reg_addr) as aportal:
try:
get_reg = partial(unpack_reg, aportal)
@@ -328,7 +350,7 @@ async def close_chans_before_nursery(
def test_close_channel_explicit(
start_method,
use_signal,
- arb_addr,
+ reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@@ -338,7 +360,7 @@ def test_close_channel_explicit(
trio.run(
partial(
close_chans_before_nursery,
- arb_addr,
+ reg_addr,
use_signal,
remote_arbiter=False,
),
@@ -350,7 +372,7 @@ def test_close_channel_explicit_remote_arbiter(
daemon,
start_method,
use_signal,
- arb_addr,
+ reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@@ -360,7 +382,7 @@ def test_close_channel_explicit_remote_arbiter(
trio.run(
partial(
close_chans_before_nursery,
- arb_addr,
+ reg_addr,
use_signal,
remote_arbiter=True,
),
diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py
index f9670225..568708a2 100644
--- a/tests/test_infected_asyncio.py
+++ b/tests/test_infected_asyncio.py
@@ -47,7 +47,7 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(sleep_forever)
-def test_trio_cancels_aio_on_actor_side(arb_addr):
+def test_trio_cancels_aio_on_actor_side(reg_addr):
'''
Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis.
@@ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(arb_addr):
'''
async def main():
async with tractor.open_nursery(
- arbiter_addr=arb_addr
+ registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
trio_cancels_single_aio_task,
@@ -94,7 +94,7 @@ async def asyncio_actor(
raise
-def test_aio_simple_error(arb_addr):
+def test_aio_simple_error(reg_addr):
'''
Verify a simple remote asyncio error propagates back through trio
to the parent actor.
@@ -103,7 +103,7 @@ def test_aio_simple_error(arb_addr):
'''
async def main():
async with tractor.open_nursery(
- arbiter_addr=arb_addr
+ registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
asyncio_actor,
@@ -131,7 +131,7 @@ async def main():
assert err.type == AssertionError
-def test_tractor_cancels_aio(arb_addr):
+def test_tractor_cancels_aio(reg_addr):
'''
Verify we can cancel a spawned asyncio task gracefully.
@@ -150,7 +150,7 @@ async def main():
trio.run(main)
-def test_trio_cancels_aio(arb_addr):
+def test_trio_cancels_aio(reg_addr):
'''
Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api.
@@ -206,7 +206,7 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format
)
def test_context_spawns_aio_task_that_errors(
- arb_addr,
+ reg_addr,
parent_cancels: bool,
):
'''
@@ -288,7 +288,7 @@ async def aio_cancel():
await sleep_forever()
-def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
+def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
async def main():
async with tractor.open_nursery() as n:
@@ -436,7 +436,7 @@ async def consume(
'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format
)
-def test_basic_interloop_channel_stream(arb_addr, fan_out):
+def test_basic_interloop_channel_stream(reg_addr, fan_out):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@@ -450,7 +450,7 @@ async def main():
# TODO: parametrize the above test and avoid the duplication here?
-def test_trio_error_cancels_intertask_chan(arb_addr):
+def test_trio_error_cancels_intertask_chan(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@@ -469,7 +469,7 @@ async def main():
assert exc.type == Exception
-def test_trio_closes_early_and_channel_exits(arb_addr):
+def test_trio_closes_early_and_channel_exits(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@@ -484,7 +484,7 @@ async def main():
trio.run(main)
-def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
+def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@@ -561,7 +561,7 @@ async def aio_echo_server(
ids='raise_error={}'.format,
)
def test_echoserver_detailed_mechanics(
- arb_addr,
+ reg_addr,
raise_error_mid_stream,
):
diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py
index c3d9e4fd..e3c8a7dd 100644
--- a/tests/test_inter_peer_cancellation.py
+++ b/tests/test_inter_peer_cancellation.py
@@ -939,6 +939,7 @@ async def tell_little_bro(
def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool,
raise_client_error: str,
+ reg_addr: tuple[str, int],
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
@@ -956,6 +957,7 @@ async def main():
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode,
+ registry_addrs=[reg_addr],
) as an:
server: Portal = await an.start_actor(
(server_name := 'spawn_server'),
diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py
index 61fff75c..6092bca7 100644
--- a/tests/test_legacy_one_way_streaming.py
+++ b/tests/test_legacy_one_way_streaming.py
@@ -58,7 +58,7 @@ async def context_stream(
async def stream_from_single_subactor(
- arb_addr,
+ reg_addr,
start_method,
stream_func,
):
@@ -67,7 +67,7 @@ async def stream_from_single_subactor(
# only one per host address, spawns an actor if None
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
start_method=start_method,
) as nursery:
@@ -118,13 +118,13 @@ async def stream_from_single_subactor(
@pytest.mark.parametrize(
'stream_func', [async_gen_stream, context_stream]
)
-def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
+def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator.
"""
trio.run(
partial(
stream_from_single_subactor,
- arb_addr,
+ reg_addr,
start_method,
stream_func=stream_func,
),
@@ -228,14 +228,14 @@ async def a_quadruple_example():
return result_stream
-async def cancel_after(wait, arb_addr):
- async with tractor.open_root_actor(arbiter_addr=arb_addr):
+async def cancel_after(wait, reg_addr):
+ '''
+ Open a root actor using `reg_addr` as its only registry address
+ and run `a_quadruple_example()` under a `trio.move_on_after(wait)`
+ scope.
+
+ Returns the example's result, or (implicitly) `None` when the
+ `wait` second deadline expires first.
+
+ '''
+ async with tractor.open_root_actor(registry_addrs=[reg_addr]):
with trio.move_on_after(wait):
return await a_quadruple_example()
@pytest.fixture(scope='module')
-def time_quad_ex(arb_addr, ci_env, spawn_backend):
+def time_quad_ex(reg_addr, ci_env, spawn_backend):
if spawn_backend == 'mp':
"""no idea but the mp *nix runs are flaking out here often...
"""
@@ -243,7 +243,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend):
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
start = time.time()
- results = trio.run(cancel_after, timeout, arb_addr)
+ results = trio.run(cancel_after, timeout, reg_addr)
diff = time.time() - start
assert results
return results, diff
@@ -263,14 +263,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
list(map(lambda i: i/10, range(3, 9)))
)
def test_not_fast_enough_quad(
- arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
+ reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
):
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
"""
results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0)
- results = trio.run(cancel_after, delay, arb_addr)
+ results = trio.run(cancel_after, delay, reg_addr)
system = platform.system()
if system in ('Windows', 'Darwin') and results is not None:
# In CI envoirments it seems later runs are quicker then the first
@@ -283,7 +283,7 @@ def test_not_fast_enough_quad(
@tractor_test
async def test_respawn_consumer_task(
- arb_addr,
+ reg_addr,
spawn_backend,
loglevel,
):
diff --git a/tests/test_local.py b/tests/test_local.py
index bb013043..a019d771 100644
--- a/tests/test_local.py
+++ b/tests/test_local.py
@@ -24,7 +24,7 @@ async def test_no_runtime():
@tractor_test
-async def test_self_is_registered(arb_addr):
+async def test_self_is_registered(reg_addr):
"Verify waiting on the arbiter to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
@@ -34,20 +34,20 @@ async def test_self_is_registered(arb_addr):
@tractor_test
-async def test_self_is_registered_localportal(arb_addr):
+async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
- async with tractor.get_arbiter(*arb_addr) as portal:
+ async with tractor.get_arbiter(*reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2):
sockaddr = await portal.run_from_ns(
'self', 'wait_for_actor', name='root')
- assert sockaddr[0] == arb_addr
+ assert sockaddr[0] == reg_addr
-def test_local_actor_async_func(arb_addr):
+def test_local_actor_async_func(reg_addr):
"""Verify a simple async function in-process.
"""
nums = []
@@ -55,7 +55,7 @@ def test_local_actor_async_func(arb_addr):
async def print_loop():
async with tractor.open_root_actor(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
):
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter
diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py
index d3eadabf..92f4c52d 100644
--- a/tests/test_multi_program.py
+++ b/tests/test_multi_program.py
@@ -30,9 +30,9 @@ def test_abort_on_sigint(daemon):
@tractor_test
-async def test_cancel_remote_arbiter(daemon, arb_addr):
+async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter
- async with tractor.get_arbiter(*arb_addr) as portal:
+ async with tractor.get_arbiter(*reg_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
@@ -41,16 +41,16 @@ async def test_cancel_remote_arbiter(daemon, arb_addr):
# no arbiter socket should exist
with pytest.raises(OSError):
- async with tractor.get_arbiter(*arb_addr) as portal:
+ async with tractor.get_arbiter(*reg_addr) as portal:
pass
-def test_register_duplicate_name(daemon, arb_addr):
+def test_register_duplicate_name(daemon, reg_addr):
async def main():
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
) as n:
assert not tractor.current_actor().is_arbiter
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py
index 69f4c513..6d416f89 100644
--- a/tests/test_pubsub.py
+++ b/tests/test_pubsub.py
@@ -159,7 +159,7 @@ async def test_required_args(callwith_expecterror):
)
def test_multi_actor_subs_arbiter_pub(
loglevel,
- arb_addr,
+ reg_addr,
pub_actor,
):
"""Try out the neato @pub decorator system.
@@ -169,7 +169,7 @@ def test_multi_actor_subs_arbiter_pub(
async def main():
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
enable_modules=[__name__],
) as n:
@@ -254,12 +254,12 @@ async def main():
def test_single_subactor_pub_multitask_subs(
loglevel,
- arb_addr,
+ reg_addr,
):
async def main():
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ registry_addrs=[reg_addr],
enable_modules=[__name__],
) as n:
diff --git a/tests/test_rpc.py b/tests/test_rpc.py
index b16f2f1d..71f3258b 100644
--- a/tests/test_rpc.py
+++ b/tests/test_rpc.py
@@ -52,7 +52,7 @@ async def short_sleep():
'fail_on_syntax',
],
)
-def test_rpc_errors(arb_addr, to_call, testdir):
+def test_rpc_errors(reg_addr, to_call, testdir):
"""Test errors when making various RPC requests to an actor
that either doesn't have the requested module exposed or doesn't define
the named function.
@@ -84,7 +84,7 @@ async def main():
# spawn a subactor which calls us back
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ arbiter_addr=reg_addr,
enable_modules=exposed_mods.copy(),
) as n:
diff --git a/tests/test_spawning.py b/tests/test_spawning.py
index 3f4772e9..6a4b2988 100644
--- a/tests/test_spawning.py
+++ b/tests/test_spawning.py
@@ -16,14 +16,14 @@
async def spawn(
is_arbiter: bool,
data: dict,
- arb_addr: tuple[str, int],
+ reg_addr: tuple[str, int],
):
namespaces = [__name__]
await trio.sleep(0.1)
async with tractor.open_root_actor(
- arbiter_addr=arb_addr,
+ arbiter_addr=reg_addr,
):
actor = tractor.current_actor()
@@ -41,7 +41,7 @@ async def spawn(
is_arbiter=False,
name='sub-actor',
data=data,
- arb_addr=arb_addr,
+ reg_addr=reg_addr,
enable_modules=namespaces,
)
@@ -55,12 +55,12 @@ async def spawn(
return 10
-def test_local_arbiter_subactor_global_state(arb_addr):
+def test_local_arbiter_subactor_global_state(reg_addr):
result = trio.run(
spawn,
True,
data_to_pass_down,
- arb_addr,
+ reg_addr,
)
assert result == 10
@@ -140,7 +140,7 @@ async def check_loglevel(level):
def test_loglevel_propagated_to_subactor(
start_method,
capfd,
- arb_addr,
+ reg_addr,
):
if start_method == 'mp_forkserver':
pytest.skip(
@@ -152,7 +152,7 @@ async def main():
async with tractor.open_nursery(
name='arbiter',
start_method=start_method,
- arbiter_addr=arb_addr,
+ arbiter_addr=reg_addr,
) as tn:
await tn.run_in_actor(
diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py
index 5e18e10a..d7a29134 100644
--- a/tests/test_task_broadcasting.py
+++ b/tests/test_task_broadcasting.py
@@ -66,13 +66,13 @@ async def ensure_sequence(
async def open_sequence_streamer(
sequence: list[int],
- arb_addr: tuple[str, int],
+ reg_addr: tuple[str, int],
start_method: str,
) -> tractor.MsgStream:
async with tractor.open_nursery(
- arbiter_addr=arb_addr,
+ arbiter_addr=reg_addr,
start_method=start_method,
) as tn:
@@ -93,7 +93,7 @@ async def open_sequence_streamer(
def test_stream_fan_out_to_local_subscriptions(
- arb_addr,
+ reg_addr,
start_method,
):
@@ -103,7 +103,7 @@ async def main():
async with open_sequence_streamer(
sequence,
- arb_addr,
+ reg_addr,
start_method,
) as stream:
@@ -138,7 +138,7 @@ async def main():
]
)
def test_consumer_and_parent_maybe_lag(
- arb_addr,
+ reg_addr,
start_method,
task_delays,
):
@@ -150,7 +150,7 @@ async def main():
async with open_sequence_streamer(
sequence,
- arb_addr,
+ reg_addr,
start_method,
) as stream:
@@ -211,7 +211,7 @@ async def main():
def test_faster_task_to_recv_is_cancelled_by_slower(
- arb_addr,
+ reg_addr,
start_method,
):
'''
@@ -225,7 +225,7 @@ async def main():
async with open_sequence_streamer(
sequence,
- arb_addr,
+ reg_addr,
start_method,
) as stream:
@@ -302,7 +302,7 @@ async def main():
def test_ensure_slow_consumers_lag_out(
- arb_addr,
+ reg_addr,
start_method,
):
'''This is a pure local task test; no tractor
diff --git a/tractor/__init__.py b/tractor/__init__.py
index 5c16bc4e..31f59598 100644
--- a/tractor/__init__.py
+++ b/tractor/__init__.py
@@ -18,74 +18,48 @@
tractor: structured concurrent ``trio``-"actors".
"""
-from ._clustering import open_actor_cluster
+
+from ._clustering import (
+ open_actor_cluster as open_actor_cluster,
+)
from ._context import (
- Context, # the type
- context, # a func-decorator
+ Context as Context, # the type
+ context as context, # a func-decorator
)
from ._streaming import (
- MsgStream,
- stream,
+ MsgStream as MsgStream,
+ stream as stream,
)
from ._discovery import (
- get_arbiter,
- find_actor,
- wait_for_actor,
- query_actor,
+ get_arbiter as get_arbiter,
+ find_actor as find_actor,
+ wait_for_actor as wait_for_actor,
+ query_actor as query_actor,
+)
+from ._supervise import (
+ open_nursery as open_nursery,
+ ActorNursery as ActorNursery,
)
-from ._supervise import open_nursery
from ._state import (
- current_actor,
- is_root_process,
+ current_actor as current_actor,
+ is_root_process as is_root_process,
)
from ._exceptions import (
- RemoteActorError,
- ModuleNotExposed,
- ContextCancelled,
+ RemoteActorError as RemoteActorError,
+ ModuleNotExposed as ModuleNotExposed,
+ ContextCancelled as ContextCancelled,
)
from .devx import (
- breakpoint,
- pause,
- pause_from_sync,
- post_mortem,
+ breakpoint as breakpoint,
+ pause as pause,
+ pause_from_sync as pause_from_sync,
+ post_mortem as post_mortem,
)
-from . import msg
+from . import msg as msg
from ._root import (
- run_daemon,
- open_root_actor,
+ run_daemon as run_daemon,
+ open_root_actor as open_root_actor,
)
-from ._ipc import Channel
-from ._portal import Portal
-from ._runtime import Actor
-
-
-__all__ = [
- 'Actor',
- 'BaseExceptionGroup',
- 'Channel',
- 'Context',
- 'ContextCancelled',
- 'ModuleNotExposed',
- 'MsgStream',
- 'Portal',
- 'RemoteActorError',
- 'breakpoint',
- 'context',
- 'current_actor',
- 'find_actor',
- 'query_actor',
- 'get_arbiter',
- 'is_root_process',
- 'msg',
- 'open_actor_cluster',
- 'open_nursery',
- 'open_root_actor',
- 'pause',
- 'post_mortem',
- 'pause_from_sync',
- 'query_actor',
- 'run_daemon',
- 'stream',
- 'to_asyncio',
- 'wait_for_actor',
-]
+from ._ipc import Channel as Channel
+from ._portal import Portal as Portal
+from ._runtime import Actor as Actor
diff --git a/tractor/_discovery.py b/tractor/_discovery.py
index 03775ac2..99a4dd68 100644
--- a/tractor/_discovery.py
+++ b/tractor/_discovery.py
@@ -15,32 +15,45 @@
# along with this program. If not, see .
"""
-Actor discovery API.
+Discovery (protocols) API for automatic addressing and location
+management of (service) actors.
"""
+from __future__ import annotations
from typing import (
- Optional,
- Union,
AsyncGenerator,
+ AsyncContextManager,
+ TYPE_CHECKING,
)
from contextlib import asynccontextmanager as acm
+import warnings
+from .trionics import gather_contexts
from ._ipc import _connect_chan, Channel
from ._portal import (
Portal,
open_portal,
LocalPortal,
)
-from ._state import current_actor, _runtime_vars
+from ._state import (
+ current_actor,
+ _runtime_vars,
+)
-@acm
-async def get_arbiter(
+if TYPE_CHECKING:
+ from ._runtime import Actor
+
+@acm
+async def get_registry(
host: str,
port: int,
-) -> AsyncGenerator[Union[Portal, LocalPortal], None]:
+) -> AsyncGenerator[
+ Portal | LocalPortal | None,
+ None,
+]:
'''
Return a portal instance connected to a local or remote
arbiter.
@@ -51,16 +64,33 @@ async def get_arbiter(
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
- if actor.is_arbiter:
+ if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
- yield LocalPortal(actor, Channel((host, port)))
+ yield LocalPortal(
+ actor,
+ Channel((host, port))
+ )
else:
- async with _connect_chan(host, port) as chan:
+ async with (
+ _connect_chan(host, port) as chan,
+ open_portal(chan) as regstr_ptl,
+ ):
+ yield regstr_ptl
+
- async with open_portal(chan) as arb_portal:
- yield arb_portal
+# TODO: deprecate and then remove this `_arbiter` form!
+@acm
+async def get_arbiter(*args, **kwargs):
+ '''
+ Deprecated alias of `get_registry()`.
+
+ Emits a `DeprecationWarning`, then forwards all positional and
+ keyword arguments to `get_registry()` and yields whatever that
+ context manager yields.
+
+ '''
+ warnings.warn(
+ '`tractor.get_arbiter()` is now deprecated!\n'
+ 'Use `.get_registry()` instead!',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ async with get_registry(*args, **kwargs) as to_yield:
+ yield to_yield
@acm
@@ -68,51 +98,80 @@ async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:
+ # TODO: rename mailbox to `_root_maddr` when we finally
+ # add and impl libp2p multi-addrs?
host, port = _runtime_vars['_root_mailbox']
assert host is not None
- async with _connect_chan(host, port) as chan:
- async with open_portal(chan, **kwargs) as portal:
- yield portal
+ async with (
+ _connect_chan(host, port) as chan,
+ open_portal(chan, **kwargs) as portal,
+ ):
+ yield portal
@acm
async def query_actor(
name: str,
- arbiter_sockaddr: Optional[tuple[str, int]] = None,
+ arbiter_sockaddr: tuple[str, int] | None = None,
+ regaddr: tuple[str, int] | None = None,
-) -> AsyncGenerator[tuple[str, int], None]:
+) -> AsyncGenerator[
+ tuple[str, int] | None,
+ None,
+]:
'''
- Simple address lookup for a given actor name.
+ Make a transport address lookup for an actor name to a specific
+ registrar.
- Returns the (socket) address or ``None``.
+ Returns the (socket) address or ``None`` if no entry under that
+ name exists for the given registrar listening @ `regaddr`.
+
+ `arbiter_sockaddr` is the deprecated spelling of `regaddr`; when
+ passed it overrides `regaddr` and a `DeprecationWarning` is
+ emitted. When neither is given the first entry of the current
+ actor's `.reg_addrs` is queried.
+
+ Raises `RuntimeError` if `name` is 'registrar' while the current
+ actor is itself the registrar.
'''
- actor = current_actor()
- async with get_arbiter(
- *arbiter_sockaddr or actor._arb_addr
- ) as arb_portal:
+ actor: Actor = current_actor()
+ if (
+ name == 'registrar'
+ and actor.is_registrar
+ ):
+ raise RuntimeError(
+ 'The current actor IS the registry!?'
+ )
- sockaddr = await arb_portal.run_from_ns(
+ if arbiter_sockaddr is not None:
+ # name the param actually being deprecated and the one this
+ # function accepts in its place (a single addr, not a list).
+ warnings.warn(
+ '`tractor.query_actor(arbiter_sockaddr=)` is deprecated.\n'
+ 'Use `regaddr: tuple[str, int]` instead!',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ regaddr: tuple[str, int] = arbiter_sockaddr
+
+ reg_portal: Portal
+ regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
+ async with get_registry(*regaddr) as reg_portal:
+ # TODO: return portals to all available actors - for now
+ # just the last one that registered
+ sockaddr: tuple[str, int] | None = await reg_portal.run_from_ns(
'self',
'find_actor',
name=name,
)
-
- # TODO: return portals to all available actors - for now just
- # the last one that registered
- if name == 'arbiter' and actor.is_arbiter:
- raise RuntimeError("The current actor is the arbiter")
-
- yield sockaddr if sockaddr else None
+ yield sockaddr
@acm
async def find_actor(
name: str,
- arbiter_sockaddr: tuple[str, int] | None = None
+ arbiter_sockaddr: tuple[str, int]|None = None,
+ registry_addrs: list[tuple[str, int]]|None = None,
+
+ only_first: bool = True,
+ raise_on_none: bool = False,
-) -> AsyncGenerator[Optional[Portal], None]:
+) -> AsyncGenerator[
+ Portal | list[Portal] | None,
+ None,
+]:
'''
Ask the arbiter to find actor(s) by name.
@@ -120,24 +179,83 @@ async def find_actor(
known to the arbiter.
'''
- async with query_actor(
- name=name,
- arbiter_sockaddr=arbiter_sockaddr,
- ) as sockaddr:
-
- if sockaddr:
- async with _connect_chan(*sockaddr) as chan:
- async with open_portal(chan) as portal:
- yield portal
- else:
+ if arbiter_sockaddr is not None:
+ warnings.warn(
+ '`tractor.find_actor(arbiter_sockaddr=)` is deprecated.\n'
+ 'Use `registry_addrs: list[tuple]` instead!',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
+
+ @acm
+ async def maybe_open_portal_from_reg_addr(
+ addr: tuple[str, int],
+ ):
+ async with query_actor(
+ name=name,
+ regaddr=addr,
+ ) as sockaddr:
+ if sockaddr:
+ async with _connect_chan(*sockaddr) as chan:
+ async with open_portal(chan) as portal:
+ yield portal
+ else:
+ yield None
+
+ if not registry_addrs:
+ # XXX NOTE: make sure to dynamically read the value on
+ # every call since something may change it globally (eg.
+ # like in our discovery test suite)!
+ from . import _root
+ registry_addrs = (
+ _runtime_vars['_registry_addrs']
+ or
+ _root._default_lo_addrs
+ )
+
+ maybe_portals: list[
+ AsyncContextManager[tuple[str, int]]
+ ] = list(
+ maybe_open_portal_from_reg_addr(addr)
+ for addr in registry_addrs
+ )
+
+ async with gather_contexts(
+ mngrs=maybe_portals,
+ ) as portals:
+ # log.runtime(
+ # 'Gathered portals:\n'
+ # f'{portals}'
+ # )
+ # NOTE: `gather_contexts()` will return a
+ # `tuple[None, None, ..., None]` if no contact
+ # can be made with any registrar at any of the
+ # N provided addrs!
+ if not any(portals):
+ if raise_on_none:
+ raise RuntimeError(
+ f'No actor "{name}" found registered @ {registry_addrs}'
+ )
yield None
+ return
+
+ portals: list[Portal] = list(portals)
+ if only_first:
+ yield portals[0]
+
+ else:
+ # TODO: currently this may return multiple portals
+ # given there are multi-homed or multiple registrars..
+ # SO, we probably need de-duplication logic?
+ yield portals
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
- # registry_addr: tuple[str, int] | None = None,
+ registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]:
'''
@@ -146,17 +264,31 @@ async def wait_for_actor(
A portal to the first registered actor is returned.
'''
- actor = current_actor()
-
- async with get_arbiter(
- *arbiter_sockaddr or actor._arb_addr,
- ) as arb_portal:
- sockaddrs = await arb_portal.run_from_ns(
+ actor: Actor = current_actor()
+
+ if arbiter_sockaddr is not None:
+ warnings.warn(
+ '`tractor.wait_for_actor(arbiter_sockaddr=)` is deprecated.\n'
+ 'Use `registry_addr: tuple` instead!',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ registry_addr: tuple[str, int] = arbiter_sockaddr
+
+ # TODO: use `.trionics.gather_contexts()` like
+ # above in `find_actor()` as well?
+ reg_portal: Portal
+ regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0]
+ async with get_registry(*regaddr) as reg_portal:
+ sockaddrs = await reg_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
)
- sockaddr = sockaddrs[-1]
+
+ # get latest registered addr by default?
+ # TODO: offer multi-portal yields in multi-homed case?
+ sockaddr: tuple[str, int] = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
diff --git a/tractor/_entry.py b/tractor/_entry.py
index a59975ce..0ac0dc47 100644
--- a/tractor/_entry.py
+++ b/tractor/_entry.py
@@ -47,8 +47,8 @@
def _mp_main(
- actor: Actor, # type: ignore
- accept_addr: tuple[str, int],
+ actor: Actor,
+ accept_addrs: list[tuple[str, int]],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: tuple[str, int] | None = None,
@@ -77,8 +77,8 @@ def _mp_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
async_main,
- actor,
- accept_addr,
+ actor=actor,
+ accept_addrs=accept_addrs,
parent_addr=parent_addr
)
try:
@@ -96,7 +96,7 @@ def _mp_main(
def _trio_main(
- actor: Actor, # type: ignore
+ actor: Actor,
*,
parent_addr: tuple[str, int] | None = None,
infect_asyncio: bool = False,
diff --git a/tractor/_ipc.py b/tractor/_ipc.py
index e80a1c35..f57d3bd8 100644
--- a/tractor/_ipc.py
+++ b/tractor/_ipc.py
@@ -517,7 +517,9 @@ def connected(self) -> bool:
@acm
async def _connect_chan(
- host: str, port: int
+ host: str,
+ port: int
+
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager
diff --git a/tractor/_multiaddr.py b/tractor/_multiaddr.py
new file mode 100644
index 00000000..e8713b40
--- /dev/null
+++ b/tractor/_multiaddr.py
@@ -0,0 +1,151 @@
+# tractor: structured concurrent "actors".
+# Copyright 2018-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+Multiaddress parser and utils according to the spec(s) defined by
+`libp2p` and used in dependent projects such as `ipfs`:
+
+- https://docs.libp2p.io/concepts/fundamentals/addressing/
+- https://github.com/libp2p/specs/blob/master/addressing/README.md
+
+'''
+from typing import Iterator
+
+from bidict import bidict
+
+# TODO: see if we can leverage libp2p ecosys projects instead of
+# rolling our own (parser) impls of the above addressing specs:
+# - https://github.com/libp2p/py-libp2p
+# - https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses
+# prots: bidict[int, str] = bidict({
+prots: dict[str, int] = {  # protocol key -> approx OSI layer number
+    'ipv4': 3,
+    'ipv6': 3,
+    'wg': 3,
+
+    'tcp': 4,
+    'udp': 4,
+
+    # TODO: support the next-gen shite Bo
+    # 'quic': 4,
+    # 'ssh': 7,  # via rsyscall bootstrapping
+}
+
+prot_params: dict[str, tuple[str, ...]] = {  # key -> ordered param names
+    'ipv4': ('addr',),
+    'ipv6': ('addr',),
+    'wg': ('addr', 'port', 'pubkey'),
+
+    'tcp': ('port',),
+    'udp': ('port',),
+
+    # 'quic': ('port',),
+    # 'ssh': ('port',),
+}
+
+
+def iter_prot_layers(
+    multiaddr: str,
+) -> Iterator[
+    tuple[
+        str,
+        list[str]
+    ]
+]:
+    '''
+    Unpack a libp2p style "multiaddress" into multiple "segments"
+    for each "layer" of the protocol stack (in OSI terms), yielding
+    a `(prot_key, params)` pair per known protocol key in `prots`.
+    '''
+    tokens: list[str] = multiaddr.split('/')
+    root, tokens = tokens[0], tokens[1:]
+    assert not root  # there is a root '/' on LHS
+    itokens = iter(tokens)
+
+    prot: str | None = None
+    params: list[str] = []
+    for token in itokens:
+        # every prot path should start with a known
+        # key-str.
+        if token in prots:
+            if prot is None:
+                prot = token
+            else:
+                yield prot, params  # flush the previous layer first
+                prot = token
+
+            params = []
+
+        else:  # not a protocol key: a param of the current layer
+            params.append(token)
+
+    else:  # tokens exhausted: emit the final, still pending layer
+        yield prot, params
+
+def parse_maddr(
+    multiaddr: str,
+) -> dict[str, dict[str, int | str]]:
+    '''
+    Parse a libp2p style "multiaddress" into its distinct protocol
+    segments where each segment is of the form:
+
+        `../<protocol>/<param0>/<param1>/../<paramN>`
+
+    and is loaded into a (order preserving) `layers: dict[str,
+    dict[str, Any]]` which holds each protocol-layer-segment of the
+    original `str` path as a separate entry according to its approx
+    OSI "layer number".
+
+    Any `paramN` in the path must be distinctly defined by a str-token in the
+    (module global) `prot_params` table.
+
+    For eg. for wireguard which requires an address, port number and publickey
+    the protocol params are specified as the entry:
+
+        'wg': ('addr', 'port', 'pubkey'),
+
+    and are thus parsed from a maddr in that order:
+        `'/wg/1.1.1.1/51820/<pubkey>'`
+
+    '''
+    layers: dict[str, dict[str, int | str]] = {}
+    for (
+        prot_key,
+        params,
+    ) in iter_prot_layers(multiaddr):
+
+        layer: int = prots[prot_key]  # OSI layer used for sorting
+        ep: dict[str, int | str] = {'layer': layer}
+        layers[prot_key] = ep
+
+        # TODO: validation and resolving of names:
+        # - each param via a validator provided as part of the
+        #   prot_params def? (also see `"port"` case below..)
+        # - do a resolv step that will check addrs against
+        #   any loaded network.resolv: dict[str, str]
+        rparams: list[str] = list(reversed(params))
+        for key in prot_params[prot_key]:
+            val: str | int = rparams.pop()  # IndexError if a param is missing
+
+            # TODO: UGHH, dunno what we should do for validation
+            # here, put it in the params spec somehow?
+            if key == 'port':
+                val = int(val)
+
+            ep[key] = val
+
+    return layers
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 0ca44483..ac602dd5 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -461,7 +461,12 @@ class LocalPortal:
actor: 'Actor' # type: ignore # noqa
channel: Channel
- async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
+ async def run_from_ns(
+ self,
+ ns: str,
+ func_name: str,
+ **kwargs,
+ ) -> Any:
'''
Run a requested local function from a namespace path and
return it's result.
diff --git a/tractor/_root.py b/tractor/_root.py
index a1a11d3b..e1a7fb6c 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -25,7 +25,6 @@
import signal
import sys
import os
-import typing
import warnings
@@ -47,8 +46,14 @@
# set at startup and after forks
-_default_arbiter_host: str = '127.0.0.1'
-_default_arbiter_port: int = 1616
+_default_host: str = '127.0.0.1'
+_default_port: int = 1616
+
+# default registry always on localhost
+_default_lo_addrs: list[tuple[str, int]] = [(
+ _default_host,
+ _default_port,
+)]
logger = log.get_logger('tractor')
@@ -59,28 +64,32 @@ async def open_root_actor(
*,
# defaults are above
- arbiter_addr: tuple[str, int] | None = None,
+ registry_addrs: list[tuple[str, int]]|None = None,
# defaults are above
- registry_addr: tuple[str, int] | None = None,
+ arbiter_addr: tuple[str, int]|None = None,
- name: str | None = 'root',
+ name: str|None = 'root',
# either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default).
- start_method: _spawn.SpawnMethodKey | None = None,
+ start_method: _spawn.SpawnMethodKey|None = None,
# enables the multi-process debugger support
debug_mode: bool = False,
# internal logging
- loglevel: str | None = None,
+ loglevel: str|None = None,
+
+ enable_modules: list|None = None,
+ rpc_module_paths: list|None = None,
- enable_modules: list | None = None,
- rpc_module_paths: list | None = None,
+ # NOTE: allow caller to ensure that only one registry exists
+ # and that this call creates it.
+ ensure_registry: bool = False,
-) -> typing.Any:
+) -> Actor:
'''
Runtime init entry point for ``tractor``.
@@ -100,7 +109,11 @@ async def open_root_actor(
_state._runtime_vars['_is_root'] = True
# caps based rpc list
- enable_modules = enable_modules or []
+ enable_modules = (
+ enable_modules
+ or
+ []
+ )
if rpc_module_paths:
warnings.warn(
@@ -116,20 +129,19 @@ async def open_root_actor(
if arbiter_addr is not None:
warnings.warn(
- '`arbiter_addr` is now deprecated and has been renamed to'
- '`registry_addr`.\nUse that instead..',
+ '`arbiter_addr` is now deprecated\n'
+ 'Use `registry_addrs: list[tuple]` instead..',
DeprecationWarning,
stacklevel=2,
)
+ registry_addrs = [arbiter_addr]
- registry_addr = (host, port) = (
- registry_addr
- or arbiter_addr
- or (
- _default_arbiter_host,
- _default_arbiter_port,
- )
+ registry_addrs: list[tuple[str, int]] = (
+ registry_addrs
+ or
+ _default_lo_addrs
)
+ assert registry_addrs
loglevel = (
loglevel
@@ -177,73 +189,131 @@ async def open_root_actor(
'`stackscope` not installed for use in debug mode!'
)
- try:
- # make a temporary connection to see if an arbiter exists,
- # if one can't be made quickly we assume none exists.
- arbiter_found = False
-
- # TODO: this connect-and-bail forces us to have to carefully
- # rewrap TCP 104-connection-reset errors as EOF so as to avoid
- # propagating cancel-causing errors to the channel-msg loop
- # machinery. Likely it would be better to eventually have
- # a "discovery" protocol with basic handshake instead.
- with trio.move_on_after(1):
- async with _connect_chan(host, port):
- arbiter_found = True
-
- except OSError:
- # TODO: make this a "discovery" log level?
- logger.warning(f"No actor registry found @ {host}:{port}")
-
- # create a local actor and start up its main routine/task
- if arbiter_found:
+ # closed into below ping task-func
+ ponged_addrs: list[tuple[str, int]] = []
+
+ async def ping_tpt_socket(
+ addr: tuple[str, int],
+ timeout: float = 1,
+ ) -> None:
+ '''
+ Attempt temporary connection to see if a registry is
+ listening at the requested address by a transport layer
+ ping.
+
+ If a connection can't be made quickly we assume no
+ server is listening at that addr.
+
+ '''
+ try:
+ # TODO: this connect-and-bail forces us to have to
+ # carefully rewrap TCP 104-connection-reset errors as
+ # EOF so as to avoid propagating cancel-causing errors
+ # to the channel-msg loop machinery. Likely it would
+ # be better to eventually have a "discovery" protocol
+ # with basic handshake instead?
+ with trio.move_on_after(timeout):
+ async with _connect_chan(*addr):
+ ponged_addrs.append(addr)
+
+ except OSError:
+ # TODO: make this a "discovery" log level?
+ logger.warning(f'No actor registry found @ {addr}')
+
+ async with trio.open_nursery() as tn:
+ for addr in registry_addrs:
+ tn.start_soon(
+ ping_tpt_socket,
+ tuple(addr), # TODO: just drop this requirement?
+ )
+
+ trans_bind_addrs: list[tuple[str, int]] = []
+
+ # Create a new local root-actor instance which IS NOT THE
+ # REGISTRAR
+ if ponged_addrs:
+
+ if ensure_registry:
+ raise RuntimeError(
+ f'Failed to open `{name}`@{ponged_addrs}: '
+ 'registry socket(s) already bound'
+ )
# we were able to connect to an arbiter
- logger.info(f"Arbiter seems to exist @ {host}:{port}")
+ logger.info(
+ f'Registry(s) seem(s) to exist @ {ponged_addrs}'
+ )
actor = Actor(
- name or 'anonymous',
- arbiter_addr=registry_addr,
+ name=name or 'anonymous',
+ registry_addrs=ponged_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
- host, port = (host, 0)
-
+ # DO NOT use the registry_addrs as the transport server
+ # addrs for this new non-registrar, root-actor.
+ for host, port in ponged_addrs:
+ # NOTE: zero triggers dynamic OS port allocation
+ trans_bind_addrs.append((host, 0))
+
+ # Start this local actor as the "registrar", aka a regular
+ # actor who manages the local registry of "mailboxes" of
+ # other process-tree-local sub-actors.
else:
- # start this local actor as the arbiter (aka a regular actor who
- # manages the local registry of "mailboxes")
- # Note that if the current actor is the arbiter it is desirable
- # for it to stay up indefinitely until a re-election process has
- # taken place - which is not implemented yet FYI).
+ # NOTE that if the current actor IS THE REGISTRAR, the
+ # following init steps are taken:
+ # - the transport layer server is bound to each (host, port)
+ # pair defined in provided registry_addrs, or the default.
+ trans_bind_addrs = registry_addrs
+
+ # - it is normally desirable for any registrar to stay up
+ # indefinitely until either all registered (child/sub)
+ # actors are terminated (via SC supervision) or,
+ # a re-election process has taken place.
+ # NOTE: all of ^ which is not implemented yet - see:
+ # https://github.com/goodboy/tractor/issues/216
+ # https://github.com/goodboy/tractor/pull/348
+ # https://github.com/goodboy/tractor/issues/296
actor = Arbiter(
- name or 'arbiter',
- arbiter_addr=registry_addr,
+ name or 'registrar',
+ registry_addrs=registry_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
+ # Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
- logger.info(f"Starting local {actor} @ {host}:{port}")
+ ml_addrs_str: str = '\n'.join(
+ f'@{addr}' for addr in trans_bind_addrs
+ )
+ logger.info(
+ f'Starting local {actor.uid} on the following transport addrs:\n'
+ f'{ml_addrs_str}'
+ )
# start the actor runtime in a new task
async with trio.open_nursery() as nursery:
- # ``_runtime.async_main()`` creates an internal nursery and
- # thus blocks here until the entire underlying actor tree has
- # terminated thereby conducting structured concurrency.
-
+ # ``_runtime.async_main()`` creates an internal nursery
+ # and blocks here until any underlying actor(-process)
+ # tree has terminated thereby conducting so called
+ # "end-to-end" structured concurrency throughout an
+ # entire hierarchical python sub-process set; all
+ # "actor runtime" primitives are SC-compat and thus all
+ # transitively spawned actors/processes must be as
+ # well.
await nursery.start(
partial(
async_main,
actor,
- accept_addr=(host, port),
+ accept_addrs=trans_bind_addrs,
parent_addr=None
)
)
@@ -255,7 +325,7 @@ async def open_root_actor(
BaseExceptionGroup,
) as err:
- entered = await _debug._maybe_enter_pm(err)
+ entered: bool = await _debug._maybe_enter_pm(err)
if (
not entered
and
@@ -263,7 +333,8 @@ async def open_root_actor(
):
logger.exception('Root actor crashed:\n')
- # always re-raise
+ # ALWAYS re-raise any error bubbled up from the
+ # runtime!
raise
finally:
@@ -284,7 +355,7 @@ async def open_root_actor(
_state._current_actor = None
_state._last_actor_terminated = actor
- # restore breakpoint hook state
+ # restore built-in `breakpoint()` hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
@@ -300,10 +371,7 @@ def run_daemon(
# runtime kwargs
name: str | None = 'root',
- registry_addr: tuple[str, int] = (
- _default_arbiter_host,
- _default_arbiter_port,
- ),
+ registry_addrs: list[tuple[str, int]] = _default_lo_addrs,
start_method: str | None = None,
debug_mode: bool = False,
@@ -327,7 +395,7 @@ def run_daemon(
async def _main():
async with open_root_actor(
- registry_addr=registry_addr,
+ registry_addrs=registry_addrs,
name=name,
start_method=start_method,
debug_mode=debug_mode,
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index 6b3d9461..66a5381c 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -45,6 +45,7 @@
from itertools import chain
import importlib
import importlib.util
+import os
from pprint import pformat
import signal
import sys
@@ -55,7 +56,7 @@
)
import uuid
from types import ModuleType
-import os
+import warnings
import trio
from trio import (
@@ -77,8 +78,8 @@
ContextCancelled,
TransportClosed,
)
-from ._discovery import get_arbiter
from .devx import _debug
+from ._discovery import get_registry
from ._portal import Portal
from . import _state
from . import _mp_fixup_main
@@ -127,6 +128,11 @@ class Actor:
# ugh, we need to get rid of this and replace with a "registry" sys
# https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False
+
+ @property
+ def is_registrar(self) -> bool:
+ return self.is_arbiter
+
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
@@ -162,10 +168,14 @@ def __init__(
name: str,
*,
enable_modules: list[str] = [],
- uid: str | None = None,
- loglevel: str | None = None,
+ uid: str|None = None,
+ loglevel: str|None = None,
+ registry_addrs: list[tuple[str, int]]|None = None,
+ spawn_method: str|None = None,
+
+ # TODO: remove!
arbiter_addr: tuple[str, int] | None = None,
- spawn_method: str | None = None
+
) -> None:
'''
This constructor is called in the parent actor **before** the spawning
@@ -189,27 +199,30 @@ def __init__(
# always include debugging tools module
enable_modules.append('tractor.devx._debug')
- mods = {}
+ self.enable_modules: dict[str, str] = {}
for name in enable_modules:
- mod = importlib.import_module(name)
- mods[name] = _get_mod_abspath(mod)
+ mod: ModuleType = importlib.import_module(name)
+ self.enable_modules[name] = _get_mod_abspath(mod)
- self.enable_modules = mods
self._mods: dict[str, ModuleType] = {}
- self.loglevel = loglevel
-
- self._arb_addr: tuple[str, int] | None = (
- str(arbiter_addr[0]),
- int(arbiter_addr[1])
- ) if arbiter_addr else None
+ self.loglevel: str = loglevel
+
+ if arbiter_addr is not None:
+ warnings.warn(
+ '`Actor(arbiter_addr=)` is now deprecated.\n'
+ 'Use `registry_addrs: list[tuple]` instead.',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ registry_addrs: list[tuple[str, int]] = [arbiter_addr]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
- self._spawn_method = spawn_method
+ self._spawn_method: str = spawn_method
self._peers: defaultdict = defaultdict(list)
- self._peer_connected: dict = {}
+ self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event()
self._no_more_peers.set()
self._ongoing_rpc_tasks = trio.Event()
@@ -239,6 +252,45 @@ def __init__(
ActorNursery | None,
] = {} # type: ignore # noqa
+ # when provided, init the registry addresses property from
+ # input via the validator.
+ self._reg_addrs: list[tuple[str, int]] = []
+ if registry_addrs:
+ self.reg_addrs: list[tuple[str, int]] = registry_addrs
+ _state._runtime_vars['_registry_addrs'] = registry_addrs
+
+ @property
+ def reg_addrs(self) -> list[tuple[str, int]]:
+ '''
+ List of (socket) addresses for all known (and contactable)
+ registry actors.
+
+ '''
+ return self._reg_addrs
+
+ @reg_addrs.setter
+ def reg_addrs(
+ self,
+ addrs: list[tuple[str, int]],
+ ) -> None:
+ if not addrs:
+ log.warning(
+ 'Empty registry address list is invalid:\n'
+ f'{addrs}'
+ )
+ return
+
+ # always sanity check the input list since it's critical
+ # that addrs are correct for discovery sys operation.
+ for addr in addrs:
+ if not isinstance(addr, tuple):
+ raise ValueError(
+ 'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n'
+ f'Got {addrs}'
+ )
+
+ self._reg_addrs = addrs
+
async def wait_for_peer(
self, uid: tuple[str, str]
) -> tuple[trio.Event, Channel]:
@@ -336,6 +388,12 @@ async def _stream_handler(
self._no_more_peers = trio.Event() # unset by making new
chan = Channel.from_stream(stream)
their_uid: tuple[str, str]|None = chan.uid
+ if their_uid:
+ log.warning(
+ f'Re-connection from already known {their_uid}'
+ )
+ else:
+ log.runtime(f'New connection to us @{chan.raddr}')
con_msg: str = ''
if their_uid:
@@ -517,16 +575,19 @@ async def _stream_handler(
if disconnected:
# if the transport died and this actor is still
- # registered within a local nursery, we report that the
- # IPC layer may have failed unexpectedly since it may be
- # the cause of other downstream errors.
+ # registered within a local nursery, we report
+ # that the IPC layer may have failed
+ # unexpectedly since it may be the cause of
+ # other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
- poll = getattr(proc, 'poll', None)
- if poll and poll() is None:
+ if (
+ (poll := getattr(proc, 'poll', None))
+ and poll() is None
+ ):
log.cancel(
f'Peer IPC broke but subproc is alive?\n\n'
@@ -880,11 +941,11 @@ async def _from_parent(
)
await chan.connect()
+ # TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await self._do_handshake(chan)
- accept_addr: tuple[str, int] | None = None
-
+ accept_addrs: list[tuple[str, int]] | None = None
if self._spawn_method == "trio":
# Receive runtime state from our parent
parent_data: dict[str, Any]
@@ -897,10 +958,7 @@ async def _from_parent(
# if "trace"/"util" mode is enabled?
f'{pformat(parent_data)}\n'
)
- accept_addr = (
- parent_data.pop('bind_host'),
- parent_data.pop('bind_port'),
- )
+ accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']:
@@ -918,18 +976,23 @@ async def _from_parent(
_state._runtime_vars.update(rvs)
for attr, value in parent_data.items():
-
- if attr == '_arb_addr':
+ if (
+ attr == 'reg_addrs'
+ and value
+ ):
# XXX: ``msgspec`` doesn't support serializing tuples
# so just cash manually here since it's what our
# internals expect.
- value = tuple(value) if value else None
- self._arb_addr = value
+ # TODO: we don't really NEED these as
+ # tuples so we can probably drop this
+ # casting since apparently in python lists
+ # are "more efficient"?
+ self.reg_addrs = [tuple(val) for val in value]
else:
setattr(self, attr, value)
- return chan, accept_addr
+ return chan, accept_addrs
except OSError: # failed to connect
log.warning(
@@ -946,9 +1009,9 @@ async def _serve_forever(
handler_nursery: Nursery,
*,
# (host, port) to bind for channel server
- accept_host: tuple[str, int] | None = None,
- accept_port: int = 0,
- task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
+ listen_sockaddrs: list[tuple[str, int]] | None = None,
+
+ task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Start the IPC transport server, begin listening for new connections.
@@ -958,30 +1021,40 @@ async def _serve_forever(
`.cancel_server()` is called.
'''
+ if listen_sockaddrs is None:
+ listen_sockaddrs = [(None, 0)]
+
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
- listeners: list[trio.abc.Listener] = await server_n.start(
- partial(
- trio.serve_tcp,
- self._stream_handler,
- # new connections will stay alive even if this server
- # is cancelled
- handler_nursery=handler_nursery,
- port=accept_port,
- host=accept_host,
+
+ for host, port in listen_sockaddrs:
+ listeners: list[trio.abc.Listener] = await server_n.start(
+ partial(
+ trio.serve_tcp,
+
+ handler=self._stream_handler,
+ port=port,
+ host=host,
+
+ # NOTE: configured such that new
+ # connections will stay alive even if
+ # this server is cancelled!
+ handler_nursery=handler_nursery,
+ )
)
- )
- sockets: list[trio.socket] = [
- getattr(listener, 'socket', 'unknown socket')
- for listener in listeners
- ]
- log.runtime(
- 'Started TCP server(s)\n'
- f'|_{sockets}\n'
- )
- self._listeners.extend(listeners)
+ sockets: list[trio.socket] = [
+ getattr(listener, 'socket', 'unknown socket')
+ for listener in listeners
+ ]
+ log.runtime(
+ 'Started TCP server(s)\n'
+ f'|_{sockets}\n'
+ )
+ self._listeners.extend(listeners)
+
task_status.started(server_n)
+
finally:
# signal the server is down since nursery above terminated
self._server_down.set()
@@ -1318,6 +1391,19 @@ def cancel_server(self) -> None:
log.runtime("Shutting down channel server")
self._server_n.cancel_scope.cancel()
+ @property
+ def accept_addrs(self) -> list[tuple[str, int]]:
+ '''
+ All addresses to which the transport-channel server binds
+ and listens for new connections.
+
+ '''
+ # throws OSError on failure
+ return [
+ listener.socket.getsockname()
+ for listener in self._listeners
+ ] # type: ignore
+
@property
def accept_addr(self) -> tuple[str, int]:
'''
@@ -1326,7 +1412,7 @@ def accept_addr(self) -> tuple[str, int]:
'''
# throws OSError on failure
- return self._listeners[0].socket.getsockname() # type: ignore
+ return self.accept_addrs[0]
def get_parent(self) -> Portal:
'''
@@ -1343,6 +1429,7 @@ def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
'''
return self._peers[uid]
+ # TODO: move to `Channel.handshake(uid)`
async def _do_handshake(
self,
chan: Channel
@@ -1379,7 +1466,7 @@ def is_infected_aio(self) -> bool:
async def async_main(
actor: Actor,
- accept_addr: tuple[str, int] | None = None,
+ accept_addrs: tuple[str, int] | None = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@@ -1407,20 +1494,25 @@ async def async_main(
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
- registered_with_arbiter = False
+ is_registered: bool = False
try:
# establish primary connection with immediate parent
- actor._parent_chan = None
+ actor._parent_chan: Channel | None = None
if parent_addr is not None:
- actor._parent_chan, accept_addr_rent = await actor._from_parent(
- parent_addr)
+ (
+ actor._parent_chan,
+ set_accept_addr_says_rent,
+ ) = await actor._from_parent(parent_addr)
- # either it's passed in because we're not a child
- # or because we're running in mp mode
- if accept_addr_rent is not None:
- accept_addr = accept_addr_rent
+ # either it's passed in because we're not a child or
+ # because we're running in mp mode
+ if (
+ set_accept_addr_says_rent
+ and set_accept_addr_says_rent is not None
+ ):
+ accept_addrs = set_accept_addr_says_rent
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
@@ -1460,34 +1552,72 @@ async def async_main(
# - subactor: the bind address is sent by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
- assert accept_addr
- host, port = accept_addr
-
- actor._server_n = await service_nursery.start(
- partial(
- actor._serve_forever,
- service_nursery,
- accept_host=host,
- accept_port=port
+ assert accept_addrs
+
+ try:
+ actor._server_n = await service_nursery.start(
+ partial(
+ actor._serve_forever,
+ service_nursery,
+ listen_sockaddrs=accept_addrs,
+ )
)
- )
- accept_addr = actor.accept_addr
+ except OSError as oserr:
+ # NOTE: always allow runtime hackers to debug
+ # transport address bind errors - normally it's
+ # something silly like the wrong socket-address
+ # passed via a config or CLI Bo
+ entered_debug: bool = await _debug._maybe_enter_pm(oserr)
+ if not entered_debug:
+ log.exception('Failed to init IPC channel server !?\n')
+ raise
+
+ accept_addrs: list[tuple[str, int]] = actor.accept_addrs
+
+ # NOTE: only set the loopback addr for the
+ # process-tree-global "root" mailbox since
+ # all sub-actors should be able to speak to
+ # their root actor over that channel.
if _state._runtime_vars['_is_root']:
- _state._runtime_vars['_root_mailbox'] = accept_addr
+ for addr in accept_addrs:
+ host, _ = addr
+ # TODO: generic 'lo' detector predicate
+ if '127.0.0.1' in host:
+ _state._runtime_vars['_root_mailbox'] = addr
# Register with the arbiter if we're told its addr
- log.runtime(f"Registering {actor} for role `{actor.name}`")
- assert isinstance(actor._arb_addr, tuple)
-
- async with get_arbiter(*actor._arb_addr) as arb_portal:
- await arb_portal.run_from_ns(
- 'self',
- 'register_actor',
- uid=actor.uid,
- sockaddr=accept_addr,
- )
+ log.runtime(
+ f'Registering `{actor.name}` ->\n'
+ f'{pformat(accept_addrs)}'
+ )
- registered_with_arbiter = True
+ # TODO: ideally we don't fan out to all registrars
+ # if addresses point to the same actor..
+ # So we need a way to detect that? maybe iterate
+ # only on unique actor uids?
+ for addr in actor.reg_addrs:
+ try:
+ assert isinstance(addr, tuple)
+ assert addr[1] # non-zero after bind
+ except AssertionError:
+ await _debug.pause()
+
+ async with get_registry(*addr) as reg_portal:
+ for accept_addr in accept_addrs:
+
+ if not accept_addr[1]:
+ await _debug.pause()
+
+ assert accept_addr[1]
+
+ await reg_portal.run_from_ns(
+ 'self',
+ 'register_actor',
+ uid=actor.uid,
+ sockaddr=accept_addr,
+ )
+
+ is_registered: bool = True
# init steps complete
task_status.started()
@@ -1520,18 +1650,20 @@ async def async_main(
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
- if not registered_with_arbiter:
+ if not is_registered:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
# once we have that all working with std streams locking?
log.exception(
f"Actor errored and failed to register with arbiter "
- f"@ {actor._arb_addr}?")
+ f"@ {actor.reg_addrs[0]}?")
log.error(
- "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
- "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
- "\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
- "\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
+ "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
+ "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
+ "\tIf this is a sub-actor hopefully its parent will keep running "
+ "correctly presuming this error was safely ignored..\n\n"
+ "\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: "
+ "https://github.com/goodboy/tractor/issues\n"
)
if actor._parent_chan:
@@ -1571,27 +1703,33 @@ async def async_main(
# Unregister actor from the registry-sys / registrar.
if (
- registered_with_arbiter
- and not actor.is_arbiter
+ is_registered
+ and not actor.is_registrar
):
- failed = False
- assert isinstance(actor._arb_addr, tuple)
- with trio.move_on_after(0.5) as cs:
- cs.shield = True
- try:
- async with get_arbiter(*actor._arb_addr) as arb_portal:
- await arb_portal.run_from_ns(
- 'self',
- 'unregister_actor',
- uid=actor.uid
- )
- except OSError:
+ failed: bool = False
+ for addr in actor.reg_addrs:
+ assert isinstance(addr, tuple)
+ with trio.move_on_after(0.5) as cs:
+ cs.shield = True
+ try:
+ async with get_registry(
+ *addr,
+ ) as reg_portal:
+ await reg_portal.run_from_ns(
+ 'self',
+ 'unregister_actor',
+ uid=actor.uid
+ )
+ except OSError:
+ failed = True
+ if cs.cancelled_caught:
failed = True
- if cs.cancelled_caught:
- failed = True
- if failed:
- log.warning(
- f"Failed to unregister {actor.name} from arbiter")
+
+ if failed:
+ log.warning(
+ f'Failed to unregister {actor.name} from '
+ f'registar @ {addr}'
+ )
# Ensure all peers (actors connected to us as clients) are finished
if not actor._no_more_peers.is_set():
@@ -1610,18 +1748,36 @@ async def async_main(
# TODO: rename to `Registry` and move to `._discovery`!
class Arbiter(Actor):
'''
- A special actor who knows all the other actors and always has
- access to a top level nursery.
-
- The arbiter is by default the first actor spawned on each host
- and is responsible for keeping track of all other actors for
- coordination purposes. If a new main process is launched and an
- arbiter is already running that arbiter will be used.
+ A special registrar actor who can contact all other actors
+ within its immediate process tree and possibly keeps a registry
+ of others meant to be discoverable in a distributed
+ application. Normally the registrar is also the "root actor"
+ and thus always has access to the top-most-level actor
+ (process) nursery.
+
+ By default, the registrar is always initialized when and if no
+ other registrar socket addrs have been specified to runtime
+ init entry-points (such as `open_root_actor()` or
+ `open_nursery()`). Any time a new main process is launched (and
+ thus a new root actor created) and no existing registrar
+ can be contacted at the provided `registry_addr`, then a new
+ one is always created; however, if one can be reached it is
+ used.
+
+ Normally a distributed app requires at least one registrar per
+ logical host where for that given "host space" (aka localhost
+ IPC domain of addresses) it is responsible for making all other
+ host (local address) bound actors *discoverable* to external
+ actor trees running on remote hosts.
'''
is_arbiter = True
- def __init__(self, *args, **kwargs) -> None:
+ def __init__(
+ self,
+ *args,
+ **kwargs,
+ ) -> None:
self._registry: dict[
tuple[str, str],
@@ -1663,7 +1819,10 @@ async def get_registry(
# unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
- return {'.'.join(key): val for key, val in self._registry.items()}
+ return {
+ '.'.join(key): val
+ for key, val in self._registry.items()
+ }
async def wait_for_actor(
self,
@@ -1706,8 +1865,15 @@ async def register_actor(
sockaddr: tuple[str, int]
) -> None:
- uid = name, _ = (str(uid[0]), str(uid[1]))
- self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
+ uid = name, hash = (str(uid[0]), str(uid[1]))
+ addr = (host, port) = (
+ str(sockaddr[0]),
+ int(sockaddr[1]),
+ )
+ if port == 0:
+ await _debug.pause()
+ assert port # should never be 0-dynamic-os-alloc
+ self._registry[uid] = addr
# pop and signal all waiter events
events = self._waiters.pop(name, [])
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 78c38c84..aa0e9bf1 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -220,6 +220,10 @@ async def hard_kill(
# whilst also hacking on it XD
# terminate_after: int = 99999,
+ # NOTE: for mucking with `.pause()`-ing inside the runtime
+ # whilst also hacking on it XD
+ # terminate_after: int = 99999,
+
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
@@ -365,7 +369,7 @@ async def new_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
- bind_addr: tuple[str, int],
+ bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
@@ -387,7 +391,7 @@ async def new_proc(
actor_nursery,
subactor,
errors,
- bind_addr,
+ bind_addrs,
parent_addr,
_runtime_vars, # run time vars
infect_asyncio=infect_asyncio,
@@ -402,7 +406,7 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
- bind_addr: tuple[str, int],
+ bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@@ -491,12 +495,11 @@ async def trio_proc(
# send additional init params
await chan.send({
- "_parent_main_data": subactor._parent_main_data,
- "enable_modules": subactor.enable_modules,
- "_arb_addr": subactor._arb_addr,
- "bind_host": bind_addr[0],
- "bind_port": bind_addr[1],
- "_runtime_vars": _runtime_vars,
+ '_parent_main_data': subactor._parent_main_data,
+ 'enable_modules': subactor.enable_modules,
+ 'reg_addrs': subactor.reg_addrs,
+ 'bind_addrs': bind_addrs,
+ '_runtime_vars': _runtime_vars,
})
# track subactor in current nursery
@@ -602,7 +605,7 @@ async def mp_proc(
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
- bind_addr: tuple[str, int],
+ bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@@ -660,7 +663,7 @@ async def mp_proc(
target=_mp_main,
args=(
subactor,
- bind_addr,
+ bind_addrs,
fs_info,
_spawn_method,
parent_addr,
diff --git a/tractor/_state.py b/tractor/_state.py
index f3917436..9e4e9473 100644
--- a/tractor/_state.py
+++ b/tractor/_state.py
@@ -33,7 +33,8 @@
_runtime_vars: dict[str, Any] = {
'_debug_mode': False,
'_is_root': False,
- '_root_mailbox': (None, None)
+ '_root_mailbox': (None, None),
+ '_registry_addrs': [],
}
diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index c8c2336d..733dd53c 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -22,10 +22,7 @@
from functools import partial
import inspect
from pprint import pformat
-from typing import (
- Optional,
- TYPE_CHECKING,
-)
+from typing import TYPE_CHECKING
import typing
import warnings
@@ -97,7 +94,7 @@ def __init__(
tuple[
Actor,
trio.Process | mp.Process,
- Optional[Portal],
+ Portal | None,
]
] = {}
# portals spawned with ``run_in_actor()`` are
@@ -121,12 +118,12 @@ async def start_actor(
self,
name: str,
*,
- bind_addr: tuple[str, int] = _default_bind_addr,
+ bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
nursery: trio.Nursery | None = None,
- debug_mode: Optional[bool] | None = None,
+ debug_mode: bool | None = None,
infect_asyncio: bool = False,
) -> Portal:
'''
@@ -161,7 +158,9 @@ async def start_actor(
# modules allowed to invoked funcs from
enable_modules=enable_modules,
loglevel=loglevel,
- arbiter_addr=current_actor()._arb_addr,
+
+ # verbatim relay this actor's registrar addresses
+ registry_addrs=current_actor().reg_addrs,
)
parent_addr = self._actor.accept_addr
assert parent_addr
@@ -178,7 +177,7 @@ async def start_actor(
self,
subactor,
self.errors,
- bind_addr,
+ bind_addrs,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
@@ -191,8 +190,8 @@ async def run_in_actor(
fn: typing.Callable,
*,
- name: Optional[str] = None,
- bind_addr: tuple[str, int] = _default_bind_addr,
+ name: str | None = None,
+ bind_addrs: tuple[str, int] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
@@ -221,7 +220,7 @@ async def run_in_actor(
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
),
- bind_addr=bind_addr,
+ bind_addrs=bind_addrs,
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,