Skip to content

Commit e019a0b

Browse files
SlurmClusterExecutor - resubmission of crashed jobs (#699)
* Fix SlurmClusterExecutor bug * Format black * SlurmClusterExecutor - resubmission of crashed jobs * remove file rather than try and except * add test * extend tests * fixes * fix file write * fix import * input files --------- Co-authored-by: pyiron-runner <[email protected]>
1 parent 94dfa66 commit e019a0b

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

executorlib/task_scheduler/file/shared.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ def execute_tasks_h5(
116116
cache_directory, task_key + "_o.h5"
117117
) not in get_cache_files(cache_directory=cache_directory):
118118
file_name = os.path.join(cache_directory, task_key + "_i.h5")
119+
if os.path.exists(file_name):
120+
os.remove(file_name)
119121
dump(file_name=file_name, data_dict=data_dict)
120122
if not disable_dependencies:
121123
task_dependent_lst = [

tests/test_fluxclusterexecutor.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
try:
1010
import flux.job
11+
from executorlib.task_scheduler.file.hdf import dump
1112

1213
skip_flux_test = "FLUX_URI" not in os.environ
1314
pmi = os.environ.get("EXECUTORLIB_PMIX", None)
@@ -57,5 +58,36 @@ def test_executor_no_cwd(self):
5758
self.assertEqual(len(os.listdir("executorlib_cache")), 2)
5859
self.assertTrue(fs1.done())
5960

61+
def test_executor_existing_files(self):
62+
with FluxClusterExecutor(
63+
resource_dict={"cores": 2, "cwd": "executorlib_cache"},
64+
block_allocation=False,
65+
cache_directory="executorlib_cache",
66+
) as exe:
67+
cloudpickle_register(ind=1)
68+
fs1 = exe.submit(mpi_funct, 1)
69+
self.assertFalse(fs1.done())
70+
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
71+
self.assertTrue(fs1.done())
72+
self.assertEqual(len(os.listdir("executorlib_cache")), 4)
73+
for file_name in os.listdir("executorlib_cache"):
74+
file_path = os.path.join("executorlib_cache", file_name )
75+
os.remove(file_path)
76+
if ".h5" in file_path:
77+
task_key = file_path[:-5] + "_i.h5"
78+
dump(file_name=task_key, data_dict={"a": 1})
79+
80+
with FluxClusterExecutor(
81+
resource_dict={"cores": 2, "cwd": "executorlib_cache"},
82+
block_allocation=False,
83+
cache_directory="executorlib_cache",
84+
) as exe:
85+
cloudpickle_register(ind=1)
86+
fs1 = exe.submit(mpi_funct, 1)
87+
self.assertFalse(fs1.done())
88+
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
89+
self.assertTrue(fs1.done())
90+
self.assertEqual(len(os.listdir("executorlib_cache")), 4)
91+
6092
def tearDown(self):
6193
shutil.rmtree("executorlib_cache", ignore_errors=True)

0 commit comments

Comments
 (0)