Skip to content

Commit 94dfa66

Browse files
authored
SlurmClusterExecutor - create a separate working directory for each job (#698)
* SlurmClusterExecutor - create a separate working directory for each job, even when the user does not define a specific working directory; otherwise the submission script and the error log get overwritten. * The cache directory is a mandatory input * extend tests * test length after successful execution * fix
1 parent a951651 commit 94dfa66

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

executorlib/task_scheduler/file/queue_spawner.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,19 @@
1010

1111
def execute_with_pysqa(
1212
command: list,
13+
cache_directory: str,
1314
task_dependent_lst: Optional[list[int]] = None,
1415
file_name: Optional[str] = None,
1516
resource_dict: Optional[dict] = None,
1617
config_directory: Optional[str] = None,
1718
backend: Optional[str] = None,
18-
cache_directory: Optional[str] = None,
1919
) -> Optional[int]:
2020
"""
2121
Execute a command by submitting it to the queuing system
2222
2323
Args:
2424
command (list): The command to be executed.
25+
cache_directory (str): The directory to store the HDF5 files.
2526
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
2627
file_name (str): Name of the HDF5 file which contains the Python function
2728
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
@@ -30,7 +31,6 @@ def execute_with_pysqa(
3031
}
3132
config_directory (str, optional): path to the config directory.
3233
backend (str, optional): name of the backend used to spawn tasks.
33-
cache_directory (str): The directory to store the HDF5 files.
3434
3535
Returns:
3636
int: queuing system ID
@@ -50,7 +50,9 @@ def execute_with_pysqa(
5050
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
5151
cwd = resource_dict["cwd"]
5252
else:
53-
cwd = cache_directory
53+
folder = command[-1].split("_i.h5")[0]
54+
cwd = os.path.join(cache_directory, folder)
55+
os.makedirs(cwd, exist_ok=True)
5456
submit_kwargs = {
5557
"command": " ".join(command),
5658
"dependency_list": [str(qid) for qid in task_dependent_lst],

tests/test_fluxclusterexecutor.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,20 @@ def test_executor(self):
4141
fs1 = exe.submit(mpi_funct, 1)
4242
self.assertFalse(fs1.done())
4343
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
44+
self.assertEqual(len(os.listdir("executorlib_cache")), 4)
45+
self.assertTrue(fs1.done())
46+
47+
def test_executor_no_cwd(self):
48+
with FluxClusterExecutor(
49+
resource_dict={"cores": 2},
50+
block_allocation=False,
51+
cache_directory="executorlib_cache",
52+
) as exe:
53+
cloudpickle_register(ind=1)
54+
fs1 = exe.submit(mpi_funct, 1)
55+
self.assertFalse(fs1.done())
56+
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
57+
self.assertEqual(len(os.listdir("executorlib_cache")), 2)
4458
self.assertTrue(fs1.done())
4559

4660
def tearDown(self):

0 commit comments

Comments
 (0)