Skip to content

Commit

Permalink
Merge pull request #20 from d-krupke/development
Browse files Browse the repository at this point in the history
v0.10.0
  • Loading branch information
d-krupke authored Feb 25, 2024
2 parents 966fa7d + 76786df commit 987d4c8
Show file tree
Hide file tree
Showing 19 changed files with 504 additions and 178 deletions.
25 changes: 14 additions & 11 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,15 @@ A simple script could look like this:
import slurminade
slurminade.update_default_configuration(partition="alg") # global options for slurm
slurminade.update_default_configuration(
partition="alg", exclusive=True
) # global options for slurm
# If no slurm environment is found, the functions are called directly to make scripts
# compatible with any environment.
# You can enforce slurm with `slurminade.set_dispatcher(slurminade.SlurmDispatcher())`
@slurminade.node_setup
def setup():
print("I will run automatically on every slurm node at the beginning!")
Expand All @@ -83,26 +86,25 @@ A simple script could look like this:
if __name__ == "__main__":
jid = prepare.distribute()
with slurminade.Batch(max_size=20) as batch: # automatically bundles up to 20 tasks
prepare.distribute()
slurminade.join() # make sure that no job runs before prepare has finished
with slurminade.JobBundling(max_size=20): # automatically bundles up to 20 tasks
# run 100x f after `prepare` has finished
for i in range(100):
f.wait_for(jid).distribute(i) # no job id while in batch!
f.distribute(i)
# clean up after the previous jobs have finished
jids = batch.flush() # flush returns a list with all job ids.
clean_up.wait_for(jids).distribute()
slurminade.join() # make sure that the clean-up job runs after all f-jobs have finished
clean_up.distribute()
If slurm is not available, ``distribute`` results in a local function
call. Analogous for ``srun`` and ``sbatch`` (giving some extra value on
top of just forwarding to *simple_slurm*).

.. warning::
Always use ``Batch`` when distributing many small tasks to few nodes. Slurm
Always use ``JobBundling`` when distributing many small tasks to few nodes. Slurm
jobs have a certain overhead and you do not want to spam your
infrastructure with too many jobs. However, function calls
joined by ``Batch`` are considered as a single job by slurm, thus,
joined by ``JobBundling`` are considered as a single job by slurm, thus,
not shared across nodes.

**What are the limitations of slurminade?** Slurminade reconstructs the
Expand Down Expand Up @@ -342,7 +344,7 @@ Project structure

The project is reasonably easy:

- batch.py: Contains code for bundling tasks, so we don’t spam slurm
- bundling.py: Contains code for bundling tasks, so we don’t spam slurm
with too many.
- conf.py: Contains code for managing the configuration of slurm.
- dispatcher.py: Contains code for actually dispatching tasks to slurm.
Expand All @@ -357,6 +359,7 @@ The project is reasonably easy:
Changes
-------

- 0.10.0: `Batch` is now named `JobBundling`. There is a method `join` for easier synchronization. `exec` allows executing commands just like `srun` and `sbatch`, but with a syntax uniform with other slurmified functions. Functions can now also be called with `distribute_and_wait`. If you call `python3 -m slurminade.check --partition YOUR_PARTITION --constraint YOUR_CONSTRAINT` you can check if your slurm configuration is running correctly.
- 0.9.0: Lots of improvements.
- 0.8.1: Bugfix and automatic detection of wrong usage when using ``Batch`` with ``wait_for``.
- 0.8.0: Added extensive logging and improved typing.
Expand Down
4 changes: 2 additions & 2 deletions examples/example_1.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime

import slurminade
from slurminade import Batch
from slurminade import JobBundling

slurminade.update_default_configuration(partition="alg", constraint="alggen02")

Expand All @@ -15,6 +15,6 @@ def f(hello_world):

if __name__ == "__main__":
jid = f.distribute(f"Hello World from slurminade! {datetime.datetime.now()!s}")
with Batch(20) as batch:
with JobBundling(20) as batch:
f.distribute("hello 1!")
f.distribute("hello 2!")
4 changes: 3 additions & 1 deletion examples/example_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def clean_up():
if __name__ == "__main__":
jid = prepare.distribute()

with slurminade.Batch(max_size=20) as batch: # automatically bundles up to 20 tasks
with slurminade.JobBundling(
max_size=20
) as batch: # automatically bundles up to 20 tasks
# run 10x f after prepare has finished
for i in range(100):
f.wait_for(jid).distribute(i)
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ where = ["src"]

[project]
name = "slurminade"
version = "1.0.0"
version = "0.10.0"
authors = [
{ name = "TU Braunschweig, IBR, Algorithms Group (Dominik Krupke)", email = "[email protected]" },
]
description = "A decorator-based slurm runner."
readme = "README.rst"
requires-python = ">=3.7"
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
Expand Down
5 changes: 4 additions & 1 deletion src/slurminade/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ def clean_up():
allow_recursive_distribution,
disable_warning_on_repeated_flushes,
)
from .batch import Batch
from .bundling import JobBundling, Batch
from .dispatcher import (
srun,
sbatch,
join,
SlurmDispatcher,
set_dispatcher,
get_dispatcher,
Expand All @@ -82,8 +83,10 @@ def clean_up():
"set_dispatch_limit",
"allow_recursive_distribution",
"disable_warning_on_repeated_flushes",
"JobBundling",
"Batch",
"srun",
"join",
"sbatch",
"SlurmDispatcher",
"set_dispatcher",
Expand Down
37 changes: 34 additions & 3 deletions src/slurminade/batch.py → src/slurminade/bundling.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ def get(self, options: SlurmOptions) -> typing.List[FunctionCall]:
def clear(self):
self._tasks.clear()


class Batch(Dispatcher):
class JobBundling(Dispatcher):
"""
The logic to buffer the function calls. It wraps the original dispatcher.
Expand All @@ -64,6 +63,7 @@ def __init__(self, max_size: int):
self.subdispatcher = get_dispatcher()
self._tasks = TaskBuffer()
self._batch_guard = BatchGuard()
self._all_job_ids = []

def flush(self, options: typing.Optional[SlurmOptions] = None) -> typing.List[int]:
"""
Expand All @@ -90,7 +90,14 @@ def flush(self, options: typing.Optional[SlurmOptions] = None) -> typing.List[in
job_ids.append(job_id)
tasks = tasks[: self.max_size]
self._tasks.clear()
self._all_job_ids.extend(job_ids)
return job_ids

def get_all_job_ids(self) -> typing.List[int]:
    """
    Return all job ids that have been used.

    Returns a fresh list (a copy of the internal bookkeeping), so callers
    can mutate the result without affecting this object's state.
    """
    # Copy on purpose: don't hand out a reference to internal state.
    return list(self._all_job_ids)

def add(self, func: SlurmFunction, *args, **kwargs):
"""
Expand All @@ -105,8 +112,14 @@ def add(self, func: SlurmFunction, *args, **kwargs):
)

def _dispatch(
self, funcs: typing.Iterable[FunctionCall], options: SlurmOptions
self,
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> int:
if block:
# if blocking, we don't buffer, but dispatch immediately
return self.subdispatcher._dispatch(funcs, options, block=True)
for func in funcs:
self._tasks.add(func, options)
return -1
Expand All @@ -117,6 +130,7 @@ def srun(
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
conf = SlurmOptions(conf if conf else {})
return self.subdispatcher.srun(command, conf, simple_slurm_kwargs)

def sbatch(
Expand All @@ -125,6 +139,7 @@ def sbatch(
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
conf = SlurmOptions(conf if conf else {})
return self.subdispatcher.sbatch(command, conf, simple_slurm_kwargs)

def __enter__(self):
Expand Down Expand Up @@ -152,5 +167,21 @@ def _log_dispatch(self, funcs: typing.List[FunctionCall], options: SlurmOptions)
def __del__(self):
self.flush()

def join(self):
    """
    Wait for all dispatched jobs to finish.

    Flushes the task buffer first so that any still-buffered calls are
    actually submitted and therefore included in the wait; the waiting
    itself is delegated to the wrapped dispatcher.
    """
    self.flush()
    return self.subdispatcher.join()

def is_sequential(self):
    # Pure delegation: bundling does not change whether the underlying
    # dispatcher executes tasks sequentially.
    return self.subdispatcher.is_sequential()



class Batch(JobBundling):
    """
    Deprecated backward-compatibility alias for :class:`JobBundling`.

    Behaves exactly like ``JobBundling`` (the renamed class), but emits a
    warning on construction so users migrate to the new name.
    """

    def __init__(self, *args, **kwargs):
        # Keep the old behavior intact, then nudge users toward the new name.
        super().__init__(*args, **kwargs)
        logger = logging.getLogger("slurminade")
        logger.warning(
            "The `Batch` class has been renamed to `JobBundling`. Please update your code."
        )
106 changes: 106 additions & 0 deletions src/slurminade/check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import socket
import tempfile
import time
from pathlib import Path

import click

from slurminade import join, slurmify, srun


@slurmify()
def _write_to_file(path, content):
    """Write *content* plus the executing node's hostname to *path*.

    Used by the slurm self-check to prove that a slurmified function
    actually ran on a node and could write to the shared directory.
    """
    time.sleep(1)
    # Report which node executed this call, then persist it to the file.
    node_name = socket.gethostname()
    print("Hello from ", node_name)
    Path(path).write_text(content + "\n" + node_name)
    # wait a second for the file to be written
    time.sleep(1)


def _report_written_file(tmp_file_path, check_name):
    """Verify that *tmp_file_path* was written and report the executing node.

    :param tmp_file_path: path the slurmified job was asked to write.
    :param check_name: label used in the success message, e.g. ``"check 1"``.
    :raises RuntimeError: if the file does not exist. (RuntimeError is a
        subclass of Exception, so pre-existing broad handlers still work.)
    """
    if not Path(tmp_file_path).exists():
        msg = "Slurminade failed: The file was not written to the temporary directory."
        raise RuntimeError(msg)
    with open(tmp_file_path) as file:
        content = file.readlines()
    # Line 1 of the file holds the hostname written by `_write_to_file`.
    print(
        f"Slurminade {check_name} successful. Test was run on node",
        content[1].strip(),
    )


@click.command()
@click.option("--partition", default=None, help="The partition to use.")
@click.option("--constraint", default=None, help="The constraint to use.")
def check_slurm(partition, constraint):
    """
    Check if the code is running on a slurm node.

    Runs four end-to-end checks against the slurm setup: (1) blocking
    dispatch, (2) non-blocking dispatch with polling, (3) a plain ``srun``
    command, and (4) blocking dispatch again after ``join``.
    """
    # enforce slurm (imported lazily so merely importing this module
    # does not force a dispatcher switch)
    from slurminade.conf import update_default_configuration
    from slurminade.dispatcher import SlurmDispatcher, set_dispatcher
    from slurminade.function_map import set_entry_point

    set_dispatcher(SlurmDispatcher())
    print("Setting entry point to ", Path(__file__).resolve())
    set_entry_point(Path(__file__).resolve())

    if partition:
        update_default_configuration(partition=partition)
    if constraint:
        update_default_configuration(constraint=constraint)

    # Temporary folder in the CWD so slurm nodes sharing the filesystem see it.
    with tempfile.TemporaryDirectory(dir=".") as tmpdir:
        tmpdir = Path(tmpdir).resolve()
        assert tmpdir.exists()

        # Check 1: blocking dispatch writes the file before returning.
        tmp_file_path = tmpdir / "check_1.txt"
        _write_to_file.distribute_and_wait(str(tmp_file_path), "test")
        _report_written_file(tmp_file_path, "check 1")

        # Check 2: non-blocking dispatch; poll up to one minute for the file.
        tmp_file_path = tmpdir / "check_2.txt"
        _write_to_file.distribute(str(tmp_file_path), "test")
        for _ in range(60):
            if Path(tmp_file_path).exists():
                break
            time.sleep(1)
        _report_written_file(tmp_file_path, "check 2")

        join()

        # Check 3: a plain srun command works.
        tmp_file_path = tmpdir / "check_3.txt"
        srun(["touch", str(tmp_file_path)])
        time.sleep(1)
        if not Path(tmp_file_path).exists():
            msg = "Slurminade failed: The file was not written to the temporary directory."
            raise RuntimeError(msg)
        print("Slurminade check 3 successful.")
        tmp_file_path.unlink()

        # Check 4: blocking dispatch still works after join().
        # Bug fix: this used to report "check 1 successful" for check 4.
        tmp_file_path = tmpdir / "check_4.txt"
        _write_to_file.distribute_and_wait(str(tmp_file_path), "test")
        _report_written_file(tmp_file_path, "check 4")


if __name__ == "__main__":
    check_slurm()
Loading

0 comments on commit 987d4c8

Please sign in to comment.