Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.0.1 #22

Merged
merged 2 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ The project is reasonably easy:
Changes
-------

- 1.0.1: Dispatchers now return job references instead of job ids. This allows for fancier features in the future, e.g., when the job information only becomes available a short time after the job has been submitted.
- 0.10.1: FIX: Listing functions will no longer execute setup functions.
- 0.10.0: `Batch` is now named `JobBundling`. There is a method `join` for easier synchronization. `exec` allows executing commands just like `srun` and `sbatch`, but with a syntax uniform with other slurmified functions. Functions can now also be called with `distribute_and_wait`. If you call `python3 -m slurminade.check --partition YOUR_PARTITION --constraint YOUR_CONSTRAINT`, you can check whether your slurm configuration is running correctly.
- 0.9.0: Lots of improvements.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ where = ["src"]

[project]
name = "slurminade"
version = "0.10.1"
version = "1.0.1"
authors = [
{ name = "TU Braunschweig, IBR, Algorithms Group (Dominik Krupke)", email = "[email protected]" },
]
Expand Down
22 changes: 18 additions & 4 deletions src/slurminade/bundling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,21 @@
from .function import SlurmFunction
from .guard import BatchGuard
from .options import SlurmOptions
from .job_reference import JobReference

class BundlingJobReference(JobReference):
    """
    Placeholder job reference handed out when function calls are only
    buffered for bundling instead of being dispatched right away.

    No job exists yet at that point, so there is neither a job id nor an
    exit code, and no additional information is available.
    """

    def __init__(self) -> None:
        super().__init__()

    def get_job_id(self) -> typing.Optional[int]:
        """Always ``None``: a buffered call has no job id."""
        return None

    def get_exit_code(self) -> typing.Optional[int]:
        """Always ``None``: a buffered call has no exit code."""
        return None

    def get_info(self) -> typing.Dict[str, typing.Any]:
        """Return an empty dictionary; nothing is known about the job."""
        return {}

class TaskBuffer:
"""
Expand Down Expand Up @@ -116,20 +130,20 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> JobReference:
if block:
# if blocking, we don't buffer, but dispatch immediately
return self.subdispatcher._dispatch(funcs, options, block=True)
for func in funcs:
self._tasks.add(func, options)
return -1
return BundlingJobReference()

def srun(
self,
command: str,
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
) -> JobReference:
conf = SlurmOptions(conf if conf else {})
return self.subdispatcher.srun(command, conf, simple_slurm_kwargs)

Expand All @@ -138,7 +152,7 @@ def sbatch(
command: str,
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
) -> JobReference:
conf = SlurmOptions(conf if conf else {})
return self.subdispatcher.sbatch(command, conf, simple_slurm_kwargs)

Expand Down
86 changes: 69 additions & 17 deletions src/slurminade/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import shutil
import subprocess
import typing

from typing import Any, Dict, Optional
import simple_slurm

from .conf import _get_conf
Expand All @@ -24,6 +24,7 @@
# MAX_ARG_STRLEN on a Linux system with PAGE_SIZE 4096 is 131072
DEFAULT_MAX_ARG_LENGTH = 100000

from .job_reference import JobReference

class Dispatcher(abc.ABC):
"""
Expand All @@ -38,7 +39,7 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> JobReference:
"""
Define how to dispatch a number of function calls.
:param funcs: The function calls to be dispatched.
Expand All @@ -52,7 +53,7 @@ def srun(
command: str,
conf: typing.Optional[SlurmOptions] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
) -> int:
) -> JobReference:
"""
Define how you want to execute an `srun` command. This command is directly
executed and only terminates after completion.
Expand All @@ -68,7 +69,7 @@ def sbatch(
command: str,
conf: typing.Optional[SlurmOptions] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
) -> int:
) -> JobReference:
"""
Define how you want to execute an `sbatch` command. The command is scheduled
and the function return immediately.
Expand All @@ -93,7 +94,7 @@ def __call__(
funcs: typing.Union[FunctionCall, typing.Iterable[FunctionCall]],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> JobReference:
"""
Dispatches a function call or a number of function calls.
:param funcs: The function calls to be distributed.
Expand Down Expand Up @@ -124,6 +125,15 @@ def join(self):
msg = "Joining is not implemented for this dispatcher."
raise NotImplementedError(msg)

class TestJobReference(JobReference):
    """
    Job reference returned by the test dispatcher.

    It carries neither a job id nor an exit code; its info dictionary only
    marks it as a test reference.
    """

    def get_job_id(self) -> None:
        """Always ``None``: test dispatches have no job id."""
        return None

    def get_exit_code(self) -> None:
        """Always ``None``: test dispatches have no exit code."""
        return None

    def get_info(self) -> Dict[str, Any]:
        """Return a fixed dictionary identifying this as a test reference."""
        info: Dict[str, Any] = {"info": "test"}
        return info

class TestDispatcher(Dispatcher):
"""
Expand All @@ -142,7 +152,7 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> JobReference:
dispatch_guard()
funcs = list(funcs)
command = create_slurminade_command(
Expand All @@ -151,7 +161,7 @@ def _dispatch(
logging.getLogger("slurminade").info(command)
self.calls.append(funcs)
self._cleanup(command)
return -1
return TestJobReference()

def _cleanup(self, command):
args = shlex.split(command)
Expand All @@ -170,6 +180,7 @@ def srun(
dispatch_guard()
self.sruns.append(command)
logging.getLogger("slurminade").info("[test output] SRUN %s", command)
return TestJobReference()

def sbatch(
self,
Expand All @@ -180,10 +191,28 @@ def sbatch(
dispatch_guard()
self.sbatches.append(command)
logging.getLogger("slurminade").info("[test output] SBATCH %s", command)
return TestJobReference()

def is_sequential(self):
    """Return ``True``: this dispatcher reports itself as sequential."""
    return True

class SlurmJobReference(JobReference):
    """
    Reference to a job that was handed to slurm via ``srun`` or ``sbatch``.

    :param job_id: The slurm job id. ``None`` when the job was executed
        with ``srun``, as no job id is recorded in that case.
    :param exit_code: The exit code returned by ``srun``. ``None`` when the
        job was scheduled with ``sbatch``.
    :param mode: How the job was submitted, i.e., ``"srun"`` or ``"sbatch"``.
    """

    def __init__(
        self,
        job_id: Optional[int],
        exit_code: Optional[int],
        mode: str,
    ):
        self.job_id = job_id
        self.exit_code = exit_code
        self.mode = mode

    def get_job_id(self) -> Optional[int]:
        """
        Return the slurm job id, or ``None`` if the reference was created
        without one (as is done for ``srun``).
        """
        # The previous annotation claimed `int`, but references created for
        # `srun` are constructed with `job_id=None`.
        return self.job_id

    def get_exit_code(self) -> Optional[int]:
        """
        Return the exit code, or ``None`` if the reference was created
        without one (as is done for ``sbatch``).
        """
        return self.exit_code

    def get_info(self) -> Dict[str, Any]:
        """
        Return a dictionary with the job id, the exit code, the submission
        mode, and a flag stating that the job was handed to slurm.
        """
        return {
            "job_id": self.job_id,
            "exit_code": self.exit_code,
            "on_slurm": True,
            "mode": self.mode,
        }

class SlurmDispatcher(Dispatcher):
"""
Expand Down Expand Up @@ -214,7 +243,7 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> typing.Optional[int]:
) -> SlurmJobReference:
dispatch_guard()
if "job_name" not in options:
funcs = list(funcs)
Expand All @@ -234,17 +263,17 @@ def _dispatch(
logging.getLogger("slurminade").info(
"Returned from srun with exit code %s", ret
)
return None
return SlurmJobReference(None, ret, "srun")
jid = slurm.sbatch(command)
self._all_job_ids.append(jid)
return jid
return SlurmJobReference(jid, None, "sbatch")

def sbatch(
self,
command: str,
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
) -> SlurmJobReference:
dispatch_guard()
conf = _get_conf(conf)
slurm = simple_slurm.Slurm(**conf)
Expand All @@ -254,7 +283,7 @@ def sbatch(
else:
jid = slurm.sbatch(command)
self._all_job_ids.append(jid)
return jid
return SlurmJobReference(jid, None, "sbatch")

def join(self):
if not self._all_job_ids:
Expand All @@ -266,7 +295,7 @@ def srun(
command: str,
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
) -> SlurmJobReference:
dispatch_guard()
conf = _get_conf(conf)
slurm = simple_slurm.Slurm(**conf)
Expand All @@ -275,8 +304,20 @@ def srun(
ret = slurm.srun(command, **simple_slurm_kwargs)
else:
ret = slurm.srun(command)
return ret
return SlurmJobReference(None, ret, "srun")

class SubprocessJobReference(JobReference):
    """
    Job reference without a job id or exit code, whose info marks the job
    as not running on slurm.

    NOTE(review): presumably returned by ``SubprocessDispatcher`` -- confirm
    against the dispatcher implementation.
    """

    def __init__(self):
        """Create the reference; there is no state to initialize."""

    def get_job_id(self) -> Optional[int]:
        """Always ``None``: no job id is available."""
        return None

    def get_exit_code(self) -> Optional[int]:
        """Always ``None``: no exit code is available."""
        return None

    def get_info(self) -> Dict[str, Any]:
        """Return a dictionary stating that the job is not on slurm."""
        info: Dict[str, Any] = {"on_slurm": False}
        return info

class SubprocessDispatcher(Dispatcher):
"""
Expand Down Expand Up @@ -325,6 +366,16 @@ def sbatch(
def is_sequential(self):
    """Return ``True``: this dispatcher reports itself as sequential."""
    return True

class LocalJobReference(JobReference):
    """
    Job reference for work that was executed locally by a direct call
    instead of being submitted to slurm.

    It has neither a job id nor an exit code.
    """

    def get_job_id(self) -> None:
        """Always ``None``: local execution has no job id."""
        return None

    def get_exit_code(self) -> None:
        """Always ``None``: no exit code is recorded."""
        return None

    def get_info(self) -> Dict[str, Any]:
        """Return a dictionary stating that the job is not on slurm."""
        info: Dict[str, Any] = {"on_slurm": False}
        return info


class DirectCallDispatcher(Dispatcher):
"""
Expand All @@ -338,11 +389,11 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> LocalJobReference:
dispatch_guard()
for func in funcs:
FunctionMap.call(func.func_id, func.args, func.kwargs)
return -1
return LocalJobReference()

def srun(
self,
Expand All @@ -352,14 +403,15 @@ def srun(
):
dispatch_guard()
subprocess.run(command, check=True)
return LocalJobReference()

def sbatch(
self,
command: str,
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
self.srun(command)
return self.srun(command)

def is_sequential(self):
    """Return ``True``: this dispatcher reports itself as sequential."""
    return True
Expand Down
10 changes: 5 additions & 5 deletions src/slurminade/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .function_map import FunctionMap
from .guard import guard_recursive_distribution
from .options import SlurmOptions

from .job_reference import JobReference

class CallPolicy(Enum):
"""
Expand Down Expand Up @@ -52,7 +52,7 @@ def update_options(self, conf: typing.Dict[str, typing.Any]):
self.special_slurm_opts.update(conf)

def wait_for(
self, job_ids: typing.Union[int, typing.Iterable[int]], method: str = "afterany"
self, job_ids: typing.Union[JobReference, typing.Iterable[JobReference]], method: str = "afterany"
) -> "SlurmFunction":
"""
Add a dependency to a distribution.
Expand All @@ -69,17 +69,17 @@ def wait_for(
"""
sfunc = SlurmFunction(self.special_slurm_opts, self.func, self.func_id)
job_ids = (
[job_ids] if isinstance(job_ids, int) else list(job_ids)
[job_ids] if isinstance(job_ids, JobReference) else list(job_ids)
) # make sure it is a list
if not job_ids and not get_dispatcher().is_sequential():
msg = "Creating a dependency on an empty list of job ids."
msg += " This is probably an error in your code."
msg += " Maybe you are using `Batch` but flush outside of the `with` block?"
raise RuntimeError(msg)
if any(jid < 0 for jid in job_ids) and not get_dispatcher().is_sequential():
if any(jid.get_job_id() is None for jid in job_ids) and not get_dispatcher().is_sequential():
msg = "Invalid job id. Not every dispatcher can directly return job ids, because it may not directly distribute them or doesn't distribute them at all."
raise RuntimeError(msg)
sfunc.special_slurm_opts.add_dependencies(list(job_ids), method)
sfunc.special_slurm_opts.add_dependencies(list(jid.get_job_id() for jid in job_ids), method)
return sfunc

def with_options(self, **kwargs) -> "SlurmFunction":
Expand Down
15 changes: 15 additions & 0 deletions src/slurminade/job_reference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import abc
from typing import Any, Dict, Optional

class JobReference(abc.ABC):
    """
    Abstract handle for a dispatched job.

    Concrete subclasses have to implement all three accessors. Each
    dispatcher can provide its own subclass describing what it knows about
    the job it dispatched.
    """

    @abc.abstractmethod
    def get_job_id(self) -> Optional[int]:
        """Return the job id, or ``None`` if no job id is available."""
        return None

    @abc.abstractmethod
    def get_exit_code(self) -> Optional[int]:
        """Return the exit code, or ``None`` if no exit code is available."""
        return None

    @abc.abstractmethod
    def get_info(self) -> Dict[str, Any]:
        """Return a dictionary with further information on the job."""
        return None
Loading