From bb3465f47f2030fb8bd148e606a69f384d51d58a Mon Sep 17 00:00:00 2001 From: mkphuthi Date: Fri, 15 Nov 2024 16:13:13 -0800 Subject: [PATCH] Barebones grouping, needs testing on slurm --- CHANGELOG.md | 30 +-- .../asimmodules/workflows/distributed.py | 5 +- asimtools/job.py | 211 ++++++++++-------- .../distributed_bash_sim_input.yaml | 38 ++++ .../distributed_group_bash_sim_input.yaml | 40 ++++ examples/internal/env_input.yaml | 4 + pyproject.toml | 1 + .../asimmodules/workflows/test_distributed.py | 28 +++ tests/conftest.py | 49 ++++ 9 files changed, 290 insertions(+), 116 deletions(-) create mode 100644 examples/internal/distributed/distributed_bash_sim_input.yaml create mode 100644 examples/internal/distributed/distributed_group_bash_sim_input.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 294e321..f40080b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,25 +7,19 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [develop] - 2024-11-13 ### Added -- Added natural sorting of files in workflows -- Added benchmarking/distribution asimmodule -- Added data/collect_images asimmodule +- ASIMMODULE: benchmarking/distribution +- ASIMMODULE: data/collect_images asimmodule +- CALCULATOR: OMAT24 calculator +- VASP examples +- Jobs submitted as an array can now be grouped together in slurm jobs. See + distributed examples ### Changed - -### Fixed -- Bugs in phonopy asimmodule that would give incorrectly sorted EOS energies +- We now append rather than rewrite to stderr.txt and stdout.txt +- Now forcing natural sorting of files in workflows file-based arrays were + confusing - +- Bugs in phonopy asimmodule that would give incorrectly sorted EOS energies +- If an atoms object had unknown keys, writing the output extxyz file would + throw an error, now all the ASE sanctioned properties are written diff --git a/asimtools/asimmodules/workflows/distributed.py b/asimtools/asimmodules/workflows/distributed.py index e036496..bc9b9c9 100755 --- a/asimtools/asimmodules/workflows/distributed.py +++ b/asimtools/asimmodules/workflows/distributed.py @@ -14,6 +14,7 @@ def distributed( env_input: Optional[Dict] = None, calc_input: Optional[Dict] = None, array_max: Optional[int] = None, + group_size: int = 1, ) -> Dict: """Distributes a set of jobs based on inpout parameter. The scnarios are as follows: @@ -32,6 +33,8 @@ def distributed( :type calc_input: Optional[Dict], optional :param array_max: Maximum number of jobs to run in array, defaults to None :type array_max: Optional[int], optional + :param group_size: Number of jobs to group together, defaults to 1 + :type group_size: int, optional :return: Dictionary of results :rtype: Dict """ @@ -40,7 +43,7 @@ def distributed( env_input=env_input, calc_input=calc_input ) - job_ids = djob.submit(array_max=array_max) + job_ids = djob.submit(array_max=array_max, group_size=group_size) results = {'job_ids': job_ids} return results diff --git a/asimtools/job.py b/asimtools/job.py index e46f36b..1785d91 100644 --- a/asimtools/job.py +++ b/asimtools/job.py @@ -595,14 +595,106 @@ def __init__( ) if same_env and all_slurm: - self.use_slurm_array = True - elif same_env and all_bash: - self.use_bash_array = True + self.use_slurm = True else: - self.use_slurm_array = False + self.use_slurm = False + + if all_bash: + self.use_bash = True self.unitjobs = unitjobs + def _gen_array_script( + self, + write: bool = True, + group_size: int = 1, + ) -> None: + ''' + Generates a slurm job array file and if necessary + writes it in the work directory for the job. Only works + if there is one env_id for all jobs + ''' + env_id = self.unitjobs[0].env_id + env = self.env_input[env_id] + + slurm_params = env.get('slurm', {}) + + txt = self._gen_slurm_batch_preamble( + slurm_params=slurm_params, + extra_flags=[ + '-o slurm_stdout.id-%a_j%A', + '-e slurm_stderr.id-%a_j%A', + ] + ) + + txt += 'echo "Job started on `hostname` at `date`"\n' + txt += 'CUR_DIR=`pwd`\n' + txt += 'echo "LAUNCHDIR: ${CUR_DIR}"\n' + txt += f'GROUP_SIZE={group_size}\n' + seqtxt = '$(seq $((${SLURM_ARRAY_TASK_ID}*${GROUP_SIZE})) ' + seqtxt += '$(((${SLURM_ARRAY_TASK_ID}+1)*${GROUP_SIZE})))' + txt += f'WORKDIRS=($(ls -dv ./id-*))\n' + txt += f'for i in {seqtxt}; do\n' + txt += 'cd ${WORKDIRS[$i]};\n' + # else: + # txt += '\nif [[ ! -z ${SLURM_ARRAY_TASK_ID} ]]; then\n' + # txt += ' fls=( id-* )\n' + # txt += ' WORKDIR=${fls[${SLURM_ARRAY_TASK_ID}]}\n' + # txt += 'fi\n\n' + # txt += 'cd ${WORKDIR}\n' + txt += '\n' + txt += '\n'.join(slurm_params.get('precommands', [])) + txt += '\n' + txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', [])) + txt += '\n' + txt += 'echo "WORKDIR: ${WORKDIRS[$i]}"\n' + txt += self.unitjobs[0].gen_run_command() + '\n' + txt += '\n'.join(slurm_params.get('postcommands', [])) + txt += '\n' + txt += 'cd ${CUR_DIR}\n' + txt += 'done\n' + txt += 'echo "Job ended at `date`"' + + if write: + slurm_file = self.workdir / 'job_array.sh' + slurm_file.write_text(txt) + + def _gen_bash_script( + self, + write: bool = True, + ) -> None: + ''' + Generates a bash script for job submission and if necessary + writes it in the work directory for the job. Only works + if there is one env_id for all jobs + ''' + env_id = self.unitjobs[0].env_id + env = self.env_input[env_id] + + txt = '#!/usr/bin/env sh\n\n' + # txt += f'WDIRS=($(ls -dv ./id-*))\n' + txt += f'for WORKDIR in id-*; do\n' + txt += ' cd ${WORKDIR};\n' + txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', [])) + txt += '\n asim-run sim_input.yaml -c calc_input.yaml\n' + txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', [])) + txt += f'\n cd ../;\n' + txt += 'done' + + if write: + script_file = self.workdir / 'bash_script.sh' + script_file.write_text(txt) + + def gen_input_files(self) -> None: + ''' Write input files to working directory ''' + + if self.use_slurm: + self._gen_array_script() + if self.use_bash: + self._gen_bash_script() + for unitjob in self.unitjobs: + unitjob.gen_input_files() + def submit_jobs( self, **kwargs, @@ -639,10 +731,10 @@ def submit_bash_array( return None command = [ - 'source ', - 'job_script.sh', + 'sh', + './bash_script.sh', ] - + print(command) completed_process = subprocess.run( command, check=False, capture_output=True, text=True, ) @@ -661,8 +753,8 @@ def submit_bash_array( logger.error(err_msg) completed_process.check_returncode() - if os.env.get('SLURM_JOB_ID', False): - job_ids = [int(os.env['SLURM_JOB_ID'])] + if os.environ.get('SLURM_JOB_ID', False): + job_ids = [int(os.environ['SLURM_JOB_ID'])] else: job_ids = None return job_ids @@ -671,7 +763,8 @@ def submit_slurm_array( self, array_max=None, dependency: Union[List[str],None] = None, - **kwargs, # Necessary for compatibility with job.submit + group_size: int = 1, + **kwargs, ) -> Union[None,List[int]]: ''' Submits a job array if all the jobs have the same env and use slurm @@ -698,6 +791,15 @@ def submit_slurm_array( else: arr_max_str = '' + if group_size > 1: + nslurm_jobs = int(np.ceil(njobs / group_size)) + self._gen_array_script( + dist_ids=f'0-{nslurm_jobs-1}{arr_max_str}' + ) + # if group_size is not 1 + # write the bash submission script + # the submit command is to submit the bash script + if dependency is not None: dependstr = None for dep in dependency: @@ -738,91 +840,6 @@ def submit_slurm_array( job_ids = [int(completed_process.stdout.split(' ')[-1])] return job_ids - def _gen_array_script(self, write: bool = True) -> None: - ''' - Generates a slurm job array file and if necessary - writes it in the work directory for the job. Only works - if there is one env_id for all jobs - ''' - env_id = self.unitjobs[0].env_id - env = self.env_input[env_id] - - slurm_params = env.get('slurm', {}) - - txt = self._gen_slurm_batch_preamble( - slurm_params=slurm_params, - extra_flags=[ - '-o slurm_stdout.id-%a_j%A', - '-e slurm_stderr.id-%a_j%A', - ] - ) - - txt += '\nif [[ ! -z ${SLURM_ARRAY_TASK_ID} ]]; then\n' - txt += ' fls=( id-* )\n' - txt += ' WORKDIR=${fls[${SLURM_ARRAY_TASK_ID}]}\n' - txt += 'fi\n\n' - txt += 'CUR_DIR=`pwd`\n' - txt += 'cd ${WORKDIR}\n' - txt += '\n' - txt += '\n'.join(slurm_params.get('precommands', [])) - txt += '\n' - txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', [])) - txt += '\n' - txt += 'echo "LAUNCHDIR: ${CUR_DIR}"\n' - txt += 'echo "WORKDIR: ${WORKDIR}"\n' - txt += 'echo "Job started on `hostname` at `date`"\n' - txt += self.unitjobs[0].gen_run_command() + '\n' - txt += '\n'.join(slurm_params.get('postcommands', [])) - txt += '\n' - txt += 'cd ${CUR_DIR}\n' - txt += 'echo "Job ended at `date`"' - - if write: - slurm_file = self.workdir / 'job_array.sh' - slurm_file.write_text(txt) - - def _gen_bash_script( - self, - write: bool = True, - dist_ids: Union[str,Sequence,None] = None - ) -> None: - ''' - Generates a bash script for job submission and if necessary - writes it in the work directory for the job. Only works - if there is one env_id for all jobs - ''' - env_id = self.unitjobs[0].env_id - env = self.env_input[env_id] - if dist_ids is None: - ids2run = '$(seq 1 $END)' - elif isinstance(dist_ids, str): - ids2run = parse_slice(dist_ids, bash=True) - else: - ids2run = '('+ ' '.join(dist_ids) + ')' - - txt = '#!/usr/bin/sh\n\n' - txt += f'WDIRS=($(ls -dv ./id-*))\n' - txt += f'for i in {ids2run}; do\n' - txt += ' cd ${{WDIRS[$i]}};\n' - txt += '\n'.join(self.unitjobs - [0].calc_params.get('precommands', [])) - txt += '\n asim-run sim_input.yaml -c calc_input.yaml\n' - txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', [])) - txt += f'\n cd {self.workdir};\n' - txt += 'done' - - if write: - script_file = self.workdir / 'job_script.sh' - script_file.write_text(txt) - - def gen_input_files(self) -> None: - ''' Write input files to working directory ''' - - if self.use_slurm_array: - self._gen_array_script() - for unitjob in self.unitjobs: - unitjob.gen_input_files() - def submit(self, **kwargs) -> None: ''' Submit a job using slurm, interactively or in the terminal @@ -831,9 +848,9 @@ def submit(self, **kwargs) -> None: cur_dir = Path('.').resolve() os.chdir(self.workdir) job_ids = None - if self.use_slurm_array: + if self.use_slurm: job_ids = self.submit_slurm_array(**kwargs) - elif self.use_bash_array: + elif self.use_bash: job_ids = self.submit_bash_array(**kwargs) else: job_ids = self.submit_jobs(**kwargs) diff --git a/examples/internal/distributed/distributed_bash_sim_input.yaml b/examples/internal/distributed/distributed_bash_sim_input.yaml new file mode 100644 index 0000000..be81ed5 --- /dev/null +++ b/examples/internal/distributed/distributed_bash_sim_input.yaml @@ -0,0 +1,38 @@ +asimmodule: workflows.distributed +workdir: distributed_bash_results +args: + subsim_inputs: + id-0000: + asimmodule: singlepoint + env_id: bash #If all env_ids are slurm batch jobs using the same configuration, we automatically use arrays + args: + calc_id: lj_Ar + image: + name: Ar + id-0001: + # Doesn't have to be same asimmodule + asimmodule: geometry_optimization.atom_relax + env_id: bash + args: + calc_id: lj_Ar + image: + name: Ar + id-0002: + # This is just the calc_array example copied and pasted! + asimmodule: workflows.calc_array + env_id: bash + args: + calc_ids: [emt, lj_Cu] + env_ids: [inline, inline] # Must correspond to calc_id if list is given + subsim_input: + asimmodule: elastic_constants.cubic_energy_expansion + args: + calc_id: emt + image: + name: Cu + a: 3.6 + cubic: true + ase_cubic_eos_args: # The minimum for every calculator should be in the e-v range + npoints: 5 + eps: 0.04 + eos_string: sj diff --git a/examples/internal/distributed/distributed_group_bash_sim_input.yaml b/examples/internal/distributed/distributed_group_bash_sim_input.yaml new file mode 100644 index 0000000..1fc42ca --- /dev/null +++ b/examples/internal/distributed/distributed_group_bash_sim_input.yaml @@ -0,0 +1,40 @@ +asimmodule: workflows.distributed +workdir: distributed_group_batch_results +args: + array_max: 2 # You can limit how many slurm jobs are run at the same time here (only for job arrays) + group_size: 2 + subsim_inputs: + id-0000: + asimmodule: singlepoint + env_id: batch #If all env_ids are slurm batch jobs using the same configuration, we automatically use arrays + args: + calc_id: lj_Ar + image: + name: Ar + id-0001: + # Doesn't have to be same asimmodule + asimmodule: geometry_optimization.atom_relax + env_id: batch + args: + calc_id: lj_Ar + image: + name: Ar + id-0002: + # This is just the calc_array example copied and pasted! + asimmodule: workflows.calc_array + env_id: batch + args: + calc_ids: [emt, lj_Cu] + env_ids: [inline, inline] # Must correspond to calc_id if list is given + subsim_input: + asimmodule: elastic_constants.cubic_energy_expansion + args: + calc_id: emt + image: + name: Cu + a: 3.6 + cubic: true + ase_cubic_eos_args: # The minimum for every calculator should be in the e-v range + npoints: 5 + eps: 0.04 + eos_string: sj diff --git a/examples/internal/env_input.yaml b/examples/internal/env_input.yaml index 1cc16e0..b8af294 100644 --- a/examples/internal/env_input.yaml +++ b/examples/internal/env_input.yaml @@ -26,6 +26,10 @@ srun: slurm: flags: - -n 1 +bash: + mode: + use_bash: true + interactive: true inline: mode: use_slurm: false diff --git a/pyproject.toml b/pyproject.toml index 557e43b..e1ded66 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ # Recommended to use the master branch of the ASE>=3.23 from gitlab but only # required for lammps to handle masses correctly so far "colorama", + "natsort", "mp-api", ] diff --git a/tests/asimmodules/workflows/test_distributed.py b/tests/asimmodules/workflows/test_distributed.py index 21a1b1b..d1c95ec 100644 --- a/tests/asimmodules/workflows/test_distributed.py +++ b/tests/asimmodules/workflows/test_distributed.py @@ -30,3 +30,31 @@ def test_distributed(env_input, calc_input, sim_input, tmp_path, request): assert uj.get_sim_input()['workdir'] == './' assert unitjob.get_status(descend=True) == (True, 'complete') + +@pytest.mark.parametrize("calc_input",["lj_argon_calc_input"]) +@pytest.mark.parametrize("env_input",["batch_env_input"]) +@pytest.mark.parametrize("sim_input",[ + "lj_distributed_batch_sim_input", + "lj_distributed_group_batch_sim_input", +]) +def test_batch_distributed(env_input, calc_input, sim_input, tmp_path, request): + env_input = request.getfixturevalue(env_input) + calc_input = request.getfixturevalue(calc_input) + sim_input = request.getfixturevalue(sim_input) + wdir = tmp_path / 'wdir' + unitjob = create_unitjob(sim_input, env_input, wdir, calc_input=calc_input) + unitjob.submit() + + assert load_job_from_directory(wdir).get_status()[1] == 'complete' + dirs = glob(str(wdir / 'id*')) + assert len(dirs) == len(sim_input['args']['subsim_inputs']) + + for d in dirs: + assert str(d).rsplit('/', maxsplit=1)[-1].startswith('id-') + + uj = load_job_from_directory(d) + assert uj.get_status()[1] == 'complete' + + assert uj.get_sim_input()['workdir'] == './' + + assert unitjob.get_status(descend=True) == (True, 'complete') \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 1fe34eb..03850a6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -33,6 +33,22 @@ def batch_env_input(): } return env_input +@pytest.fixture +def group_batch_env_input(): + """Group Batch job env_input""" + env_input = { + 'group': { + 'mode': { + 'use_slurm': True, + 'interactive': False, + }, + 'slurm': { + 'flags': ['-n 1', '-J test', '--mem=1G'] + } + } + } + return env_input + @pytest.fixture def batch_dict_env_input(): """Batch job env_input""" @@ -262,3 +278,36 @@ def lj_distributed_batch_sim_input(): } return sim_input + +@pytest.fixture +def lj_distributed_group_batch_sim_input(): + ''' + Sim input for a distributed job that does some lj calculations + ''' + subsim_input = { + 'asimmodule': 'singlepoint', + 'env_id': 'inline', # This should be overwrriten by the group env + 'args': { + 'calc_id': 'lj', + 'image': { + 'name': 'Ar', + }, + 'properties': ['energy', 'forces'], + } + } + sim_input = { + 'asimmodule': 'workflows.distributed', + 'env_id': 'batch', + 'args': { + 'group_size': 2, + 'subsim_inputs': { + 'id-0000': subsim_input, + 'id-0001': subsim_input, + 'id-0002': subsim_input, + 'id-0003': subsim_input, + 'id-0004': subsim_input, + } + } + } + + return sim_input