Commit 83691dd
Clean up: warn when functionality is not available (#638)
* Clean up: warn when functionality is not available
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Move import into function
* refactor
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* fix minimal test
* fix import

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent ba0f8ab commit 83691dd
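The pattern running through this diff: optional-dependency imports that previously sat at module level behind try/except or contextlib.suppress(ImportError) guards move into the functions that need them, so a missing dependency fails loudly at the call site instead of leaving names silently undefined. A minimal standalone sketch of the before/after (hypothetical names, not executorlib's actual code):

# Before: module-level guard; a missing optional dependency is silently
# ignored, and code that later relies on it fails indirectly.
try:
    import h5py  # optional dependency

    HAS_H5PY = True
except ImportError:
    HAS_H5PY = False


# After: the import lives inside the function that needs it, so calling the
# optional feature without the dependency raises a clear ImportError here.
def read_output(file_name: str):
    import h5py  # deferred import of the optional dependency

    with h5py.File(file_name, "r") as hdf:
        return hdf["output"][()]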

File tree

11 files changed: +67 -49 lines changed

executorlib/__init__.py

Lines changed: 2 additions & 7 deletions
@@ -8,20 +8,15 @@
     SlurmClusterExecutor,
     SlurmJobExecutor,
 )
+from executorlib.standalone.cache import get_cache_data

 __all__: list[str] = [
+    "get_cache_data",
     "FluxJobExecutor",
     "FluxClusterExecutor",
     "SingleNodeExecutor",
     "SlurmJobExecutor",
     "SlurmClusterExecutor",
 ]

-try:
-    from executorlib.standalone.hdf import get_cache_data
-except ImportError:
-    pass
-else:
-    __all__ += ["get_cache_data"]
-
 __version__ = _get_versions()["version"]
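Net effect of this hunk, as a hedged reading of the diff: get_cache_data is now unconditionally exported, and the optional h5py requirement moves inside the function (see executorlib/standalone/cache.py below), so importing the package no longer depends on h5py:

# Illustrative, assuming an environment without h5py installed:
import executorlib  # succeeds; no optional imports run at module level

from executorlib import get_cache_data  # also succeeds now

# The missing dependency only surfaces when the cache is actually read:
# get_cache_data(cache_directory="cache")  # raises ModuleNotFoundError: h5py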

executorlib/executor/flux.py

Lines changed: 5 additions & 7 deletions
@@ -1,4 +1,3 @@
-import contextlib
 from typing import Callable, Optional, Union

 from executorlib.executor.base import ExecutorBase
@@ -17,12 +16,6 @@
 from executorlib.task_scheduler.interactive.dependency import DependencyTaskScheduler
 from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler

-with contextlib.suppress(ImportError):
-    from executorlib.task_scheduler.interactive.fluxspawner import (
-        FluxPythonSpawner,
-        validate_max_workers,
-    )
-

 class FluxJobExecutor(ExecutorBase):
     """
@@ -440,6 +433,11 @@ def create_flux_executor(
     Returns:
         InteractiveStepExecutor/ InteractiveExecutor
     """
+    from executorlib.task_scheduler.interactive.fluxspawner import (
+        FluxPythonSpawner,
+        validate_max_workers,
+    )
+
     if resource_dict is None:
         resource_dict = {}
     cores_per_worker = resource_dict.get("cores", 1)
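Why this matters: with the old contextlib.suppress(ImportError) guard, a missing flux backend left FluxPythonSpawner undefined, so create_flux_executor() would later die with an opaque NameError; with the import inside the function, the same situation raises an explicit ImportError naming the missing module. A standalone sketch (hypothetical optional_backend module, not executorlib code):

import contextlib

# Old pattern: the failed import is swallowed and the name is never bound.
with contextlib.suppress(ImportError):
    from optional_backend import Spawner  # hypothetical optional dependency


def create_executor_old():
    return Spawner()  # NameError: name 'Spawner' is not defined


# New pattern: the failure is an ImportError at the point of use.
def create_executor_new():
    from optional_backend import Spawner  # ImportError names optional_backend

    return Spawner()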

executorlib/standalone/cache.py

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+import os
+
+import cloudpickle
+import numpy as np
+
+group_dict = {
+    "fn": "function",
+    "args": "input_args",
+    "kwargs": "input_kwargs",
+    "output": "output",
+    "error": "error",
+    "runtime": "runtime",
+    "queue_id": "queue_id",
+}
+
+
+def get_cache_data(cache_directory: str) -> list[dict]:
+    """
+    Collect all HDF5 files in the cache directory.
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[dict]: List of dictionaries, each representing one of the HDF5 files in the cache directory.
+    """
+    import h5py
+
+    file_lst = []
+    for task_key in os.listdir(cache_directory):
+        file_name = os.path.join(cache_directory, task_key, "cache.h5out")
+        os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
+        if os.path.exists(file_name):
+            with h5py.File(file_name, "r") as hdf:
+                file_content_dict = {
+                    key: cloudpickle.loads(np.void(hdf["/" + key]))
+                    for key in group_dict.values()
+                    if key in hdf
+                }
+            file_content_dict["filename"] = file_name
+            file_lst.append(file_content_dict)
+    return file_lst
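A hedged usage sketch of the relocated helper, assuming h5py is installed and that SingleNodeExecutor accepts the cache_directory argument exercised in tests/test_singlenodeexecutor_cache.py; "cache" is an illustrative scratch directory:

from executorlib import SingleNodeExecutor, get_cache_data

with SingleNodeExecutor(cache_directory="cache") as exe:
    future = exe.submit(sum, [1, 2, 3])
    assert future.result() == 6

# One dict per cached task; keys follow group_dict values ("function",
# "input_args", "output", ...) plus the source "filename".
for entry in get_cache_data(cache_directory="cache"):
    print(entry["filename"], entry.get("output"))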

executorlib/task_scheduler/file/backend.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 import time
 from typing import Any

-from executorlib.standalone.hdf import dump, load
+from executorlib.task_scheduler.file.hdf import dump, load
 from executorlib.task_scheduler.file.shared import FutureItem

executorlib/standalone/hdf.py renamed to executorlib/task_scheduler/file/hdf.py

Lines changed: 10 additions & 27 deletions
@@ -1,19 +1,10 @@
-import os
 from typing import Any, Optional

 import cloudpickle
 import h5py
 import numpy as np

-group_dict = {
-    "fn": "function",
-    "args": "input_args",
-    "kwargs": "input_kwargs",
-    "output": "output",
-    "error": "error",
-    "runtime": "runtime",
-    "queue_id": "queue_id",
-}
+from executorlib.standalone.cache import group_dict


 def dump(file_name: Optional[str], data_dict: dict) -> None:
@@ -98,25 +89,17 @@ def get_runtime(file_name: str) -> float:


 def get_queue_id(file_name: Optional[str]) -> Optional[int]:
+    """
+    Get queuing system id from HDF5 file
+
+    Args:
+        file_name (str): file name of the HDF5 file as absolute path
+
+    Returns:
+        int: queuing system id from the execution of the python function
+    """
     if file_name is not None:
         with h5py.File(file_name, "r") as hdf:
             if "queue_id" in hdf:
                 return cloudpickle.loads(np.void(hdf["/queue_id"]))
     return None
-
-
-def get_cache_data(cache_directory: str) -> list[dict]:
-    file_lst = []
-    for task_key in os.listdir(cache_directory):
-        file_name = os.path.join(cache_directory, task_key, "cache.h5out")
-        os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
-        if os.path.exists(file_name):
-            with h5py.File(file_name, "r") as hdf:
-                file_content_dict = {
-                    key: cloudpickle.loads(np.void(hdf["/" + key]))
-                    for key in group_dict.values()
-                    if key in hdf
-                }
-            file_content_dict["filename"] = file_name
-            file_lst.append(file_content_dict)
-    return file_lst
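For orientation, a round-trip sketch of the renamed module; this assumes dump() maps data_dict keys through the shared group_dict when writing (its import above suggests as much) and uses a hypothetical file name:

from executorlib.task_scheduler.file.hdf import dump, get_queue_id

# "queue_id" maps to the HDF5 group of the same name in group_dict.
dump(file_name="cache.h5out", data_dict={"queue_id": 123})

# get_queue_id returns None when the file has no "queue_id" group.
print(get_queue_id(file_name="cache.h5out"))  # expected: 123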

executorlib/task_scheduler/file/queue_spawner.py

Lines changed: 1 addition & 1 deletion
@@ -4,8 +4,8 @@

 from pysqa import QueueAdapter

-from executorlib.standalone.hdf import dump, get_queue_id
 from executorlib.standalone.inputcheck import check_file_exists
+from executorlib.task_scheduler.file.hdf import dump, get_queue_id


 def execute_with_pysqa(

executorlib/task_scheduler/file/shared.py

Lines changed: 1 addition & 1 deletion
@@ -7,8 +7,8 @@
 from typing import Any, Callable, Optional

 from executorlib.standalone.command import get_command_path
-from executorlib.standalone.hdf import dump, get_output
 from executorlib.standalone.serialize import serialize_funct_h5
+from executorlib.task_scheduler.file.hdf import dump, get_output


 class FutureItem:

executorlib/task_scheduler/interactive/shared.py

Lines changed: 1 addition & 1 deletion
@@ -140,7 +140,7 @@ def _execute_task_with_cache(
         future_queue (Queue): Queue for receiving new tasks.
         cache_directory (str): The directory to store cache files.
     """
-    from executorlib.standalone.hdf import dump, get_output
+    from executorlib.task_scheduler.file.hdf import dump, get_output

     task_key, data_dict = serialize_funct_h5(
         fn=task_dict["fn"],

tests/test_cache_backend_execute.py

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
 try:
     from executorlib.task_scheduler.file.backend import backend_execute_task_in_file
     from executorlib.task_scheduler.file.shared import _check_task_output, FutureItem
-    from executorlib.standalone.hdf import dump, get_runtime
+    from executorlib.task_scheduler.file.hdf import dump, get_runtime
     from executorlib.standalone.serialize import serialize_funct_h5

     skip_h5io_test = False

tests/test_singlenodeexecutor_cache.py

Lines changed: 2 additions & 2 deletions
@@ -2,11 +2,11 @@
 import shutil
 import unittest

-from executorlib import SingleNodeExecutor
+from executorlib import SingleNodeExecutor, get_cache_data
 from executorlib.standalone.serialize import cloudpickle_register

 try:
-    from executorlib import get_cache_data
+    import h5py

     skip_h5py_test = False
 except ImportError:
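The test now guards on the optional dependency itself rather than on the re-export, which matches the unconditional import above. A minimal sketch of the resulting skip pattern (hypothetical test body):

import unittest

try:
    import h5py  # noqa: F401  optional dependency of the cache backend

    skip_h5py_test = False
except ImportError:
    skip_h5py_test = True


@unittest.skipIf(skip_h5py_test, "h5py is not installed, so cache tests are skipped.")
class TestCacheAvailable(unittest.TestCase):
    def test_import(self):
        from executorlib import get_cache_data  # always importable now

        self.assertTrue(callable(get_cache_data))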
