Skip to content

Commit f3c99a1

Browse files
Internal refactoring for interface classes (#636)
* Interfaces: Use Base Interface rather than factory pattern * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * special character from concurrent futures * fix mypy * another workaround * fix resize test * fix plot test * remove del * fix exit issue * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Rename internal variable to _task_scheduler * update Documentation --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 5a33b55 commit f3c99a1

File tree

8 files changed

+292
-251
lines changed

8 files changed

+292
-251
lines changed

executorlib/base/executor.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def future_queue(self) -> Optional[queue.Queue]:
7272
def submit( # type: ignore
7373
self,
7474
fn: Callable,
75+
/,
7576
*args,
7677
resource_dict: Optional[dict] = None,
7778
**kwargs,
@@ -178,3 +179,111 @@ def __del__(self):
178179
"""
179180
with contextlib.suppress(AttributeError, RuntimeError):
180181
self.shutdown(wait=False)
182+
183+
184+
class ExecutorInterface(FutureExecutor):
    """
    Interface class for the executor.

    Thin delegating wrapper: every call is forwarded to the internal task
    scheduler, so this class adds no behavior of its own beyond presenting
    the concurrent.futures.Executor interface.

    Args:
        executor (ExecutorBase): internal executor
    """

    def __init__(self, executor: ExecutorBase):
        self._task_scheduler = executor

    @property
    def max_workers(self) -> Optional[int]:
        """
        Get the maximum number of workers.

        Returns:
            Optional[int]: The maximum number of workers.
        """
        return self._task_scheduler.max_workers

    @max_workers.setter
    def max_workers(self, max_workers: int):
        """
        Set the maximum number of workers.

        Args:
            max_workers (int): The maximum number of workers.
        """
        self._task_scheduler.max_workers = max_workers

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        Returns:
            Optional[dict]: Information about the executor.
        """
        return self._task_scheduler.info

    @property
    def future_queue(self) -> Optional[queue.Queue]:
        """
        Get the future queue.

        Returns:
            queue.Queue: The future queue.
        """
        return self._task_scheduler.future_queue

    def submit(  # type: ignore
        self,
        fn: Callable,
        /,
        *args,
        resource_dict: Optional[dict] = None,
        **kwargs,
    ) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        # Forward fn positionally ahead of *args — equivalent to, but clearer
        # than, unpacking *([fn] + list(args)).
        return self._task_scheduler.submit(
            fn, *args, resource_dict=resource_dict, **kwargs
        )

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        self._task_scheduler.shutdown(wait=wait, cancel_futures=cancel_futures)

    def __len__(self) -> int:
        """
        Get the length of the executor.

        Returns:
            int: The length of the executor.
        """
        return len(self._task_scheduler)

    def __exit__(self, *args, **kwargs) -> None:
        """
        Exit method called when exiting the context manager.
        """
        self._task_scheduler.__exit__(*args, **kwargs)

executorlib/interfaces/flux.py

Lines changed: 58 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import contextlib
22
from typing import Callable, Optional, Union
33

4+
from executorlib.base.executor import ExecutorInterface
45
from executorlib.interactive.blockallocation import BlockAllocationExecutor
56
from executorlib.interactive.dependency import DependencyExecutor
67
from executorlib.interactive.onetoone import OneTaskPerProcessExecutor
@@ -21,7 +22,7 @@
2122
)
2223

2324

24-
class FluxJobExecutor:
25+
class FluxJobExecutor(ExecutorInterface):
2526
"""
2627
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
2728
preferably the flux framework for distributing Python functions within a given resource allocation. In contrast to
@@ -104,27 +105,6 @@ def __init__(
104105
refresh_rate: float = 0.01,
105106
plot_dependency_graph: bool = False,
106107
plot_dependency_graph_filename: Optional[str] = None,
107-
):
108-
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
109-
pass
110-
111-
def __new__(
112-
cls,
113-
max_workers: Optional[int] = None,
114-
cache_directory: Optional[str] = None,
115-
max_cores: Optional[int] = None,
116-
resource_dict: Optional[dict] = None,
117-
flux_executor=None,
118-
flux_executor_pmi_mode: Optional[str] = None,
119-
flux_executor_nesting: bool = False,
120-
flux_log_files: bool = False,
121-
hostname_localhost: Optional[bool] = None,
122-
block_allocation: bool = False,
123-
init_function: Optional[Callable] = None,
124-
disable_dependencies: bool = False,
125-
refresh_rate: float = 0.01,
126-
plot_dependency_graph: bool = False,
127-
plot_dependency_graph_filename: Optional[str] = None,
128108
):
129109
"""
130110
Instead of returning an executorlib.Executor object this function returns either an executorlib.mpi.PyMPIExecutor,
@@ -186,7 +166,31 @@ def __new__(
186166
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
187167
)
188168
if not disable_dependencies:
189-
return DependencyExecutor(
169+
super().__init__(
170+
executor=DependencyExecutor(
171+
executor=create_flux_executor(
172+
max_workers=max_workers,
173+
cache_directory=cache_directory,
174+
max_cores=max_cores,
175+
resource_dict=resource_dict,
176+
flux_executor=flux_executor,
177+
flux_executor_pmi_mode=flux_executor_pmi_mode,
178+
flux_executor_nesting=flux_executor_nesting,
179+
flux_log_files=flux_log_files,
180+
hostname_localhost=hostname_localhost,
181+
block_allocation=block_allocation,
182+
init_function=init_function,
183+
),
184+
max_cores=max_cores,
185+
refresh_rate=refresh_rate,
186+
plot_dependency_graph=plot_dependency_graph,
187+
plot_dependency_graph_filename=plot_dependency_graph_filename,
188+
)
189+
)
190+
else:
191+
check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
192+
check_refresh_rate(refresh_rate=refresh_rate)
193+
super().__init__(
190194
executor=create_flux_executor(
191195
max_workers=max_workers,
192196
cache_directory=cache_directory,
@@ -199,31 +203,11 @@ def __new__(
199203
hostname_localhost=hostname_localhost,
200204
block_allocation=block_allocation,
201205
init_function=init_function,
202-
),
203-
max_cores=max_cores,
204-
refresh_rate=refresh_rate,
205-
plot_dependency_graph=plot_dependency_graph,
206-
plot_dependency_graph_filename=plot_dependency_graph_filename,
207-
)
208-
else:
209-
check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
210-
check_refresh_rate(refresh_rate=refresh_rate)
211-
return create_flux_executor(
212-
max_workers=max_workers,
213-
cache_directory=cache_directory,
214-
max_cores=max_cores,
215-
resource_dict=resource_dict,
216-
flux_executor=flux_executor,
217-
flux_executor_pmi_mode=flux_executor_pmi_mode,
218-
flux_executor_nesting=flux_executor_nesting,
219-
flux_log_files=flux_log_files,
220-
hostname_localhost=hostname_localhost,
221-
block_allocation=block_allocation,
222-
init_function=init_function,
206+
)
223207
)
224208

225209

226-
class FluxClusterExecutor:
210+
class FluxClusterExecutor(ExecutorInterface):
227211
"""
228212
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
229213
preferably the flux framework for distributing Python functions within a given resource allocation. In contrast to
@@ -299,24 +283,6 @@ def __init__(
299283
refresh_rate: float = 0.01,
300284
plot_dependency_graph: bool = False,
301285
plot_dependency_graph_filename: Optional[str] = None,
302-
):
303-
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
304-
pass
305-
306-
def __new__(
307-
cls,
308-
max_workers: Optional[int] = None,
309-
cache_directory: Optional[str] = None,
310-
max_cores: Optional[int] = None,
311-
resource_dict: Optional[dict] = None,
312-
pysqa_config_directory: Optional[str] = None,
313-
hostname_localhost: Optional[bool] = None,
314-
block_allocation: bool = False,
315-
init_function: Optional[Callable] = None,
316-
disable_dependencies: bool = False,
317-
refresh_rate: float = 0.01,
318-
plot_dependency_graph: bool = False,
319-
plot_dependency_graph_filename: Optional[str] = None,
320286
):
321287
"""
322288
Instead of returning an executorlib.Executor object this function returns either an executorlib.mpi.PyMPIExecutor,
@@ -377,41 +343,45 @@ def __new__(
377343
if not plot_dependency_graph:
378344
from executorlib.cache.executor import create_file_executor
379345

380-
return create_file_executor(
381-
max_workers=max_workers,
382-
backend="flux_submission",
383-
max_cores=max_cores,
384-
cache_directory=cache_directory,
385-
resource_dict=resource_dict,
386-
flux_executor=None,
387-
flux_executor_pmi_mode=None,
388-
flux_executor_nesting=False,
389-
flux_log_files=False,
390-
pysqa_config_directory=pysqa_config_directory,
391-
hostname_localhost=hostname_localhost,
392-
block_allocation=block_allocation,
393-
init_function=init_function,
394-
disable_dependencies=disable_dependencies,
395-
)
396-
else:
397-
return DependencyExecutor(
398-
executor=create_flux_executor(
346+
super().__init__(
347+
executor=create_file_executor(
399348
max_workers=max_workers,
400-
cache_directory=cache_directory,
349+
backend="flux_submission",
401350
max_cores=max_cores,
351+
cache_directory=cache_directory,
402352
resource_dict=resource_dict,
403353
flux_executor=None,
404354
flux_executor_pmi_mode=None,
405355
flux_executor_nesting=False,
406356
flux_log_files=False,
357+
pysqa_config_directory=pysqa_config_directory,
407358
hostname_localhost=hostname_localhost,
408359
block_allocation=block_allocation,
409360
init_function=init_function,
410-
),
411-
max_cores=max_cores,
412-
refresh_rate=refresh_rate,
413-
plot_dependency_graph=plot_dependency_graph,
414-
plot_dependency_graph_filename=plot_dependency_graph_filename,
361+
disable_dependencies=disable_dependencies,
362+
)
363+
)
364+
else:
365+
super().__init__(
366+
executor=DependencyExecutor(
367+
executor=create_flux_executor(
368+
max_workers=max_workers,
369+
cache_directory=cache_directory,
370+
max_cores=max_cores,
371+
resource_dict=resource_dict,
372+
flux_executor=None,
373+
flux_executor_pmi_mode=None,
374+
flux_executor_nesting=False,
375+
flux_log_files=False,
376+
hostname_localhost=hostname_localhost,
377+
block_allocation=block_allocation,
378+
init_function=init_function,
379+
),
380+
max_cores=max_cores,
381+
refresh_rate=refresh_rate,
382+
plot_dependency_graph=plot_dependency_graph,
383+
plot_dependency_graph_filename=plot_dependency_graph_filename,
384+
)
415385
)
416386

417387

0 commit comments

Comments
 (0)