Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ properties:
Whether or not to run consistency checks during execution.
This is typically only used for debugging.

zeroconf:
type: boolean
description: |
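Whether or not to advertise the scheduler on the local network via zeroconf (mDNS service discovery). This only takes effect when the optional ``zeroconf`` package is installed.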

dashboard:
type: object
description: |
Expand Down
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ distributed:
rechunk-split: 1us
shuffle-split: 1us
validate: False # Check scheduler state at every step for debugging
zeroconf: true
dashboard:
status:
task-stream-length: 1000
Expand Down
38 changes: 37 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import operator
import os
import random
import socket
import sys
import warnings
import weakref
Expand Down Expand Up @@ -49,7 +50,11 @@
resolve_address,
unparse_host_port,
)
from .comm.addressing import addresses_from_user_args
from .comm.addressing import (
addresses_from_user_args,
get_address_host_port,
parse_address,
)
from .core import CommClosedError, Status, clean_exception, rpc, send_recv
from .diagnostics.plugin import SchedulerPlugin
from .event import EventExtension
Expand Down Expand Up @@ -88,6 +93,12 @@
except ImportError:
compiled = False

try:
import zeroconf
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
except ImportError:
zeroconf = False

if compiled:
from cython import (
Py_hash_t,
Expand Down Expand Up @@ -158,6 +169,7 @@ def nogil(func):


# Module-level settings, each read from the dask configuration when this
# module is executed.

# Value of the ``distributed.admin.pdb-on-err`` config key
# (presumably: enter pdb when an error occurs -- confirm against the schema).
LOG_PDB = dask.config.get("distributed.admin.pdb-on-err")
# Whether the scheduler should advertise itself via zeroconf.  Because this is
# captured here as a module constant, config overrides applied after this
# assignment has run are not reflected in ``ZEROCONF``.
ZEROCONF = dask.config.get("distributed.scheduler.zeroconf")
# Configured ``distributed.scheduler.default-data-size``, converted with
# ``parse_bytes`` and declared as ``Py_ssize_t``.
DEFAULT_DATA_SIZE = declare(
Py_ssize_t, parse_bytes(dask.config.get("distributed.scheduler.default-data-size"))
)
Expand Down Expand Up @@ -3351,6 +3363,9 @@ def __init__(
self._lock = asyncio.Lock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
if zeroconf and ZEROCONF:
self._zeroconf = AsyncZeroconf(ip_version=zeroconf.IPVersion.V4Only)
self._zeroconf_services = []

if not preload:
preload = dask.config.get("distributed.scheduler.preload")
Expand Down Expand Up @@ -3722,6 +3737,22 @@ async def start(self):

for listener in self.listeners:
logger.info(" Scheduler at: %25s", listener.contact_address)
if zeroconf and ZEROCONF:
# Advertise service via mdns service discovery
host, port = get_address_host_port(listener.contact_address)
protocol, _ = parse_address(listener.contact_address)
short_id = self.id.split("-")[1]
info = AsyncServiceInfo(
"_dask._tcp.local.",
f"_sched-{short_id}._dask._tcp.local.",
addresses=[socket.inet_aton(host)],
port=port,
properties={"protocol": protocol},
server=f"sched-{short_id}.dask.local.",
)
self._zeroconf_services.append(info)
await self._zeroconf.async_register_service(info)
logger.info(" Advertising as: %25s", info.server)
for k, v in self.services.items():
logger.info("%11s at: %25s", k, "%s:%d" % (listen_ip, v.port))

Expand Down Expand Up @@ -3786,6 +3817,11 @@ async def close(self, comm=None, fast=False, close_workers=False):

self.stop_services()

if zeroconf and ZEROCONF:
for info in self._zeroconf_services:
await self._zeroconf.async_unregister_service(info)
await self._zeroconf.async_close()

for ext in parent._extensions.values():
with suppress(AttributeError):
ext.teardown()
Expand Down
14 changes: 14 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2800,3 +2800,17 @@ async def test_transition_counter(c, s, a, b):
assert s.transition_counter == 0
await c.submit(inc, 1)
assert s.transition_counter > 1


@gen_cluster(
    config={"distributed.scheduler.zeroconf": True},
)
async def test_zeroconf(s, *_):
    """Check that the scheduler's zeroconf advertisement matches its address.

    The scheduler is expected to have registered exactly one service. That
    service is looked up by name through a separate ``AsyncZeroconf``
    instance bound to the loopback interface, and the host and port it
    reports must equal the location part of ``s.address``.

    The test is skipped when the optional ``zeroconf`` package is missing.
    """
    # Import the submodule explicitly rather than relying on some other
    # module having already made ``zeroconf.asyncio`` available as an
    # attribute of the package.
    zeroconf_asyncio = pytest.importorskip("zeroconf.asyncio")

    assert len(s._zeroconf_services) == 1
    registered = s._zeroconf_services[0]

    async with zeroconf_asyncio.AsyncZeroconf(interfaces=["127.0.0.1"]) as aiozc:
        discovered = await aiozc.async_get_service_info(
            "_dask._tcp.local.", registered.name
        )
        # The lookup yields None when nothing answers before the timeout;
        # fail with a clear message instead of an AttributeError below.
        assert discovered is not None, (
            f"zeroconf service {registered.name!r} was not discoverable"
        )

        [address] = discovered.parsed_addresses()

        # Compare against the exact "host:port" location of the scheduler
        # address.  Substring checks are too lax: port "80" would be found
        # inside "tcp://127.0.0.1:8080".
        _, _, location = s.address.rpartition("://")
        assert location == f"{address}:{discovered.port}"