
Commit 0ffd312

[feature] Add option to specify number of nodes (#565)
* [feature] Add option to specify number of nodes
* extend test
* Add DocStrings
1 parent ca707f9 commit 0ffd312

File tree: 5 files changed, +54 -17 lines changed

executorlib/interactive/flux.py

Lines changed: 10 additions & 4 deletions
@@ -29,6 +29,8 @@ class FluxPythonSpawner(BaseSpawner):
         cores (int, optional): The number of cores. Defaults to 1.
         threads_per_core (int, optional): The number of threads per core. Defaults to 1.
         gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
+        num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
+        exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute nodes. Defaults to False.
         openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
         flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
         flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
@@ -42,6 +44,8 @@ def __init__(
         cores: int = 1,
         threads_per_core: int = 1,
         gpus_per_core: int = 0,
+        num_nodes: Optional[int] = None,
+        exclusive: bool = False,
         openmpi_oversubscribe: bool = False,
         flux_executor: Optional[flux.job.FluxExecutor] = None,
         flux_executor_pmi_mode: Optional[str] = None,
@@ -55,6 +59,8 @@ def __init__(
         )
         self._threads_per_core = threads_per_core
         self._gpus_per_core = gpus_per_core
+        self._num_nodes = num_nodes
+        self._exclusive = exclusive
         self._flux_executor = flux_executor
         self._flux_executor_pmi_mode = flux_executor_pmi_mode
         self._flux_executor_nesting = flux_executor_nesting
@@ -85,17 +91,17 @@ def bootup(
                 num_tasks=self._cores,
                 cores_per_task=self._threads_per_core,
                 gpus_per_task=self._gpus_per_core,
-                num_nodes=None,
-                exclusive=False,
+                num_nodes=self._num_nodes,
+                exclusive=self._exclusive,
             )
         else:
             jobspec = flux.job.JobspecV1.from_nest_command(
                 command=command_lst,
                 num_slots=self._cores,
                 cores_per_slot=self._threads_per_core,
                 gpus_per_slot=self._gpus_per_core,
-                num_nodes=None,
-                exclusive=False,
+                num_nodes=self._num_nodes,
+                exclusive=self._exclusive,
             )
         jobspec.environment = dict(os.environ)
         if self._flux_executor_pmi_mode is not None:
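For context, a minimal usage sketch of the updated spawner: it assumes a running Flux broker, uses only keyword arguments shown in this diff, and all resource values are illustrative, not prescriptive.

import flux.job
from executorlib.interactive.flux import FluxPythonSpawner

# Illustrative values: 8 ranks spread over 2 nodes, reserved exclusively.
spawner = FluxPythonSpawner(
    cores=8,
    threads_per_core=1,
    gpus_per_core=0,
    num_nodes=2,     # new in this commit: forwarded to JobspecV1 (was hard-coded to None)
    exclusive=True,  # new in this commit: forwarded to JobspecV1 (was hard-coded to False)
    flux_executor=flux.job.FluxExecutor(),
)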

executorlib/interactive/slurm.py

Lines changed: 16 additions & 0 deletions
@@ -27,6 +27,8 @@ def __init__(
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
+        num_nodes: Optional[int] = None,
+        exclusive: bool = False,
        openmpi_oversubscribe: bool = False,
        slurm_cmd_args: Optional[list[str]] = None,
    ):
@@ -38,6 +40,8 @@ def __init__(
            cores (int, optional): The number of cores to use. Defaults to 1.
            threads_per_core (int, optional): The number of threads per core. Defaults to 1.
            gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
+            num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
+            exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute nodes. Defaults to False.
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
            slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
        """
@@ -49,6 +53,8 @@ def __init__(
        )
        self._gpus_per_core = gpus_per_core
        self._slurm_cmd_args = slurm_cmd_args
+        self._num_nodes = num_nodes
+        self._exclusive = exclusive

    def generate_command(self, command_lst: list[str]) -> list[str]:
        """
@@ -65,6 +71,8 @@ def generate_command(self, command_lst: list[str]) -> list[str]:
            cwd=self._cwd,
            threads_per_core=self._threads_per_core,
            gpus_per_core=self._gpus_per_core,
+            num_nodes=self._num_nodes,
+            exclusive=self._exclusive,
            openmpi_oversubscribe=self._openmpi_oversubscribe,
            slurm_cmd_args=self._slurm_cmd_args,
        )
@@ -78,6 +86,8 @@ def generate_slurm_command(
    cwd: Optional[str],
    threads_per_core: int = 1,
    gpus_per_core: int = 0,
+    num_nodes: Optional[int] = None,
+    exclusive: bool = False,
    openmpi_oversubscribe: bool = False,
    slurm_cmd_args: Optional[list[str]] = None,
) -> list[str]:
@@ -89,6 +99,8 @@ def generate_slurm_command(
        cwd (str): The current working directory.
        threads_per_core (int, optional): The number of threads per core. Defaults to 1.
        gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
+        num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
+        exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute nodes. Defaults to False.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
        slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].

@@ -98,10 +110,14 @@ def generate_slurm_command(
    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
    if cwd is not None:
        command_prepend_lst += ["-D", cwd]
+    if num_nodes is not None:
+        command_prepend_lst += ["-N", str(num_nodes)]
    if threads_per_core > 1:
        command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
    if gpus_per_core > 0:
        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
+    if exclusive:
+        command_prepend_lst += ["--exact"]
    if openmpi_oversubscribe:
        command_prepend_lst += ["--oversubscribe"]
    if slurm_cmd_args is not None and len(slurm_cmd_args) > 0:
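As a quick sanity check, here is a sketch of the srun call the updated helper assembles. It mirrors the updated test at the bottom of this diff; the import path matches this file and all values are illustrative.

from executorlib.interactive.slurm import generate_slurm_command

command_lst = generate_slurm_command(
    cores=1,
    cwd="/tmp/test",
    threads_per_core=2,
    gpus_per_core=1,
    num_nodes=1,     # new: emits "-N 1"
    exclusive=True,  # new: emits "--exact"
    openmpi_oversubscribe=True,
    slurm_cmd_args=["--help"],
)
# Expected, per the updated test:
# ['srun', '-n', '1', '-D', '/tmp/test', '-N', '1', '--cpus-per-task=2',
#  '--gpus-per-task=1', '--exact', '--oversubscribe', '--help']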

executorlib/interfaces/flux.py

Lines changed: 12 additions & 11 deletions
@@ -43,9 +43,10 @@ class FluxJobExecutor:
            - threads_per_core (int): number of OpenMP threads to be used for each function call
            - gpus_per_core (int): number of GPUs per worker - defaults to 0
            - cwd (str/None): current working directory where the parallel python task is executed
-            - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
-              SLURM only) - default False
-            - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+            - num_nodes (int, optional): The number of compute nodes to use for executing the task.
+              Defaults to None.
+            - exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
+              compute nodes. Defaults to False.
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -147,10 +148,10 @@ def __new__(
            - threads_per_core (int): number of OpenMP threads to be used for each function call
            - gpus_per_core (int): number of GPUs per worker - defaults to 0
            - cwd (str/None): current working directory where the parallel python task is executed
-            - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
-              and SLURM only) - default False
-            - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
-              only)
+            - num_nodes (int, optional): The number of compute nodes to use for executing the task.
+              Defaults to None.
+            - exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
+              compute nodes. Defaults to False.
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -444,10 +445,10 @@ def create_flux_executor(
            - threads_per_core (int): number of OpenMP threads to be used for each function call
            - gpus_per_core (int): number of GPUs per worker - defaults to 0
            - cwd (str/None): current working directory where the parallel python task is executed
-            - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
-              and SLURM only) - default False
-            - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
-              only)
+            - num_nodes (int, optional): The number of compute nodes to use for executing the task.
+              Defaults to None.
+            - exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
+              compute nodes. Defaults to False.
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
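A sketch of how the new entries might be used from the high-level interface. It assumes FluxJobExecutor is exported at the package top level and that these entries are passed via a resource_dict argument, as the docstring fragments above suggest; the values are illustrative.

from executorlib import FluxJobExecutor

def add(a, b):
    return a + b

# Illustrative: each worker gets 2 cores on a single, exclusively reserved node.
with FluxJobExecutor(
    resource_dict={"cores": 2, "num_nodes": 1, "exclusive": True},
) as exe:
    fut = exe.submit(add, 1, 2)
    print(fut.result())  # 3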

executorlib/interfaces/slurm.py

Lines changed: 12 additions & 0 deletions
@@ -227,6 +227,10 @@ class SlurmJobExecutor:
            - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
              SLURM only) - default False
            - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+            - num_nodes (int, optional): The number of compute nodes to use for executing the task.
+              Defaults to None.
+            - exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
+              compute nodes. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
              context of an HPC cluster this is essential to be able to communicate to an
              Executor running on a different compute node within the same allocation. And
@@ -320,6 +324,10 @@ def __new__(
              and SLURM only) - default False
            - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
              only)
+            - num_nodes (int, optional): The number of compute nodes to use for executing the task.
+              Defaults to None.
+            - exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
+              compute nodes. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
              context of an HPC cluster this is essential to be able to communicate to an
              Executor running on a different compute node within the same allocation. And
@@ -409,6 +417,10 @@ def create_slurm_executor(
              and SLURM only) - default False
            - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
              only)
+            - num_nodes (int, optional): The number of compute nodes to use for executing the task.
+              Defaults to None.
+            - exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
+              compute nodes. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
              context of an HPC cluster this is essential to be able to communicate to an
              Executor running on a different compute node within the same allocation. And
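The SLURM-side interface mirrors the Flux sketch above; the same assumptions apply (top-level export and a resource_dict argument), and on this backend the new keys map to the srun flags "-N" and "--exact" shown earlier. Values are illustrative.

from executorlib import SlurmJobExecutor

def add(a, b):
    return a + b

# Illustrative: request 2 nodes and reserve them exclusively for this worker.
with SlurmJobExecutor(
    resource_dict={"cores": 2, "num_nodes": 2, "exclusive": True},
) as exe:
    fut = exe.submit(add, 1, 2)
    print(fut.result())  # 3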

tests/test_pysqa_subprocess.py

Lines changed: 4 additions & 2 deletions
@@ -51,9 +51,11 @@ def test_generate_slurm_command(self):
            cwd="/tmp/test",
            threads_per_core=2,
            gpus_per_core=1,
+            num_nodes=1,
+            exclusive=True,
            openmpi_oversubscribe=True,
            slurm_cmd_args=["--help"],
        )
-        self.assertEqual(len(command_lst), 9)
-        reply_lst = ['srun', '-n', '1', '-D', '/tmp/test', '--cpus-per-task=2', '--gpus-per-task=1', '--oversubscribe', '--help']
+        self.assertEqual(len(command_lst), 12)
+        reply_lst = ['srun', '-n', '1', '-D', '/tmp/test', '-N', '1', '--cpus-per-task=2', '--gpus-per-task=1', '--exact', '--oversubscribe', '--help']
        self.assertEqual(command_lst, reply_lst)
