diff --git a/ci/slurm.sh b/ci/slurm.sh
index 217ad3e1..3265a736 100644
--- a/ci/slurm.sh
+++ b/ci/slurm.sh
@@ -28,7 +28,9 @@ function show_network_interfaces {
 }
 
 function jobqueue_install {
-    docker exec slurmctld conda run -n dask-jobqueue /bin/bash -c "cd /dask-jobqueue; pip install -e ."
+    for c in slurmctld c1 c2; do
+        docker exec $c conda run -n dask-jobqueue /bin/bash -c "cd /dask-jobqueue; pip install -e ."
+    done
 }
 
 function jobqueue_script {
diff --git a/ci/slurm/docker-compose.yml b/ci/slurm/docker-compose.yml
index cdb9475d..ac06352a 100644
--- a/ci/slurm/docker-compose.yml
+++ b/ci/slurm/docker-compose.yml
@@ -69,6 +69,7 @@ services:
       - slurm_jobdir:/data
       - var_log_slurm:/var/log/slurm
       - shared_space:/shared_space
+      - ../..:/dask-jobqueue
     expose:
       - "6818"
     depends_on:
@@ -91,6 +92,7 @@ services:
       - slurm_jobdir:/data
       - var_log_slurm:/var/log/slurm
      - shared_space:/shared_space
+      - ../..:/dask-jobqueue
     expose:
       - "6818"
     depends_on:
diff --git a/dask_jobqueue/runner.py b/dask_jobqueue/runner.py
new file mode 100644
index 00000000..60da426e
--- /dev/null
+++ b/dask_jobqueue/runner.py
@@ -0,0 +1,239 @@
+import asyncio
+import sys
+import os
+import signal
+from contextlib import suppress
+from enum import Enum
+from typing import Dict, Optional
+import warnings
+from tornado.ioloop import IOLoop
+
+from distributed.core import CommClosedError, Status, rpc
+from distributed.scheduler import Scheduler
+from distributed.utils import LoopRunner, import_term, SyncMethodMixin
+from distributed.worker import Worker
+
+
+# Close gracefully when receiving a SIGINT
+signal.signal(signal.SIGINT, lambda *_: sys.exit())
+
+
+class Role(Enum):
+    """
+    This Enum contains the various roles a process can play.
+    """
+
+    worker = "worker"
+    scheduler = "scheduler"
+    client = "client"
+
+
+class BaseRunner(SyncMethodMixin):
+    """Superclass for runner objects.
+
+    This class contains common functionality for Dask cluster runner classes.
+
+    To implement a runner, you must provide:
+
+    1. A ``get_role`` method which returns a role from the ``Role`` enum.
+    2. Optionally, a ``set_scheduler_address`` method for the scheduler process to communicate its address.
+    3. A ``get_scheduler_address`` method for all other processes to receive the scheduler address.
+    4. Optionally, a ``get_worker_name`` method to provide a platform-specific name to the workers.
+    5. Optionally, a ``before_scheduler_start`` method to perform any actions before the scheduler is created.
+    6. Optionally, a ``before_worker_start`` method to perform any actions before the worker is created.
+    7. Optionally, a ``before_client_start`` method to perform any actions before the client code continues.
+    8. Optionally, an ``on_scheduler_start`` method to perform anything on the scheduler once it has started.
+    9. Optionally, an ``on_worker_start`` method to perform anything on the worker once it has started.
+
+    In return, you get the following:
+
+    A context manager and object which can be used within a script that is run in parallel to decide which processes
+    run the scheduler, workers and client code.
+
+    """
+
+    __loop: Optional[IOLoop] = None
+
+    def __init__(
+        self,
+        scheduler: bool = True,
+        scheduler_options: Dict = None,
+        worker_class: str = None,
+        worker_options: Dict = None,
+        client: bool = True,
+        asynchronous: bool = False,
+        loop: asyncio.BaseEventLoop = None,
+    ):
+        self.status = Status.created
+        self.scheduler = scheduler
+        self.scheduler_address = None
+        self.scheduler_comm = None
+        self.client = client
+        if self.client and not self.scheduler:
+            raise RuntimeError("Cannot run client code without a scheduler.")
+        self.scheduler_options = (
+            scheduler_options if scheduler_options is not None else {}
+        )
+        self.worker_class = (
+            Worker if worker_class is None else import_term(worker_class)
+        )
+        self.worker_options = worker_options if worker_options is not None else {}
+        self.role = None
+        self.__asynchronous = asynchronous
+        self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
+
+        if not self.__asynchronous:
+            self._loop_runner.start()
+            self.sync(self._start)
+
+    async def get_role(self) -> str:
+        raise NotImplementedError()
+
+    async def set_scheduler_address(self, scheduler: Scheduler) -> None:
+        return None
+
+    async def get_scheduler_address(self) -> str:
+        raise NotImplementedError()
+
+    async def get_worker_name(self) -> str:
+        return None
+
+    async def before_scheduler_start(self) -> None:
+        return None
+
+    async def before_worker_start(self) -> None:
+        return None
+
+    async def before_client_start(self) -> None:
+        return None
+
+    async def on_scheduler_start(self, scheduler: Scheduler) -> None:
+        return None
+
+    async def on_worker_start(self, worker: Worker) -> None:
+        return None
+
+    @property
+    def loop(self) -> Optional[IOLoop]:
+        loop = self.__loop
+        if loop is None:
+            # If the loop is not running when this is called, the LoopRunner.loop
+            # property will raise a DeprecationWarning.
+            # However, subsequent calls might occur - e.g. atexit, where a stopped
+            # loop is still acceptable - so we cache access to the loop.
+            self.__loop = loop = self._loop_runner.loop
+        return loop
+
+    @loop.setter
+    def loop(self, value: IOLoop) -> None:
+        warnings.warn(
+            "setting the loop property is deprecated", DeprecationWarning, stacklevel=2
+        )
+        if value is None:
+            raise ValueError("expected an IOLoop, got None")
+        self.__loop = value
+
+    def __await__(self):
+        async def _await():
+            if self.status != Status.running:
+                await self._start()
+            return self
+
+        return _await().__await__()
+
+    async def __aenter__(self):
+        await self
+        return self
+
+    async def __aexit__(self, *args):
+        await self._close()
+
+    def __enter__(self):
+        return self.sync(self.__aenter__)
+
+    def __exit__(self, typ, value, traceback):
+        return self.sync(self.__aexit__)
+
+    def __del__(self):
+        with suppress(AttributeError, RuntimeError):  # during closing
+            self.loop.add_callback(self.close)
+
+    async def _start(self) -> None:
+        self.role = await self.get_role()
+        if self.role == Role.scheduler:
+            await self.start_scheduler()
+            os.kill(
+                os.getpid(), signal.SIGINT
+            )  # Shutdown with a signal to give the event loop time to close
+        elif self.role == Role.worker:
+            await self.start_worker()
+            os.kill(
+                os.getpid(), signal.SIGINT
+            )  # Shutdown with a signal to give the event loop time to close
+        elif self.role == Role.client:
+            self.scheduler_address = await self.get_scheduler_address()
+            if self.scheduler_address:
+                self.scheduler_comm = rpc(self.scheduler_address)
+            await self.before_client_start()
+        self.status = Status.running
+
+    async def start_scheduler(self) -> None:
+        await self.before_scheduler_start()
+        async with Scheduler(**self.scheduler_options) as scheduler:
+            await self.set_scheduler_address(scheduler)
+            await self.on_scheduler_start(scheduler)
+            await scheduler.finished()
+
+    async def start_worker(self) -> None:
+        if (
+            "scheduler_file" not in self.worker_options
+            and "scheduler_ip" not in self.worker_options
+        ):
+            self.worker_options["scheduler_ip"] = await self.get_scheduler_address()
+        worker_name = await self.get_worker_name()
+        await self.before_worker_start()
+        async with self.worker_class(name=worker_name, **self.worker_options) as worker:
+            await self.on_worker_start(worker)
+            await worker.finished()
+
+    async def _close(self) -> None:
+        if self.status == Status.running:
+            if self.scheduler_comm:
+                with suppress(CommClosedError):
+                    await self.scheduler_comm.terminate()
+            self.status = Status.closed
+
+    def close(self) -> None:
+        return self.sync(self._close)
+
+
+class AsyncCommWorld:
+    def __init__(self):
+        self.roles = {"scheduler": None, "client": None}
+        self.role_lock = asyncio.Lock()
+        self.scheduler_address = None
+
+
+class AsyncRunner(BaseRunner):
+    def __init__(self, commworld: AsyncCommWorld, *args, **kwargs):
+        self.commworld = commworld
+        super().__init__(*args, **kwargs)
+
+    async def get_role(self) -> str:
+        async with self.commworld.role_lock:
+            if self.commworld.roles["scheduler"] is None and self.scheduler:
+                self.commworld.roles["scheduler"] = self
+                return Role.scheduler
+            elif self.commworld.roles["client"] is None and self.client:
+                self.commworld.roles["client"] = self
+                return Role.client
+            else:
+                return Role.worker
+
+    async def set_scheduler_address(self, scheduler: Scheduler) -> None:
+        self.commworld.scheduler_address = scheduler.address
+
+    async def get_scheduler_address(self) -> str:
+        while self.commworld.scheduler_address is None:
+            await asyncio.sleep(0.1)
+        return self.commworld.scheduler_address
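
``AsyncCommWorld`` and ``AsyncRunner`` above are an in-process, asyncio-only implementation of this contract, mainly useful for testing: several coroutines share a single ``AsyncCommWorld`` and negotiate their roles through its lock. A rough sketch of the usage pattern, condensed from the ``test_runner.py`` test added later in this diff (run it under an event loop, e.g. with ``pytest-asyncio`` as that test does)::

    import asyncio
    from contextlib import suppress

    from dask.distributed import Client
    from dask_jobqueue.runner import AsyncCommWorld, AsyncRunner

    async def run_code(commworld: AsyncCommWorld):
        # The scheduler and worker roles shut themselves down via SIGINT, hence the suppress
        with suppress(SystemExit):
            async with AsyncRunner(commworld, asynchronous=True) as runner:
                async with Client(runner, asynchronous=True) as client:
                    assert await client.submit(lambda x: x + 1, 10).result() == 11

    async def main():
        # Several coroutines share one commworld: the first to acquire the lock becomes
        # the scheduler, the next becomes the client and the rest become workers.
        commworld = AsyncCommWorld()
        await asyncio.gather(*[run_code(commworld) for _ in range(4)])
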
diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py
index 8e6c1a07..d31e5562 100644
--- a/dask_jobqueue/slurm.py
+++ b/dask_jobqueue/slurm.py
@@ -1,10 +1,16 @@
 import logging
 import math
 import warnings
+import asyncio
+import json
+import os
+from pathlib import Path
 
 import dask
+from dask.distributed import Scheduler
 
 from .core import Job, JobQueueCluster, job_parameters, cluster_parameters
+from .runner import Role, BaseRunner
 
 logger = logging.getLogger(__name__)
@@ -26,7 +32,7 @@ def __init__(
         job_cpu=None,
         job_mem=None,
         config_name=None,
-        **base_class_kwargs
+        **base_class_kwargs,
     ):
         super().__init__(
             scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
         )
@@ -177,3 +183,75 @@ class SLURMCluster(JobQueueCluster):
         job=job_parameters, cluster=cluster_parameters
     )
     job_cls = SLURMJob
+
+
+class WorldTooSmallException(RuntimeError):
+    """Not enough Slurm tasks to start all required processes."""
+
+
+class SLURMRunner(BaseRunner):
+    def __init__(self, *args, scheduler_file="scheduler-{job_id}.json", **kwargs):
+        try:
+            self.proc_id = int(os.environ["SLURM_PROCID"])
+            self.world_size = self.n_workers = int(os.environ["SLURM_NTASKS"])
+            self.job_id = int(os.environ["SLURM_JOB_ID"])
+        except KeyError as e:
+            raise RuntimeError(
+                "SLURM_PROCID, SLURM_NTASKS, and SLURM_JOB_ID must be present in the environment."
+            ) from e
+        if not scheduler_file:
+            scheduler_file = kwargs.get("scheduler_options", {}).get("scheduler_file")
+
+        if not scheduler_file:
+            raise RuntimeError(
+                "scheduler_file must be specified either in the "
+                "scheduler_options or as a keyword argument to SLURMRunner."
+            )
+
+        # Encourage filename uniqueness by inserting the job ID
+        scheduler_file = scheduler_file.format(job_id=self.job_id)
+        scheduler_file = Path(scheduler_file)
+
+        if isinstance(kwargs.get("scheduler_options"), dict):
+            kwargs["scheduler_options"]["scheduler_file"] = scheduler_file
+        else:
+            kwargs["scheduler_options"] = {"scheduler_file": scheduler_file}
+        if isinstance(kwargs.get("worker_options"), dict):
+            kwargs["worker_options"]["scheduler_file"] = scheduler_file
+        else:
+            kwargs["worker_options"] = {"scheduler_file": scheduler_file}
+
+        self.scheduler_file = scheduler_file
+
+        super().__init__(*args, **kwargs)
+
+    async def get_role(self) -> str:
+        if self.scheduler and self.client and self.world_size < 3:
+            raise WorldTooSmallException(
+                f"Not enough Slurm tasks to start cluster, found {self.world_size}, "
+                "needs at least 3, one each for the scheduler, client and a worker."
+            )
+        elif self.scheduler and self.world_size < 2:
+            raise WorldTooSmallException(
+                f"Not enough Slurm tasks to start cluster, found {self.world_size}, "
+                "needs at least 2, one each for the scheduler and a worker."
+ ) + self.n_workers -= int(self.scheduler) + int(self.client) + if self.proc_id == 0 and self.scheduler: + return Role.scheduler + elif self.proc_id == 1 and self.client: + return Role.client + else: + return Role.worker + + async def set_scheduler_address(self, scheduler: Scheduler) -> None: + return + + async def get_scheduler_address(self) -> str: + while not self.scheduler_file or not self.scheduler_file.exists(): + await asyncio.sleep(0.2) + cfg = json.loads(self.scheduler_file.read_text()) + return cfg["address"] + + async def get_worker_name(self) -> str: + return self.proc_id diff --git a/dask_jobqueue/tests/slurm_runner/basic.py b/dask_jobqueue/tests/slurm_runner/basic.py new file mode 100644 index 00000000..dc01b468 --- /dev/null +++ b/dask_jobqueue/tests/slurm_runner/basic.py @@ -0,0 +1,8 @@ +from dask.distributed import Client +from dask_jobqueue.slurm import SLURMRunner + +with SLURMRunner(scheduler_file="/shared_space/{job_id}.json") as runner: + with Client(runner) as client: + assert client.submit(lambda x: x + 1, 10).result() == 11 + assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21 + print("Test passed") diff --git a/dask_jobqueue/tests/test_runner.py b/dask_jobqueue/tests/test_runner.py new file mode 100644 index 00000000..a486b654 --- /dev/null +++ b/dask_jobqueue/tests/test_runner.py @@ -0,0 +1,28 @@ +import asyncio +from contextlib import suppress +from time import time + +import pytest + +from dask.distributed import Client + +from dask_jobqueue.runner import AsyncCommWorld, AsyncRunner + + +@pytest.mark.asyncio +async def test_runner(): + commworld = AsyncCommWorld() + + async def run_code(commworld): + with suppress(SystemExit): + async with AsyncRunner(commworld, asynchronous=True) as runner: + async with Client(runner, asynchronous=True) as c: + start = time() + while len(c.scheduler_info()["workers"]) != 2: + assert time() < start + 10 + await asyncio.sleep(0.2) + + assert await c.submit(lambda x: x + 1, 10).result() == 11 + assert await c.submit(lambda x: x + 1, 20).result() == 21 + + await asyncio.gather(*[run_code(commworld) for _ in range(4)]) diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 129027e1..5b4b166b 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -1,3 +1,5 @@ +import os +import subprocess import sys from time import sleep, time @@ -12,6 +14,19 @@ from . 
diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py
index 129027e1..5b4b166b 100644
--- a/dask_jobqueue/tests/test_slurm.py
+++ b/dask_jobqueue/tests/test_slurm.py
@@ -1,3 +1,5 @@
+import os
+import subprocess
 import sys
 from time import sleep, time
 
@@ -12,6 +14,19 @@
 from . import QUEUE_WAIT
 
 
+def slurm_cores():
+    "Use sinfo to get the number of available CPU cores"
+    try:
+        return int(
+            subprocess.check_output(["sinfo", "-o", "%C"])
+            .split()[1]
+            .decode()
+            .split("/")[1]
+        )
+    except FileNotFoundError:
+        return 0
+
+
 def test_header():
     with SLURMCluster(
         walltime="00:02:00", processes=4, cores=8, memory="28GB"
@@ -314,3 +329,17 @@ def test_deprecation_project():
     )
     job_script = job.job_script()
     assert "project" in job_script
+
+
+@pytest.mark.env("slurm")
+@pytest.mark.skipif(slurm_cores() < 4, reason="Need at least 4 CPUs to run this test")
+def test_slurm_runner():
+    script_file = os.path.join(
+        os.path.dirname(os.path.realpath(__file__)), "slurm_runner", "basic.py"
+    )
+
+    output = subprocess.check_output(
+        ["srun", "--mpi=none", "-vv", "-n", "4", sys.executable, script_file]
+    )
+    output = output.decode()
+    assert "Test passed" in output
diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt
index b4923de1..e79e74ae 100644
--- a/docs/requirements-docs.txt
+++ b/docs/requirements-docs.txt
@@ -1,7 +1,8 @@
 distributed
 numpydoc
 ipython
-sphinx
+sphinx<5
+sphinx-reredirects
 dask-sphinx-theme>=3.0.0
 # FIXME: This workaround is required until we have sphinx>=5, as enabled by
 # dask-sphinx-theme no longer pinning sphinx-book-theme==0.2.0. This is
diff --git a/docs/source/advanced-tips-and-tricks.rst b/docs/source/clusters-advanced-tips-and-tricks.rst
similarity index 99%
rename from docs/source/advanced-tips-and-tricks.rst
rename to docs/source/clusters-advanced-tips-and-tricks.rst
index 8100d7f2..257ed098 100644
--- a/docs/source/advanced-tips-and-tricks.rst
+++ b/docs/source/clusters-advanced-tips-and-tricks.rst
@@ -11,7 +11,7 @@ This page is an attempt to document tips and tricks that are likely to be
 useful on some clusters (strictly more than one ideally although hard to be sure ...).
 
 Skipping unrecognised line in submission script with ``job_directives_skip``
---------------------------------------------------------------------
+----------------------------------------------------------------------------
 
 *Note: the parameter* ``job_directives_skip`` *was named* ``header_skip``
 *until version 0.8.0.* ``header_skip`` *can still be used, but is considered
 deprecated and will be removed in a future version.*
diff --git a/docs/source/api.rst b/docs/source/clusters-api.rst
similarity index 80%
rename from docs/source/api.rst
rename to docs/source/clusters-api.rst
index 2d963054..1f6f427c 100644
--- a/docs/source/api.rst
+++ b/docs/source/clusters-api.rst
@@ -1,10 +1,10 @@
-.. _api:
+.. _clusters_api:
 
 .. currentmodule:: dask_jobqueue
 
-API
-===
+Clusters API
+============
 
 .. autosummary::
    :toctree: generated/
diff --git a/docs/source/configurations.rst b/docs/source/clusters-configuration-examples.rst
similarity index 100%
rename from docs/source/configurations.rst
rename to docs/source/clusters-configuration-examples.rst
diff --git a/docs/source/configuration-setup.rst b/docs/source/clusters-configuration-setup.rst
similarity index 100%
rename from docs/source/configuration-setup.rst
rename to docs/source/clusters-configuration-setup.rst
diff --git a/docs/source/configuration.rst b/docs/source/clusters-configuration.rst
similarity index 97%
rename from docs/source/configuration.rst
rename to docs/source/clusters-configuration.rst
index 8363e224..644ee6c4 100644
--- a/docs/source/configuration.rst
+++ b/docs/source/clusters-configuration.rst
@@ -71,7 +71,7 @@ recommend using a configuration file like the following:
     account: my-account
     walltime: 00:30:00
 
-See :doc:`Configuration Examples <configurations>` for real-world examples.
+See :doc:`Configuration Examples <clusters-configuration-examples>` for real-world examples.
 
 If you place this in your ``~/.config/dask/`` directory then Dask-jobqueue will
 use these values by default. You can then construct a cluster object without
diff --git a/docs/source/examples.rst b/docs/source/clusters-example-deployments.rst
similarity index 100%
rename from docs/source/examples.rst
rename to docs/source/clusters-example-deployments.rst
diff --git a/docs/source/howitworks.rst b/docs/source/clusters-howitworks.rst
similarity index 100%
rename from docs/source/howitworks.rst
rename to docs/source/clusters-howitworks.rst
diff --git a/docs/source/interactive.rst b/docs/source/clusters-interactive.rst
similarity index 100%
rename from docs/source/interactive.rst
rename to docs/source/clusters-interactive.rst
diff --git a/docs/source/clusters-overview.rst b/docs/source/clusters-overview.rst
new file mode 100644
index 00000000..0c62aaa9
--- /dev/null
+++ b/docs/source/clusters-overview.rst
@@ -0,0 +1,43 @@
+Overview
+========
+
+The Dask-jobqueue project provides a convenient interface that is accessible from interactive systems like Jupyter notebooks, or batch jobs.
+
+
+.. _example:
+
+Example
+-------
+
+.. code-block:: python
+
+   from dask_jobqueue import PBSCluster
+   cluster = PBSCluster()
+   cluster.scale(jobs=10)  # Deploy ten single-node jobs
+
+   from dask.distributed import Client
+   client = Client(cluster)  # Connect this local process to remote workers
+
+   # wait for jobs to arrive, depending on the queue, this may take some time
+
+   import dask.array as da
+   x = ...  # Dask commands now use these distributed resources
+
+.. raw:: html
+
+
+Adaptive Scaling
+----------------
+
+Dask jobqueue can also adapt the cluster size dynamically based on current
+load. This helps to scale up the cluster when necessary but scale it down and
+save resources when not actively computing.
+
+.. code-block:: python
+
+   cluster.adapt(minimum_jobs=10, maximum_jobs=100)  # auto-scale between 10 and 100 jobs
+   cluster.adapt(maximum_memory="10 TB")  # or use core/memory limits
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 27f8079c..4a08afb8 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -48,8 +48,20 @@
     "sphinx.ext.autosummary",
     "sphinx.ext.extlinks",
     "numpydoc",
+    "sphinx_reredirects",
 ]
 
+redirects = {
+    "interactive": "clusters-interactive.html",
+    "advanced-tips-and-tricks": "clusters-advanced-tips-and-tricks.html",
+    "configuration": "clusters-configuration.html",
+    "howitworks": "clusters-howitworks.html",
+    "api": "clusters-api.html",
+    "configuration-setup": "clusters-configuration-setup.html",
+    "configurations": "clusters-configuration-examples.html",
+    "examples": "clusters-example-deployments.html",
+}
+
 autosummary_generate = True
 
 numpydoc_class_members_toctree = True
@@ -72,7 +85,7 @@
 #
 # This is also used if you do content translation via gettext catalogs.
 # Usually you set "language" from the command line for these cases.
-language = None
+language = "en"
 
 # List of patterns, relative to source directory, that match files and
 # directories to ignore when looking for source files.
diff --git a/docs/source/develop.rst b/docs/source/develop.rst
index c3dd5c78..5606264d 100644
--- a/docs/source/develop.rst
+++ b/docs/source/develop.rst
@@ -62,7 +62,7 @@ Testing without CI scripts
 You can also manually launch tests with dockerized jobs schedulers
 (without CI commands), for a better understanding of what is going on.
-This is basically a simplified version of what is in the ci/*.sh files.
+This is basically a simplified version of what is in the ``ci/*.sh`` files.
 
 For example with Slurm::
 
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 01c960b7..0b2b287b 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -4,49 +4,87 @@ Dask-Jobqueue
 *Easily deploy Dask on job queuing systems like PBS, Slurm, MOAB, SGE, LSF,
 and HTCondor.*
 
-The Dask-jobqueue project makes it easy to deploy Dask on common job queuing
+The ``dask-jobqueue`` project makes it easy to deploy Dask on common job queuing
 systems typically found in high performance supercomputers, academic research
-institutions, and other clusters. It provides a convenient interface that is
-accessible from interactive systems like Jupyter notebooks, or batch jobs.
+institutions, and other clusters.
+
+There are two common deployment patterns for Dask on HPC, **Dynamic Clusters** and **Batch Runners**, and ``dask-jobqueue`` supports both.
 
-.. _example:
+Dynamic Clusters
+----------------
 
-Example
--------
+A **Dynamic Cluster** is a Dask cluster where new workers can be added dynamically while the cluster is running.
+
+In an HPC environment this generally means that the Dask Scheduler is run in the same location as the client code,
+usually on a single compute node. Then workers for the Dask cluster are submitted as additional jobs to the job
+queue scheduler.
+
+This pattern works well on clusters where it is favourable to submit many small jobs.
+
+
+.. code-block:: bash
+
+   srun -n 1 dynamic_workload.py
+
 .. code-block:: python
 
-   from dask_jobqueue import PBSCluster
-   cluster = PBSCluster()
-   cluster.scale(jobs=10)  # Deploy ten single-node jobs
+   # dynamic_workload.py
+   from dask_jobqueue.slurm import SLURMCluster
+   cluster = SLURMCluster()
+   cluster.adapt(minimum=1, maximum=10)  # Tells Dask to call `srun -n 1 ...` when it needs new workers
 
    from dask.distributed import Client
    client = Client(cluster)  # Connect this local process to remote workers
 
-   # wait for jobs to arrive, depending on the queue, this may take some time
-
    import dask.array as da
-   x = ...  # Dask commands now use these distributed resources
+   x = ...  # Dask commands now use these distributed resources
+
+**Benefits**
+
+- Clusters can autoscale as a workload progresses.
+- Small gaps in the HPC that would otherwise be unused can be backfilled.
+- A workload can run slowly with a few workers during busy times and then scale up during quiet times.
+- Workloads in interactive environments can scale up and down as users run code manually.
+- You don't need to wait for all nodes to be available before your workload starts, so jobs often start sooner.
 
-.. raw:: html
+To learn more see the `Dynamic Cluster documentation <clusters-overview.html>`_.
 
-
+Batch Runners
+-------------
 
-Adaptivity
-----------
+A **Batch Runner** is a Dask cluster where the whole workload, including the client code, scheduler and workers,
+is submitted as a single allocation to the job queue scheduler. All of the processes within the workload coordinate
+during startup and then work together to compute the Dask workload.
 
-Dask jobqueue can also adapt the cluster size dynamically based on current
-load. This helps to scale up the cluster when necessary but scale it down and
-save resources when not actively computing.
+This pattern works well where large jobs are prioritised and node locality is important.
+
+.. code-block:: bash
+
+   srun -n 12 batch_workload.py
 
 .. code-block:: python
 
-   cluster.adapt(minimum_jobs=10, maximum_jobs=100)  # auto-scale between 10 and 100 jobs
-   cluster.adapt(maximum_memory="10 TB")  # or use core/memory limits
+   # batch_workload.py
+   from dask_jobqueue.slurm import SLURMRunner
+   cluster = SLURMRunner()  # Bootstraps all the processes into a client + scheduler + 10 workers
+
+   # Only the client process will continue past this point
+
+   from dask.distributed import Client
+   client = Client(cluster)  # Connect this client process to remote workers
+
+   import dask.array as da
+   x = ...  # Dask commands now use these distributed resources
+
+**Benefits**
+
+- Workers are generally colocated physically on the machine, so communication is faster, especially with `UCX `_.
+- Submitting many small jobs can be frowned upon on some HPCs; submitting a single large job is more typical of other HPC workloads.
+- All workers are guaranteed to be available when the job starts, which can avoid oversubscribing workers.
+- Clusters comprised of one large allocation tend to be more reliable than many small allocations.
+- All processes have the same start and wall time.
+
+To learn more see the `Batch Runner documentation <runners-overview.html>`_.
 
 More details
 ------------
@@ -58,20 +96,29 @@ A good entry point to know more about how to use ``dask-jobqueue`` is
    :caption: Getting Started
 
    install
-   interactive
    talks-and-tutorials
-   howitworks
-   configuration
 
 .. toctree::
    :maxdepth: 1
-   :caption: Detailed use
+   :caption: Dynamic Clusters
+
+   clusters-overview
+   clusters-interactive
+   clusters-howitworks
+   clusters-configuration
+   clusters-configuration-setup
+   clusters-example-deployments
+   clusters-configuration-examples
+   clusters-advanced-tips-and-tricks
+   clusters-api
+
+.. toctree::
+   :maxdepth: 1
+   :caption: Batch Runners
 
-   configuration-setup
-   examples
-   configurations
-   advanced-tips-and-tricks
-   api
+   runners-overview
+   runners-implementing-new
+   runners-api
 
 .. toctree::
    :maxdepth: 1
diff --git a/docs/source/runners-api.rst b/docs/source/runners-api.rst
new file mode 100644
index 00000000..a6e86039
--- /dev/null
+++ b/docs/source/runners-api.rst
@@ -0,0 +1,12 @@
+.. _runners_api:
+
+.. currentmodule:: dask_jobqueue
+
+
+Runners API
+===========
+
+.. autosummary::
+   :toctree: generated/
+
+   slurm.SLURMRunner
diff --git a/docs/source/runners-implementing-new.rst b/docs/source/runners-implementing-new.rst
new file mode 100644
index 00000000..cbb88dca
--- /dev/null
+++ b/docs/source/runners-implementing-new.rst
@@ -0,0 +1,124 @@
+Writing your own runners
+========================
+
+This document describes the design of the runner classes and how to implement your own Dask runners.
+
+The core assumption in the design of the runner model is that the same script will be executed many times by a job scheduler.
+
+.. code-block:: text
+
+    (mpirun|srun|qsub|etc) -n4 myscript.py
+    ├── [0] myscript.py
+    ├── [1] myscript.py
+    ├── [2] myscript.py
+    └── [3] myscript.py
+
+Within the script, the runner class is created early on in the execution.
+
+.. code-block:: python
+
+    from dask_jobqueue import SomeRunner
+    from dask.distributed import Client
+
+    with SomeRunner(**kwargs) as runner:
+        with Client(runner) as client:
+            client.wait_for_workers(2)
+            # Do some Dask work
+
+This will result in multiple processes running on an HPC that are all instantiating the runner class.
+
+The processes need to coordinate to decide which process should run the Dask Scheduler, which should be Dask Workers
+and which should continue running the rest of the client code within the script. This coordination happens during the
+``__init__()`` of the runner class.
+
+The Scheduler and Worker processes exit after they complete to avoid running the client code multiple times.
+This means that only one of the processes will continue past the ``__init__()`` of the runner class; the rest will
+exit at that point after the work is done.
+
+Base class
+----------
+
+In ``dask_jobqueue.runner`` there is a ``BaseRunner`` class that can be used for implementing other runners.
+
+At a minimum, a new runner must implement the following methods.
+
+.. code-block:: python
+
+    from dask_jobqueue.runner import BaseRunner
+
+    class MyRunner(BaseRunner):
+
+        async def get_role(self) -> str:
+            """Figure out whether I am a scheduler, worker or client.
+
+            A common way to do this is by using a process ID. Many job queues give each process
+            a monotonic index that starts from zero. So we can assume proc 0 is the scheduler, proc 1
+            is the client and any other procs are workers.
+            """
+            ...
+
+        async def get_scheduler_address(self) -> str:
+            """If I am not the scheduler, discover the scheduler address.
+
+            A common way to do this is to read a scheduler file from a shared filesystem.
+
+            Alternatively, if the scheduler process can broadcast its address via something like MPI
+            we can define ``BaseRunner.set_scheduler_address()`` which will be called on the scheduler
+            and then receive the broadcast in this method.
+            """
+            ...
+
+The ``BaseRunner`` class handles starting up Dask once these methods have been implemented.
+It also provides many stubbed-out hooks to allow you to write code that runs before/after each component is created.
+E.g. ``BaseRunner.before_scheduler_start()``, ``BaseRunner.before_worker_start()`` and ``BaseRunner.before_client_start()``.
+
+The runner must know the address of the scheduler so that it can coordinate the clean shutdown of all processes when we
+reach the end of the code (either via ``__exit__()`` or a finalizer). This communication happens independently of
+any clients that may be created.
+
+Slurm implementation example
+----------------------------
+
+As a concrete example, you can look at the Slurm implementation.
+
+In the ``get_role()`` method we use the ``SLURM_PROCID`` environment variable to infer the role.
+
+We also add a default scheduler option to set ``scheduler_file="scheduler-{job_id}.json"`` and we look up the
+Job ID from the ``SLURM_JOB_ID`` environment variable to ensure uniqueness. This effectively allows us to broadcast
+the scheduler address via the shared filesystem.
+
+Then in the ``get_scheduler_address()`` method we wait for the scheduler file to exist and then open and read the
+address from the scheduler file in the same way the ``dask.distributed.Client`` does.
+
+Here's a cut-down example for demonstration purposes.
+
+
+.. code-block:: python
+
+    import asyncio
+    import json
+    import os
+    from pathlib import Path
+
+    from dask_jobqueue.runner import BaseRunner, Role
+
+    class SLURMRunner(BaseRunner):
+        def __init__(self, *args, scheduler_file="scheduler.json", **kwargs):
+            # Get the current process ID from the environment
+            self.proc_id = int(os.environ["SLURM_PROCID"])
+
+            # Tell the scheduler and workers to use a scheduler file on the shared filesystem
+            self.scheduler_file = Path(scheduler_file)
+            options = {"scheduler_file": self.scheduler_file}
+            super().__init__(*args, worker_options=options, scheduler_options=options)
+
+        async def get_role(self) -> str:
+            # Choose the role for this process based on the process ID
+            if self.proc_id == 0 and self.scheduler:
+                return Role.scheduler
+            elif self.proc_id == 1 and self.client:
+                return Role.client
+            else:
+                return Role.worker
+
+        async def get_scheduler_address(self) -> str:
+            # Wait for the scheduler file to be created and read the address from it
+            while not self.scheduler_file or not self.scheduler_file.exists():
+                await asyncio.sleep(0.2)
+            cfg = json.loads(self.scheduler_file.read_text())
+            return cfg["address"]
diff --git a/docs/source/runners-overview.rst b/docs/source/runners-overview.rst
new file mode 100644
index 00000000..a542399a
--- /dev/null
+++ b/docs/source/runners-overview.rst
@@ -0,0 +1,32 @@
+Overview
+========
+
+The batch runner classes are designed to make it simple to write Python scripts that will leverage multi-node jobs in an HPC.
+
+For example, if we write a Python script for a Slurm-based system and call it with ``srun -n 6 python myscript.py``, the script will be invoked by Slurm
+6 times in parallel on 6 different nodes/cores on the HPC. The Dask Runner class then uses the Slurm process ID environment
+variable to decide what role each process should play and uses the shared filesystem to bootstrap communications with a scheduler file.
+
+.. code-block:: python
+
+    # myscript.py
+    from dask.distributed import Client
+    from dask_jobqueue.slurm import SLURMRunner
+
+    # When entering the SLURMRunner context manager, processes will decide if they should be
+    # the client, scheduler or a worker.
+    # Only process ID 1 executes the contents of the context manager.
+    # All other processes start the Dask components and then block here forever.
+    with SLURMRunner(scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json") as runner:
+
+        # The runner object contains the scheduler address info and can be used to construct a client.
+        with Client(runner) as client:
+
+            # Wait for all the workers to be ready before continuing.
+            client.wait_for_workers(runner.n_workers)
+
+            # Then we can submit some work to the Dask scheduler.
+            assert client.submit(lambda x: x + 1, 10).result() == 11
+            assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21
+
+    # When process ID 1 exits the SLURMRunner context manager it sends a graceful shutdown to the Dask processes.
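
A hypothetical extension of the example above: ``SLURMRunner`` forwards the ``scheduler_options``, ``worker_options`` and ``worker_class`` arguments it inherits from ``BaseRunner`` to the underlying ``distributed`` ``Scheduler`` and ``Worker``/``Nanny`` objects (with ``scheduler_file`` injected automatically). The specific options below are illustrative assumptions, not defaults introduced by this change, and the sketch must run inside a Slurm allocation where ``SLURM_PROCID``, ``SLURM_NTASKS`` and ``SLURM_JOB_ID`` are set::

    from dask_jobqueue.slurm import SLURMRunner

    runner = SLURMRunner(
        scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json",
        scheduler_options={"dashboard_address": ":8787"},  # any distributed.Scheduler kwarg
        worker_options={"nthreads": 2},  # any kwarg of the chosen worker class
        worker_class="distributed.Nanny",  # run each worker under a nanny process
    )
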