
Commit 40c35e3

[minor] Move create_executor() function to separate module (#550)
* [minor] Move create_executor() function to separate module
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* split create function into sub functions
* fix type hints
* fix type hints again
---------
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 7f9448a commit 40c35e3
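For orientation, a minimal usage sketch of the relocated entry point (not part of this commit): it assumes the returned executor objects expose the standard concurrent.futures submit()/result()/shutdown() interface, and the sum-over-a-list workload is purely illustrative.

    # Hypothetical usage, not part of this commit: the factory now lives in
    # executorlib.interactive.create rather than executorlib.interactive.executor.
    from executorlib.interactive.create import create_executor

    exe = create_executor(backend="local", max_cores=2)  # same call signature as before the move
    future = exe.submit(sum, [1, 2, 3])  # assumes concurrent.futures-style submit()
    print(future.result())               # expected: 6
    exe.shutdown(wait=True)              # assumes the shared executor base provides shutdown()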

File tree: 4 files changed, +264 -199 lines changed


executorlib/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1,10 +1,10 @@
 from typing import Callable, Optional

 from executorlib._version import get_versions as _get_versions
+from executorlib.interactive.create import create_executor as _create_executor
 from executorlib.interactive.executor import (
     ExecutorWithDependencies as _ExecutorWithDependencies,
 )
-from executorlib.interactive.executor import create_executor as _create_executor
 from executorlib.standalone.inputcheck import (
     check_plot_dependency_graph as _check_plot_dependency_graph,
 )
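A quick way to see that this is a pure relocation (an illustrative check, not part of the commit): the private alias keeps its name, so the rest of executorlib/__init__.py is unaffected; only the module the alias resolves to changes.

    # Illustrative check, not in the commit: the alias name is unchanged,
    # only its source module moves to executorlib.interactive.create.
    from executorlib.interactive.create import create_executor as _create_executor

    assert _create_executor.__module__ == "executorlib.interactive.create"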

executorlib/interactive/create.py

Lines changed: 260 additions & 0 deletions
@@ -0,0 +1,260 @@
from typing import Callable, Optional, Union

from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.interactive.slurm import (
    validate_max_workers as validate_max_workers_slurm,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_flux_log_files,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
    from executorlib.interactive.flux import (
        validate_max_workers as validate_max_workers_flux,
    )
except ImportError:
    pass


def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    """
    Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
    executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
    executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used
    for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
    installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
    requires the SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
        max_cores (int): defines the number cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an Executor
                                      running on a different compute node within the same allocation. And in principle
                                      any computer should be able to resolve that their own hostname points to the same
                                      address as localhost. Still MacOS >= 12 seems to disable this look up for security
                                      reasons. So on MacOS it is required to set this option to true
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (None): optional function to preset arguments for functions which are submitted later
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        check_oversubscribe(
            oversubscribe=resource_dict.get("openmpi_oversubscribe", False)
        )
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
        )
        return create_flux_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cores_per_worker=cores_per_worker,
            resource_dict=resource_dict,
            flux_executor=flux_executor,
            flux_executor_pmi_mode=flux_executor_pmi_mode,
            flux_executor_nesting=flux_executor_nesting,
            flux_log_files=flux_log_files,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "slurm_allocation":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        return create_slurm_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cores_per_worker=cores_per_worker,
            resource_dict=resource_dict,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
        )
        return create_local_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cores_per_worker=cores_per_worker,
            resource_dict=resource_dict,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )


def create_flux_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cores_per_worker: int = 1,
    resource_dict: dict = {},
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    if "openmpi_oversubscribe" in resource_dict.keys():
        del resource_dict["openmpi_oversubscribe"]
    if "slurm_cmd_args" in resource_dict.keys():
        del resource_dict["slurm_cmd_args"]
    resource_dict["flux_executor"] = flux_executor
    resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
    resource_dict["flux_executor_nesting"] = flux_executor_nesting
    resource_dict["flux_log_files"] = flux_log_files
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_flux(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )


def create_slurm_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cores_per_worker: int = 1,
    resource_dict: dict = {},
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_slurm(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )


def create_local_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cores_per_worker: int = 1,
    resource_dict: dict = {},
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    if "threads_per_core" in resource_dict.keys():
        del resource_dict["threads_per_core"]
    if "gpus_per_core" in resource_dict.keys():
        del resource_dict["gpus_per_core"]
    if "slurm_cmd_args" in resource_dict.keys():
        del resource_dict["slurm_cmd_args"]
    if block_allocation:
        resource_dict["init_function"] = init_function
        return InteractiveExecutor(
            max_workers=validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=True,
            ),
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
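To summarize how the new dispatcher and its sub-functions fit together, a short hedged sketch follows; it only exercises code paths visible in the file above, and it assumes the returned executor classes provide a shutdown() method via their shared base class.

    # Illustrative sketch, not part of the commit: create_executor() routes by backend.
    #   backend="flux_allocation"  -> create_flux_allocation_executor()  (FluxPythonSpawner)
    #   backend="slurm_allocation" -> create_slurm_allocation_executor() (SrunSpawner)
    #   backend="local"            -> create_local_executor()            (MpiExecSpawner)
    from executorlib.interactive.create import create_executor

    # With the default block_allocation=False the local branch returns an
    # InteractiveStepExecutor; block_allocation=True would instead validate the
    # worker count and return an InteractiveExecutor with a fixed worker pool.
    exe = create_executor(
        backend="local",
        max_cores=2,
        resource_dict={"cores": 1},  # per-task MPI cores, as documented in the docstring
    )
    print(type(exe).__name__)  # InteractiveStepExecutor
    exe.shutdown(wait=True)    # assumes the shared executor base provides shutdown()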
