Skip to content

Commit

Permalink
Barebones grouping, needs testing on slurm
Browse files Browse the repository at this point in the history
  • Loading branch information
mkphuthi authored and mkphuthi committed Nov 16, 2024
1 parent 859bdd7 commit bb3465f
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 116 deletions.
30 changes: 12 additions & 18 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,19 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [develop] - 2024-11-13

### Added
- Added natural sorting of files in workflows
- Added benchmarking/distribution asimmodule
- Added data/collect_images asimmodule
- ASIMMODULE: benchmarking/distribution
- ASIMMODULE: data/collect_images
- CALCULATOR: OMAT24 calculator
- VASP examples
- Jobs submitted as an array can now be grouped together in slurm jobs. See
distributed examples

### Changed

### Fixed
- Bugs in phonopy asimmodule that would give incorrectly sorted EOS energies
- We now append rather than rewrite to stderr.txt and stdout.txt
- Now forcing natural sorting of files in workflows since the ordering of
  file-based arrays was confusing

<!--
## [1.2.3] - 2017-03-14
### Added
### Changed
### Fixed
- [PROJECTNAME-UUUU](http://tickets.projectname.com/browse/PROJECTNAME-UUUU)
MINOR Fix module foo tests
- [PROJECTNAME-RRRR](http://tickets.projectname.com/browse/PROJECTNAME-RRRR)
MAJOR Module foo's timeline uses the browser timezone for date resolution -->
- Bugs in phonopy asimmodule that would give incorrectly sorted EOS energies
- If an atoms object had unknown keys, writing the output extxyz file would
throw an error, now all the ASE sanctioned properties are written
5 changes: 4 additions & 1 deletion asimtools/asimmodules/workflows/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def distributed(
env_input: Optional[Dict] = None,
calc_input: Optional[Dict] = None,
array_max: Optional[int] = None,
group_size: int = 1,
) -> Dict:
"""Distributes a set of jobs based on inpout parameter. The scnarios are
as follows:
Expand All @@ -32,6 +33,8 @@ def distributed(
:type calc_input: Optional[Dict], optional
:param array_max: Maximum number of jobs to run in array, defaults to None
:type array_max: Optional[int], optional
:param group_size: Number of jobs to group together, defaults to 1
:type group_size: int, optional
:return: Dictionary of results
:rtype: Dict
"""
Expand All @@ -40,7 +43,7 @@ def distributed(
env_input=env_input,
calc_input=calc_input
)
job_ids = djob.submit(array_max=array_max)
job_ids = djob.submit(array_max=array_max, group_size=group_size)

results = {'job_ids': job_ids}
return results
211 changes: 114 additions & 97 deletions asimtools/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,14 +595,106 @@ def __init__(
)

if same_env and all_slurm:
self.use_slurm_array = True
elif same_env and all_bash:
self.use_bash_array = True
self.use_slurm = True
else:
self.use_slurm_array = False
self.use_slurm = False

if all_bash:
self.use_bash = True

self.unitjobs = unitjobs

def _gen_array_script(
self,
write: bool = True,
group_size: int = 1,
) -> None:
'''
Generates a slurm job array file and if necessary
writes it in the work directory for the job. Only works
if there is one env_id for all jobs
'''
env_id = self.unitjobs[0].env_id
env = self.env_input[env_id]

slurm_params = env.get('slurm', {})

txt = self._gen_slurm_batch_preamble(
slurm_params=slurm_params,
extra_flags=[
'-o slurm_stdout.id-%a_j%A',
'-e slurm_stderr.id-%a_j%A',
]
)

txt += 'echo "Job started on `hostname` at `date`"\n'
txt += 'CUR_DIR=`pwd`\n'
txt += 'echo "LAUNCHDIR: ${CUR_DIR}"\n'
txt += f'GROUP_SIZE={group_size}\n'
seqtxt = '$(seq $((${SLURM_ARRAY_TASK_ID}*${GROUP_SIZE})) '
seqtxt += '$(((${SLURM_ARRAY_TASK_ID}+1)*${GROUP_SIZE})))'
txt += f'WORKDIRS=($(ls -dv ./id-*))\n'
txt += f'for i in {seqtxt}; do\n'
txt += 'cd ${WORKDIRS[$i]};\n'
# else:
# txt += '\nif [[ ! -z ${SLURM_ARRAY_TASK_ID} ]]; then\n'
# txt += ' fls=( id-* )\n'
# txt += ' WORKDIR=${fls[${SLURM_ARRAY_TASK_ID}]}\n'
# txt += 'fi\n\n'
# txt += 'cd ${WORKDIR}\n'
txt += '\n'
txt += '\n'.join(slurm_params.get('precommands', []))
txt += '\n'
txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', []))
txt += '\n'
txt += 'echo "WORKDIR: ${WORKDIRS[$i]}"\n'
txt += self.unitjobs[0].gen_run_command() + '\n'
txt += '\n'.join(slurm_params.get('postcommands', []))
txt += '\n'
txt += 'cd ${CUR_DIR}\n'
txt += 'done\n'
txt += 'echo "Job ended at `date`"'

if write:
slurm_file = self.workdir / 'job_array.sh'
slurm_file.write_text(txt)

def _gen_bash_script(
self,
write: bool = True,
) -> None:
'''
Generates a bash script for job submission and if necessary
writes it in the work directory for the job. Only works
if there is one env_id for all jobs
'''
env_id = self.unitjobs[0].env_id
env = self.env_input[env_id]

txt = '#!/usr/bin/env sh\n\n'
# txt += f'WDIRS=($(ls -dv ./id-*))\n'
txt += f'for WORKDIR in id-*; do\n'
txt += ' cd ${WORKDIR};\n'
txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', []))
txt += '\n asim-run sim_input.yaml -c calc_input.yaml\n'
txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', []))
txt += f'\n cd ../;\n'
txt += 'done'

if write:
script_file = self.workdir / 'bash_script.sh'
script_file.write_text(txt)

def gen_input_files(self) -> None:
    '''Write the input files for every unit job to the work directory.'''

    # The shared submission script is generated once for the whole set.
    if self.use_slurm:
        self._gen_array_script()
    if self.use_bash:
        self._gen_bash_script()
    for job in self.unitjobs:
        job.gen_input_files()

def submit_jobs(
self,
**kwargs,
Expand Down Expand Up @@ -639,10 +731,10 @@ def submit_bash_array(
return None

command = [
'source ',
'job_script.sh',
'sh',
'./bash_script.sh',
]

print(command)
completed_process = subprocess.run(
command, check=False, capture_output=True, text=True,
)
Expand All @@ -661,8 +753,8 @@ def submit_bash_array(
logger.error(err_msg)
completed_process.check_returncode()

if os.env.get('SLURM_JOB_ID', False):
job_ids = [int(os.env['SLURM_JOB_ID'])]
if os.environ.get('SLURM_JOB_ID', False):
job_ids = [int(os.environ['SLURM_JOB_ID'])]
else:
job_ids = None
return job_ids
Expand All @@ -671,7 +763,8 @@ def submit_slurm_array(
self,
array_max=None,
dependency: Union[List[str],None] = None,
**kwargs, # Necessary for compatibility with job.submit
group_size: int = 1,
**kwargs,
) -> Union[None,List[int]]:
'''
Submits a job array if all the jobs have the same env and use slurm
Expand All @@ -698,6 +791,15 @@ def submit_slurm_array(
else:
arr_max_str = ''

if group_size > 1:
nslurm_jobs = int(np.ceil(njobs / group_size))
self._gen_array_script(
dist_ids=f'0-{nslurm_jobs-1}{arr_max_str}'
)
# if group_size is not 1
# write the bash submission script
# the submit command is to submit the bash script

if dependency is not None:
dependstr = None
for dep in dependency:
Expand Down Expand Up @@ -738,91 +840,6 @@ def submit_slurm_array(
job_ids = [int(completed_process.stdout.split(' ')[-1])]
return job_ids

def _gen_array_script(self, write: bool = True) -> None:
    '''
    Generates a slurm job array file and if necessary
    writes it in the work directory for the job. Only works
    if there is one env_id for all jobs.

    :param write: Whether to write the script to disk, defaults to True
    :type write: bool, optional
    '''
    # All unit jobs share one environment, so the first job's env_id
    # stands in for the whole array.
    env_id = self.unitjobs[0].env_id
    env = self.env_input[env_id]

    slurm_params = env.get('slurm', {})

    # %a is the array task id, %A the parent job id, so each array
    # task gets its own stdout/stderr file.
    txt = self._gen_slurm_batch_preamble(
        slurm_params=slurm_params,
        extra_flags=[
            '-o slurm_stdout.id-%a_j%A',
            '-e slurm_stderr.id-%a_j%A',
        ]
    )

    # Each array task picks the id-* directory whose position in glob
    # order matches its SLURM_ARRAY_TASK_ID.
    txt += '\nif [[ ! -z ${SLURM_ARRAY_TASK_ID} ]]; then\n'
    txt += ' fls=( id-* )\n'
    txt += ' WORKDIR=${fls[${SLURM_ARRAY_TASK_ID}]}\n'
    txt += 'fi\n\n'
    txt += 'CUR_DIR=`pwd`\n'
    txt += 'cd ${WORKDIR}\n'
    txt += '\n'
    # Environment-level precommands run before job-level precommands.
    txt += '\n'.join(slurm_params.get('precommands', []))
    txt += '\n'
    txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', []))
    txt += '\n'
    txt += 'echo "LAUNCHDIR: ${CUR_DIR}"\n'
    txt += 'echo "WORKDIR: ${WORKDIR}"\n'
    txt += 'echo "Job started on `hostname` at `date`"\n'
    txt += self.unitjobs[0].gen_run_command() + '\n'
    txt += '\n'.join(slurm_params.get('postcommands', []))
    txt += '\n'
    txt += 'cd ${CUR_DIR}\n'
    txt += 'echo "Job ended at `date`"'

    if write:
        slurm_file = self.workdir / 'job_array.sh'
        slurm_file.write_text(txt)

def _gen_bash_script(
self,
write: bool = True,
dist_ids: Union[str,Sequence,None] = None
) -> None:
'''
Generates a bash script for job submission and if necessary
writes it in the work directory for the job. Only works
if there is one env_id for all jobs
'''
env_id = self.unitjobs[0].env_id
env = self.env_input[env_id]
if dist_ids is None:
ids2run = '$(seq 1 $END)'
elif isinstance(dist_ids, str):
ids2run = parse_slice(dist_ids, bash=True)
else:
ids2run = '('+ ' '.join(dist_ids) + ')'

txt = '#!/usr/bin/sh\n\n'
txt += f'WDIRS=($(ls -dv ./id-*))\n'
txt += f'for i in {ids2run}; do\n'
txt += ' cd ${{WDIRS[$i]}};\n'
txt += '\n'.join(self.unitjobs
[0].calc_params.get('precommands', []))
txt += '\n asim-run sim_input.yaml -c calc_input.yaml\n'
txt += '\n'.join(self.unitjobs[0].calc_params.get('precommands', []))
txt += f'\n cd {self.workdir};\n'
txt += 'done'

if write:
script_file = self.workdir / 'job_script.sh'
script_file.write_text(txt)

def gen_input_files(self) -> None:
    '''Write the input files for every unit job to the work directory.'''

    # The shared array script is only needed when all jobs run as a
    # slurm array.
    if self.use_slurm_array:
        self._gen_array_script()
    for job in self.unitjobs:
        job.gen_input_files()

def submit(self, **kwargs) -> None:
'''
Submit a job using slurm, interactively or in the terminal
Expand All @@ -831,9 +848,9 @@ def submit(self, **kwargs) -> None:
cur_dir = Path('.').resolve()
os.chdir(self.workdir)
job_ids = None
if self.use_slurm_array:
if self.use_slurm:
job_ids = self.submit_slurm_array(**kwargs)
elif self.use_bash_array:
elif self.use_bash:
job_ids = self.submit_bash_array(**kwargs)
else:
job_ids = self.submit_jobs(**kwargs)
Expand Down
38 changes: 38 additions & 0 deletions examples/internal/distributed/distributed_bash_sim_input.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
asimmodule: workflows.distributed
workdir: distributed_bash_results
args:
subsim_inputs:
id-0000:
asimmodule: singlepoint
env_id: bash #If all env_ids are slurm batch jobs using the same configuration, we automatically use arrays
args:
calc_id: lj_Ar
image:
name: Ar
id-0001:
# Doesn't have to be same asimmodule
asimmodule: geometry_optimization.atom_relax
env_id: bash
args:
calc_id: lj_Ar
image:
name: Ar
id-0002:
# This is just the calc_array example copied and pasted!
asimmodule: workflows.calc_array
env_id: bash
args:
calc_ids: [emt, lj_Cu]
env_ids: [inline, inline] # Must correspond to calc_id if list is given
subsim_input:
asimmodule: elastic_constants.cubic_energy_expansion
args:
calc_id: emt
image:
name: Cu
a: 3.6
cubic: true
ase_cubic_eos_args: # The minimum for every calculator should be in the e-v range
npoints: 5
eps: 0.04
eos_string: sj
Loading

0 comments on commit bb3465f

Please sign in to comment.