Skip to content

Commit 11d44cc

Browse files
Add linting (#555)
* Add linting * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * some fixes * more fixes * more and more fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * more fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * more fixes * more strict --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 5ec2a20 commit 11d44cc

21 files changed

+176
-117
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ repos:
44
hooks:
55
- id: ruff
66
name: ruff lint
7-
args: ["--select", "I", "--fix"]
7+
args: ["--fix"]
88
files: ^executorlib/
99
- id: ruff-format
1010
name: ruff format

executorlib/backend/cache_parallel.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import pickle
22
import sys
33
import time
4-
from typing import Any
54

65
import cloudpickle
76

@@ -40,10 +39,7 @@ def main() -> None:
4039
apply_dict = backend_load_file(file_name=file_name)
4140
apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
4241
output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
43-
if mpi_size_larger_one:
44-
result = MPI.COMM_WORLD.gather(output, root=0)
45-
else:
46-
result = output
42+
result = MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
4743
if mpi_rank_zero:
4844
backend_write_file(
4945
file_name=file_name,

executorlib/backend/interactive_parallel.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,17 @@ def main() -> None:
5757
input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0)
5858

5959
# Parse input
60-
if "shutdown" in input_dict.keys() and input_dict["shutdown"]:
60+
if "shutdown" in input_dict and input_dict["shutdown"]:
6161
if mpi_rank_zero:
6262
interface_send(socket=socket, result_dict={"result": True})
6363
interface_shutdown(socket=socket, context=context)
6464
MPI.COMM_WORLD.Barrier()
6565
break
6666
elif (
67-
"fn" in input_dict.keys()
68-
and "init" not in input_dict.keys()
69-
and "args" in input_dict.keys()
70-
and "kwargs" in input_dict.keys()
67+
"fn" in input_dict
68+
and "init" not in input_dict
69+
and "args" in input_dict
70+
and "kwargs" in input_dict
7171
):
7272
# Execute function
7373
try:
@@ -87,10 +87,10 @@ def main() -> None:
8787
if mpi_rank_zero:
8888
interface_send(socket=socket, result_dict={"result": output_reply})
8989
elif (
90-
"init" in input_dict.keys()
90+
"init" in input_dict
9191
and input_dict["init"]
92-
and "args" in input_dict.keys()
93-
and "kwargs" in input_dict.keys()
92+
and "args" in input_dict
93+
and "kwargs" in input_dict
9494
):
9595
memory = call_funct(input_dict=input_dict, funct=None)
9696

executorlib/backend/interactive_serial.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import sys
22
from os.path import abspath
3-
from typing import List, Optional
3+
from typing import Optional
44

55
from executorlib.standalone.interactive.backend import call_funct, parse_arguments
66
from executorlib.standalone.interactive.communication import (
@@ -11,7 +11,7 @@
1111
)
1212

1313

14-
def main(argument_lst: Optional[List[str]] = None):
14+
def main(argument_lst: Optional[list[str]] = None):
1515
"""
1616
The main function of the program.
1717
@@ -40,15 +40,15 @@ def main(argument_lst: Optional[List[str]] = None):
4040
input_dict = interface_receive(socket=socket)
4141

4242
# Parse input
43-
if "shutdown" in input_dict.keys() and input_dict["shutdown"]:
43+
if "shutdown" in input_dict and input_dict["shutdown"]:
4444
interface_send(socket=socket, result_dict={"result": True})
4545
interface_shutdown(socket=socket, context=context)
4646
break
4747
elif (
48-
"fn" in input_dict.keys()
49-
and "init" not in input_dict.keys()
50-
and "args" in input_dict.keys()
51-
and "kwargs" in input_dict.keys()
48+
"fn" in input_dict
49+
and "init" not in input_dict
50+
and "args" in input_dict
51+
and "kwargs" in input_dict
5252
):
5353
# Execute function
5454
try:
@@ -62,10 +62,10 @@ def main(argument_lst: Optional[List[str]] = None):
6262
# Send output
6363
interface_send(socket=socket, result_dict={"result": output})
6464
elif (
65-
"init" in input_dict.keys()
65+
"init" in input_dict
6666
and input_dict["init"]
67-
and "args" in input_dict.keys()
68-
and "kwargs" in input_dict.keys()
67+
and "args" in input_dict
68+
and "kwargs" in input_dict
6969
):
7070
memory = call_funct(input_dict=input_dict, funct=None)
7171

executorlib/base/executor.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
import contextlib
12
import queue
23
from concurrent.futures import (
34
Executor as FutureExecutor,
45
)
56
from concurrent.futures import (
67
Future,
78
)
8-
from typing import Callable, List, Optional, Union
9+
from typing import Callable, Optional, Union
910

1011
from executorlib.standalone.inputcheck import check_resource_dict
1112
from executorlib.standalone.queue import cancel_items_in_queue
@@ -28,7 +29,7 @@ def __init__(self, max_cores: Optional[int] = None):
2829
cloudpickle_register(ind=3)
2930
self._max_cores = max_cores
3031
self._future_queue: Optional[queue.Queue] = queue.Queue()
31-
self._process: Optional[Union[RaisingThread, List[RaisingThread]]] = None
32+
self._process: Optional[Union[RaisingThread, list[RaisingThread]]] = None
3233

3334
@property
3435
def info(self) -> Optional[dict]:
@@ -40,13 +41,13 @@ def info(self) -> Optional[dict]:
4041
"""
4142
if self._process is not None and isinstance(self._process, list):
4243
meta_data_dict = self._process[0].get_kwargs().copy()
43-
if "future_queue" in meta_data_dict.keys():
44+
if "future_queue" in meta_data_dict:
4445
del meta_data_dict["future_queue"]
4546
meta_data_dict["max_workers"] = len(self._process)
4647
return meta_data_dict
4748
elif self._process is not None:
4849
meta_data_dict = self._process.get_kwargs().copy()
49-
if "future_queue" in meta_data_dict.keys():
50+
if "future_queue" in meta_data_dict:
5051
del meta_data_dict["future_queue"]
5152
return meta_data_dict
5253
else:
@@ -62,7 +63,13 @@ def future_queue(self) -> Optional[queue.Queue]:
6263
"""
6364
return self._future_queue
6465

65-
def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Future: # type: ignore
66+
def submit( # type: ignore
67+
self,
68+
fn: Callable,
69+
*args,
70+
resource_dict: Optional[dict] = None,
71+
**kwargs,
72+
) -> Future:
6673
"""
6774
Submits a callable to be executed with the given arguments.
6875
@@ -87,7 +94,9 @@ def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Fut
8794
Returns:
8895
Future: A Future representing the given call.
8996
"""
90-
cores = resource_dict.get("cores", None)
97+
if resource_dict is None:
98+
resource_dict = {}
99+
cores = resource_dict.get("cores")
91100
if (
92101
cores is not None
93102
and self._max_cores is not None
@@ -161,7 +170,5 @@ def __del__(self):
161170
"""
162171
Clean-up the resources associated with the Executor.
163172
"""
164-
try:
173+
with contextlib.suppress(AttributeError, RuntimeError):
165174
self.shutdown(wait=False)
166-
except (AttributeError, RuntimeError):
167-
pass

executorlib/cache/queue_spawner.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import os
22
import subprocess
3-
from typing import List, Optional, Tuple, Union
3+
from typing import Optional, Union
44

55
from pysqa import QueueAdapter
66

@@ -10,7 +10,7 @@
1010

1111
def execute_with_pysqa(
1212
command: list,
13-
task_dependent_lst: list[int] = [],
13+
task_dependent_lst: Optional[list[int]] = None,
1414
file_name: Optional[str] = None,
1515
resource_dict: Optional[dict] = None,
1616
config_directory: Optional[str] = None,
@@ -35,6 +35,8 @@ def execute_with_pysqa(
3535
Returns:
3636
int: queuing system ID
3737
"""
38+
if task_dependent_lst is None:
39+
task_dependent_lst = []
3840
check_file_exists(file_name=file_name)
3941
queue_id = get_queue_id(file_name=file_name)
4042
qa = QueueAdapter(
@@ -79,7 +81,7 @@ def _pysqa_execute_command(
7981
split_output: bool = True,
8082
shell: bool = False,
8183
error_filename: str = "pysqa.err",
82-
) -> Union[str, List[str]]:
84+
) -> Union[str, list[str]]:
8385
"""
8486
A wrapper around the subprocess.check_output function. Modified from pysqa to raise an exception if the subprocess
8587
fails to submit the job to the queue.

executorlib/cache/shared.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
import contextlib
12
import importlib.util
23
import os
34
import queue
45
import sys
56
from concurrent.futures import Future
6-
from typing import Any, Callable, Optional, Tuple
7+
from typing import Any, Callable, Optional
78

89
from executorlib.standalone.command import get_command_path
910
from executorlib.standalone.hdf import dump, get_output
@@ -79,15 +80,9 @@ def execute_tasks_h5(
7980
file_name_dict: dict = {}
8081
while True:
8182
task_dict = None
82-
try:
83+
with contextlib.suppress(queue.Empty):
8384
task_dict = future_queue.get_nowait()
84-
except queue.Empty:
85-
pass
86-
if (
87-
task_dict is not None
88-
and "shutdown" in task_dict.keys()
89-
and task_dict["shutdown"]
90-
):
85+
if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]:
9186
if terminate_function is not None:
9287
for task in process_dict.values():
9388
terminate_function(task=task)
@@ -110,7 +105,7 @@ def execute_tasks_h5(
110105
fn_kwargs=task_kwargs,
111106
resource_dict=task_resource_dict,
112107
)
113-
if task_key not in memory_dict.keys():
108+
if task_key not in memory_dict:
114109
if task_key + ".h5out" not in os.listdir(cache_directory):
115110
file_name = os.path.join(cache_directory, task_key + ".h5in")
116111
dump(file_name=file_name, data_dict=data_dict)
@@ -204,7 +199,7 @@ def _check_task_output(
204199

205200
def _convert_args_and_kwargs(
206201
task_dict: dict, memory_dict: dict, file_name_dict: dict
207-
) -> Tuple[list, dict, list]:
202+
) -> tuple[list, dict, list]:
208203
"""
209204
Convert the arguments and keyword arguments in a task dictionary to the appropriate types.
210205

executorlib/cache/subprocess_spawner.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
def execute_in_subprocess(
99
command: list,
10-
task_dependent_lst: list = [],
10+
task_dependent_lst: Optional[list] = None,
1111
file_name: Optional[str] = None,
1212
resource_dict: Optional[dict] = None,
1313
config_directory: Optional[str] = None,
@@ -33,6 +33,8 @@ def execute_in_subprocess(
3333
subprocess.Popen: The subprocess object.
3434
3535
"""
36+
if task_dependent_lst is None:
37+
task_dependent_lst = []
3638
check_file_exists(file_name=file_name)
3739
while len(task_dependent_lst) > 0:
3840
task_dependent_lst = [
@@ -46,10 +48,7 @@ def execute_in_subprocess(
4648
raise ValueError("backend parameter is not supported for subprocess spawner.")
4749
if resource_dict is None:
4850
resource_dict = {}
49-
if "cwd" in resource_dict:
50-
cwd = resource_dict["cwd"]
51-
else:
52-
cwd = cache_directory
51+
cwd = resource_dict.get("cwd", cache_directory)
5352
return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
5453

5554

executorlib/interactive/create.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def create_executor(
3535
backend: str = "local",
3636
max_cores: Optional[int] = None,
3737
cache_directory: Optional[str] = None,
38-
resource_dict: dict = {},
38+
resource_dict: Optional[dict] = None,
3939
flux_executor=None,
4040
flux_executor_pmi_mode: Optional[str] = None,
4141
flux_executor_nesting: bool = False,
@@ -83,6 +83,8 @@ def create_executor(
8383
of the individual function.
8484
init_function (None): optional function to preset arguments for functions which are submitted later
8585
"""
86+
if resource_dict is None:
87+
resource_dict = {}
8688
if flux_executor is not None and backend != "flux_allocation":
8789
backend = "flux_allocation"
8890
if backend == "flux_allocation":
@@ -149,7 +151,7 @@ def create_flux_allocation_executor(
149151
max_workers: Optional[int] = None,
150152
max_cores: Optional[int] = None,
151153
cache_directory: Optional[str] = None,
152-
resource_dict: dict = {},
154+
resource_dict: Optional[dict] = None,
153155
flux_executor=None,
154156
flux_executor_pmi_mode: Optional[str] = None,
155157
flux_executor_nesting: bool = False,
@@ -160,16 +162,18 @@ def create_flux_allocation_executor(
160162
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
161163
check_init_function(block_allocation=block_allocation, init_function=init_function)
162164
check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode)
165+
if resource_dict is None:
166+
resource_dict = {}
163167
cores_per_worker = resource_dict.get("cores", 1)
164168
resource_dict["cache_directory"] = cache_directory
165169
resource_dict["hostname_localhost"] = hostname_localhost
166170
check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))
167171
check_command_line_argument_lst(
168172
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
169173
)
170-
if "openmpi_oversubscribe" in resource_dict.keys():
174+
if "openmpi_oversubscribe" in resource_dict:
171175
del resource_dict["openmpi_oversubscribe"]
172-
if "slurm_cmd_args" in resource_dict.keys():
176+
if "slurm_cmd_args" in resource_dict:
173177
del resource_dict["slurm_cmd_args"]
174178
resource_dict["flux_executor"] = flux_executor
175179
resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
@@ -206,12 +210,14 @@ def create_slurm_allocation_executor(
206210
max_workers: Optional[int] = None,
207211
max_cores: Optional[int] = None,
208212
cache_directory: Optional[str] = None,
209-
resource_dict: dict = {},
213+
resource_dict: Optional[dict] = None,
210214
hostname_localhost: Optional[bool] = None,
211215
block_allocation: bool = False,
212216
init_function: Optional[Callable] = None,
213217
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
214218
check_init_function(block_allocation=block_allocation, init_function=init_function)
219+
if resource_dict is None:
220+
resource_dict = {}
215221
cores_per_worker = resource_dict.get("cores", 1)
216222
resource_dict["cache_directory"] = cache_directory
217223
resource_dict["hostname_localhost"] = hostname_localhost
@@ -246,12 +252,14 @@ def create_local_executor(
246252
max_workers: Optional[int] = None,
247253
max_cores: Optional[int] = None,
248254
cache_directory: Optional[str] = None,
249-
resource_dict: dict = {},
255+
resource_dict: Optional[dict] = None,
250256
hostname_localhost: Optional[bool] = None,
251257
block_allocation: bool = False,
252258
init_function: Optional[Callable] = None,
253259
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
254260
check_init_function(block_allocation=block_allocation, init_function=init_function)
261+
if resource_dict is None:
262+
resource_dict = {}
255263
cores_per_worker = resource_dict.get("cores", 1)
256264
resource_dict["cache_directory"] = cache_directory
257265
resource_dict["hostname_localhost"] = hostname_localhost
@@ -260,11 +268,11 @@ def create_local_executor(
260268
check_command_line_argument_lst(
261269
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
262270
)
263-
if "threads_per_core" in resource_dict.keys():
271+
if "threads_per_core" in resource_dict:
264272
del resource_dict["threads_per_core"]
265-
if "gpus_per_core" in resource_dict.keys():
273+
if "gpus_per_core" in resource_dict:
266274
del resource_dict["gpus_per_core"]
267-
if "slurm_cmd_args" in resource_dict.keys():
275+
if "slurm_cmd_args" in resource_dict:
268276
del resource_dict["slurm_cmd_args"]
269277
if block_allocation:
270278
resource_dict["init_function"] = init_function

0 commit comments

Comments
 (0)