diff --git a/env/GAEAC6.env b/env/GAEAC6.env index 07bdb854ae1..192298a21f0 100755 --- a/env/GAEAC6.env +++ b/env/GAEAC6.env @@ -268,6 +268,9 @@ case ${step} in export FI_CXI_RDZV_THRESHOLD=65536 export FI_CXI_DEFAULT_CQ_SIZE=1048576 + # Use parallel copying capability + export USE_CFP=${USE_CFP:-"YES"} + (( nnodes = (ntasks+tasks_per_node-1)/tasks_per_node )) (( ufs_ntasks = nnodes*tasks_per_node )) # With ESMF threading, the model wants to use the full node diff --git a/env/HERA.env b/env/HERA.env index 00ef74a5836..f8f7d8778cd 100755 --- a/env/HERA.env +++ b/env/HERA.env @@ -260,6 +260,9 @@ elif [[ "${step}" = "fcst" ]] || [[ "${step}" = "efcs" ]]; then export APRUN_UFS="${launcher} -n ${ufs_ntasks}" unset nnodes ufs_ntasks + # Use parallel copying capability + export USE_CFP=${USE_CFP:-"YES"} + elif [[ "${step}" = "upp" ]]; then export NTHREADS_UPP=${NTHREADS1} diff --git a/env/HERCULES.env b/env/HERCULES.env index 3a18a34e88e..3d7ae15121d 100755 --- a/env/HERCULES.env +++ b/env/HERCULES.env @@ -274,6 +274,9 @@ case ${step} in # With ESMF threading, the model wants to use the full node export APRUN_UFS="${launcher} -n ${ufs_ntasks}" unset nnodes ufs_ntasks + + # Use parallel copying capability + export USE_CFP=${USE_CFP:-"YES"} ;; "upp") diff --git a/env/ORION.env b/env/ORION.env index d9c605d1436..2bd55627e80 100755 --- a/env/ORION.env +++ b/env/ORION.env @@ -269,6 +269,9 @@ elif [[ "${step}" = "fcst" ]] || [[ "${step}" = "efcs" ]]; then export APRUN_UFS="${launcher} -n ${ufs_ntasks}" unset nnodes ufs_ntasks + # Use parallel copying capability + export USE_CFP=${USE_CFP:-"YES"} + elif [[ "${step}" = "upp" ]]; then export NTHREADS_UPP=${NTHREADS1} diff --git a/env/URSA.env b/env/URSA.env index c75fd759311..7ee1fb59fc6 100644 --- a/env/URSA.env +++ b/env/URSA.env @@ -249,6 +249,9 @@ elif [[ "${step}" = "fcst" ]] || [[ "${step}" = "efcs" ]]; then export APRUN_UFS="${launcher} -n ${ufs_ntasks}" unset nnodes ufs_ntasks + # Use parallel copying capability + export USE_CFP=${USE_CFP:-"YES"} + elif [[ "${step}" = "upp" ]]; then export NTHREADS_UPP=${NTHREADS1} diff --git a/env/WCOSS2.env b/env/WCOSS2.env index 502e3eaf996..be6145062c1 100755 --- a/env/WCOSS2.env +++ b/env/WCOSS2.env @@ -263,6 +263,9 @@ elif [[ "${step}" = "fcst" ]] || [[ "${step}" = "efcs" ]]; then export APRUN_UFS="${launcher} -n ${ufs_ntasks} -ppn ${tasks_per_node} --cpu-bind depth --depth 1" unset nnodes ufs_ntasks + # Use parallel copying capability + export USE_CFP=${USE_CFP:-"YES"} + # TODO: Why are fcst and efcs so different on WCOSS2? # TODO: Compare these with the ufs-weather-model regression test job card at: # https://github.com/ufs-community/ufs-weather-model/blob/develop/tests/fv3_conf/fv3_qsub.IN_wcoss2 diff --git a/ush/forecast_postdet.sh b/ush/forecast_postdet.sh index bc635f4656e..acc68270974 100755 --- a/ush/forecast_postdet.sh +++ b/ush/forecast_postdet.sh @@ -422,15 +422,22 @@ FV3_out() { local file_list fv3_file file_list=$(FV3_restarts) - # Copy restarts for the dates collected above to COM + # Build MPMD cmdfile to copy restarts in parallel + local cmdfile="${DATA}/cmdfile_fv3_out" + rm -f "${cmdfile}" for restart_date in "${restart_dates[@]}"; do echo "Copying FV3 restarts for 'RUN=${RUN}' at ${restart_date}" for fv3_file in ${file_list}; do - cpfs "${DATArestart}/FV3_RESTART/${restart_date}.${fv3_file}" \ - "${COMOUT_ATMOS_RESTART}/${restart_date}.${fv3_file}" + echo "cpfs ${DATArestart}/FV3_RESTART/${restart_date}.${fv3_file} ${COMOUT_ATMOS_RESTART}/${restart_date}.${fv3_file}" >> "${cmdfile}" done done + "${USHgfs}/run_mpmd.sh" "${cmdfile}" && true + export err=$? + if [[ ${err} -ne 0 ]]; then + err_exit "run_mpmd.sh failed to copy FV3 restart files!" + fi + echo "SUB ${FUNCNAME[0]}: Output data for FV3 copied" fi } @@ -551,14 +558,17 @@ WW3_out() { # Copy wave namelist from DATA to COMOUT_CONF after the forecast is run (and successfull) cpfs "${DATA}/ww3_shel.nml" "${COMOUT_CONF}/ufs.ww3_shel.nml" + # Build MPMD cmdfile to copy WW3 restarts in parallel + local cmdfile="${DATA}/cmdfile_ww3_out" + rm -f "${cmdfile}" + # Copy WW3 restarts at the end of the forecast segment to COM for RUN=gfs|gefs if [[ "${COPY_FINAL_RESTARTS}" == "YES" ]]; then local restart_file if [[ "${RUN}" == "gfs" || "${RUN}" == "gefs" || "${RUN}" == "gcafs" ]]; then echo "Copying WW3 restarts for 'RUN=${RUN}' at ${forecast_end_cycle}" restart_file="${forecast_end_cycle:0:8}.${forecast_end_cycle:8:2}0000.restart.ww3.nc" - cpfs "${DATArestart}/WW3_RESTART/${restart_file}" \ - "${COMOUT_WAVE_RESTART}/${restart_file}" + echo "cpfs ${DATArestart}/WW3_RESTART/${restart_file} ${COMOUT_WAVE_RESTART}/${restart_file}" >> "${cmdfile}" fi fi @@ -569,8 +579,7 @@ WW3_out() { restart_date="${model_start_date_next_cycle}" echo "Copying WW3 restarts for 'RUN=${RUN}' at ${restart_date}" restart_file="${restart_date:0:8}.${restart_date:8:2}0000.restart.ww3.nc" - cpfs "${DATArestart}/WW3_RESTART/${restart_file}" \ - "${COMOUT_WAVE_RESTART}/${restart_file}" + echo "cpfs ${DATArestart}/WW3_RESTART/${restart_file} ${COMOUT_WAVE_RESTART}/${restart_file}" >> "${cmdfile}" fi # Copy restarts for downstream usage in HAFS @@ -579,8 +588,15 @@ WW3_out() { restart_date="${next_cycle}" echo "Copying WW3 restarts for 'RUN=${RUN}' at ${restart_date}" restart_file="${restart_date:0:8}.${restart_date:8:2}0000.restart.ww3.nc" - cpfs "${DATArestart}/WW3_RESTART/${restart_file}" \ - "${COMOUT_WAVE_RESTART}/${restart_file}" + echo "cpfs ${DATArestart}/WW3_RESTART/${restart_file} ${COMOUT_WAVE_RESTART}/${restart_file}" >> "${cmdfile}" + fi + + if [[ -s "${cmdfile}" ]]; then + "${USHgfs}/run_mpmd.sh" "${cmdfile}" && true + export err=$? + if [[ ${err} -ne 0 ]]; then + err_exit "run_mpmd.sh failed to copy WW3 restart files!" + fi fi } @@ -713,6 +729,10 @@ MOM6_out() { *) ;; esac + # Build MPMD cmdfile to copy MOM6 restarts in parallel + local cmdfile="${DATA}/cmdfile_mom6_out" + rm -f "${cmdfile}" + case ${RUN} in gdas | enkfgdas | enkfgfs) # Copy restarts for the next cycle for RUN=gdas|enkfgdas|enkfgfs local restart_date @@ -720,8 +740,7 @@ MOM6_out() { echo "Copying MOM6 restarts for 'RUN=${RUN}' at ${restart_date}" for mom6_restart_file in "${mom6_restart_files[@]}"; do restart_file="${restart_date:0:8}.${restart_date:8:2}0000.${mom6_restart_file}" - cpfs "${DATArestart}/MOM6_RESTART/${restart_file}" \ - "${COMOUT_OCEAN_RESTART}/${restart_file}" + echo "cpfs ${DATArestart}/MOM6_RESTART/${restart_file} ${COMOUT_OCEAN_RESTART}/${restart_file}" >> "${cmdfile}" done ;; gfs | gefs | sfs | gcafs) # Copy MOM6 restarts at the end of the forecast segment to COM for RUN=gfs|gefs|sfs @@ -730,8 +749,7 @@ MOM6_out() { echo "Copying MOM6 restarts for 'RUN=${RUN}' at ${forecast_end_cycle}" for mom6_restart_file in "${mom6_restart_files[@]}"; do restart_file="${forecast_end_cycle:0:8}.${forecast_end_cycle:8:2}0000.${mom6_restart_file}" - cpfs "${DATArestart}/MOM6_RESTART/${restart_file}" \ - "${COMOUT_OCEAN_RESTART}/${restart_file}" + echo "cpfs ${DATArestart}/MOM6_RESTART/${restart_file} ${COMOUT_OCEAN_RESTART}/${restart_file}" >> "${cmdfile}" done fi ;; @@ -740,6 +758,14 @@ MOM6_out() { exit 25 ;; esac + + if [[ -s "${cmdfile}" ]]; then + "${USHgfs}/run_mpmd.sh" "${cmdfile}" && true + export err=$? + if [[ ${err} -ne 0 ]]; then + err_exit "run_mpmd.sh failed to copy MOM6 restart files!" + fi + fi } CICE_postdet() { @@ -829,6 +855,10 @@ CICE_out() { # Copy ice_in namelist from DATA to COMOUT_CONF after the forecast is run (and successfull) cpfs "${DATA}/ice_in" "${COMOUT_CONF}/ufs.ice_in" + # Build MPMD cmdfile to copy CICE restarts in parallel + local cmdfile="${DATA}/cmdfile_cice_out" + rm -f "${cmdfile}" + case ${RUN} in gdas | enkfgdas | enkfgfs) # Copy restarts for next cycle for RUN=gdas|enkfgdas|enkfgfs local restart_date @@ -837,8 +867,7 @@ CICE_out() { seconds=$(to_seconds "${restart_date:8:2}0000") # convert HHMMSS to seconds source_file="cice_model.res.${restart_date:0:4}-${restart_date:4:2}-${restart_date:6:2}-${seconds}.nc" target_file="${restart_date:0:8}.${restart_date:8:2}0000.cice_model.res.nc" - cpfs "${DATArestart}/CICE_RESTART/${source_file}" \ - "${COMOUT_ICE_RESTART}/${target_file}" + echo "cpfs ${DATArestart}/CICE_RESTART/${source_file} ${COMOUT_ICE_RESTART}/${target_file}" >> "${cmdfile}" ;; gfs | gefs | sfs | gcafs) # Copy CICE restarts at the end of the forecast segment to COM for RUN=gfs|gefs|sfs|gcafs if [[ "${COPY_FINAL_RESTARTS}" == "YES" ]]; then @@ -847,8 +876,7 @@ CICE_out() { seconds=$(to_seconds "${forecast_end_cycle:8:2}0000") # convert HHMMSS to seconds source_file="cice_model.res.${forecast_end_cycle:0:4}-${forecast_end_cycle:4:2}-${forecast_end_cycle:6:2}-${seconds}.nc" target_file="${forecast_end_cycle:0:8}.${forecast_end_cycle:8:2}0000.cice_model.res.nc" - cpfs "${DATArestart}/CICE_RESTART/${source_file}" \ - "${COMOUT_ICE_RESTART}/${target_file}" + echo "cpfs ${DATArestart}/CICE_RESTART/${source_file} ${COMOUT_ICE_RESTART}/${target_file}" >> "${cmdfile}" fi ;; *) @@ -856,6 +884,14 @@ CICE_out() { exit 25 ;; esac + + if [[ -s "${cmdfile}" ]]; then + "${USHgfs}/run_mpmd.sh" "${cmdfile}" && true + export err=$? + if [[ ${err} -ne 0 ]]; then + err_exit "run_mpmd.sh failed to copy CICE restart files!" + fi + fi } GOCART_rc() { @@ -924,15 +960,26 @@ GOCART_out() { "inst_du_bin" "inst_ss_bin" "inst_ca_bin" "inst_ni_bin" "inst_su_bin" "inst_2d" "inst_3d" "tavg_du_ss" "tavg_du_bin" "tavg_2d_rad" "tavg_3d_rad") + # Build MPMD cmdfile to copy GOCART output files in parallel + local cmdfile="${DATA}/cmdfile_gocart_out" + rm -f "${cmdfile}" + for fhr in $(GOCART_output_fh); do vdate=$(date --utc -d "${current_cycle:0:8} ${current_cycle:8:2} + ${fhr} hours" +%Y%m%d%H) for file_type in "${file_types[@]}"; do if [[ -e "${DATA}/gocart.${file_type}.${vdate:0:8}_${vdate:8:2}00z.nc4" ]]; then - cpfs "${DATA}/gocart.${file_type}.${vdate:0:8}_${vdate:8:2}00z.nc4" \ - "${COMOUT_CHEM_HISTORY}/gocart.${file_type}.${vdate:0:8}_${vdate:8:2}00z.nc4" + echo "cpfs ${DATA}/gocart.${file_type}.${vdate:0:8}_${vdate:8:2}00z.nc4 ${COMOUT_CHEM_HISTORY}/gocart.${file_type}.${vdate:0:8}_${vdate:8:2}00z.nc4" >> "${cmdfile}" fi done done + + if [[ -s "${cmdfile}" ]]; then + "${USHgfs}/run_mpmd.sh" "${cmdfile}" && true + export err=$? + if [[ ${err} -ne 0 ]]; then + err_exit "run_mpmd.sh failed to copy GOCART output files!" + fi + fi } # shellcheck disable=SC2178 @@ -1008,6 +1055,10 @@ CMEPS_postdet() { CMEPS_out() { echo "SUB ${FUNCNAME[0]}: Copying output data for CMEPS mediator" + # Build MPMD cmdfile to copy CMEPS mediator restarts in parallel + local cmdfile="${DATA}/cmdfile_cmeps_out" + rm -f "${cmdfile}" + case ${RUN} in gdas | enkfgdas | enkfgfs) # Copy restarts for the next cycle to COM local restart_date @@ -1017,8 +1068,7 @@ CMEPS_out() { source_file="ufs.cpld.cpl.r.${restart_date:0:4}-${restart_date:4:2}-${restart_date:6:2}-${seconds}.nc" target_file="${restart_date:0:8}.${restart_date:8:2}0000.ufs.cpld.cpl.r.nc" if [[ -f "${DATArestart}/CMEPS_RESTART/${source_file}" ]]; then - cpfs "${DATArestart}/CMEPS_RESTART/${source_file}" \ - "${COMOUT_MED_RESTART}/${target_file}" + echo "cpfs ${DATArestart}/CMEPS_RESTART/${source_file} ${COMOUT_MED_RESTART}/${target_file}" >> "${cmdfile}" else echo "Mediator restart '${DATArestart}/CMEPS_RESTART/${source_file}' not found." fi @@ -1031,8 +1081,7 @@ CMEPS_out() { source_file="ufs.cpld.cpl.r.${forecast_end_cycle:0:4}-${forecast_end_cycle:4:2}-${forecast_end_cycle:6:2}-${seconds}.nc" target_file="${forecast_end_cycle:0:8}.${forecast_end_cycle:8:2}0000.ufs.cpld.cpl.r.nc" if [[ -f "${DATArestart}/CMEPS_RESTART/${source_file}" ]]; then - cpfs "${DATArestart}/CMEPS_RESTART/${source_file}" \ - "${COMOUT_MED_RESTART}/${target_file}" + echo "cpfs ${DATArestart}/CMEPS_RESTART/${source_file} ${COMOUT_MED_RESTART}/${target_file}" >> "${cmdfile}" else echo "Mediator restart '${DATArestart}/CMEPS_RESTART/${source_file}' not found." fi @@ -1043,4 +1092,12 @@ CMEPS_out() { exit 25 ;; esac + + if [[ -s "${cmdfile}" ]]; then + "${USHgfs}/run_mpmd.sh" "${cmdfile}" && true + export err=$? + if [[ ${err} -ne 0 ]]; then + err_exit "run_mpmd.sh failed to copy CMEPS mediator restart files!" + fi + fi } diff --git a/ush/gsi_utils.py b/ush/gsi_utils.py old mode 100644 new mode 100755 diff --git a/ush/run_mpmd.sh b/ush/run_mpmd.sh index 773f8042d34..8f8503715b0 100755 --- a/ush/run_mpmd.sh +++ b/ush/run_mpmd.sh @@ -6,13 +6,14 @@ # Script name: run_mpmd.sh # Script description: Run multiple commands in MPMD mode or serially # -# Author: Rahul Mahajan +# Author: Rahul Mahajan and David Huber # # Org: NCEP/EMC # # Abstract: This script runs multiple commands in MPMD mode. It is used to run # multiple serial commands in parallel using the CFP (Coupled Framework -# Parallelism) feature of the workflow. +# Parallelism) feature of the workflow. The script handles chunking of the +# commands to avoid oversubscription of resources. # # Environment variables: # USE_CFP: If set to YES, run in MPMD mode, else run in serial mode. Default is 'NO'. @@ -31,12 +32,35 @@ # ################################################################################ -source "${USHgfs}/preamble.sh" - cmdfile=${1:?"run_mpmd requires an input file containing commands to execute in MPMD/serial mode"} -# If USE_CFP is not set, run in serial mode -if [[ "${USE_CFP:-}" != "YES" ]]; then +# Determine launcher type +if [[ "${launcher:-}" =~ ^srun.* ]]; then # srun-based system e.g. Hera, Orion, etc. + _mpmd_launcher=srun +elif [[ "${launcher:-}" =~ ^mpiexec.* ]]; then # mpiexec-based system e.g. WCOSS2 + _mpmd_launcher=mpiexec +else + echo "WARNING: Unsupported or empty launcher: '${launcher:-}', using serial mode instead" + echo " Supported launchers are 'srun' and 'mpiexec'" + _mpmd_launcher=unsupported +fi + +# Check if we are running a supported launcher +if [[ "${_mpmd_launcher}" == "srun" || "${_mpmd_launcher}" == "mpiexec" ]]; then + echo "INFO: Detected launcher '${_mpmd_launcher}', will attempt to run in MPMD mode if USE_CFP is set to YES" + if [[ -z "${max_tasks_per_node:-}" || -z "${ntasks:-}" ]]; then + echo "WARNING: max_tasks_per_node and/or ntasks is not set, disabling MPMD mode." + USE_CFP=NO + else + USE_CFP=${USE_CFP:-"NO"} + max_tasks_per_node=$((ntasks < max_tasks_per_node ? ntasks : max_tasks_per_node)) + fi +else + USE_CFP="NO" +fi + +# If USE_CFP is not set or is not YES, run in serial mode +if [[ "${USE_CFP}" != "YES" ]]; then echo "INFO: Using serial mode for MPMD job" chmod 755 "${cmdfile}" bash +x "${cmdfile}" > mpmd.out 2>&1 @@ -48,75 +72,154 @@ fi # Set OMP_NUM_THREADS to 1 to avoid oversubscription when doing MPMD export OMP_NUM_THREADS=1 -# Determine the number of MPMD processes from incoming ${cmdfile} -nprocs=$(wc -l < "${cmdfile}") - -# Local MPMD file containing instructions to run in CFP +# Establish the MPMD chunk file pattern. mpmd_cmdfile="${DATA:-}/mpmd_cmdfile" -if [[ -s "${mpmd_cmdfile}" ]]; then - rm -f "${mpmd_cmdfile}" -fi +rm -f "${mpmd_cmdfile}"* + +# Functions to support MPMD execution +chunk_mpmd() { + # Usage chunk_mpmd cmdfile chunk_size chunk_num chunk_file + # This takes a chunk of the full mpmd command file and creates a new chunk + # file with the specified number of lines + # Inputs: + # cmdfile: the full mpmd command file to read from and modify + # chunk_size: the number of lines to include in the chunk file + # chunk_num: the chunk number (used to determine which lines from the cmdfile to include in the chunk file) + # chunk_file: the name of the chunk file to create + # Use this function when the number of MPMD tasks is greater than the maximum tasks per node. + local mpmd_file="${1}" + local chunk_sz="${2}" + local chunk_num="${3}" + local chunk_file="${4}" + if [[ ! -s "${mpmd_file}" ]]; then + echo "ERROR: MPMD command file '${mpmd_file}' is empty or does not exist." + return 1 + fi + + # Determine which line to start reading from + local _start_line=$(((chunk_num - 1) * chunk_sz + 1)) + local _end_line=$((chunk_num * chunk_sz)) + + local _counter=1 + while IFS= read -r line; do + if [[ ${_counter} -ge ${_start_line} && ${_counter} -le ${_end_line} ]]; then + # Slurm requires a counter in front of each line in the script + if [[ "${_mpmd_launcher}" == "srun" ]]; then + echo "$((_counter - _start_line)) ${line}" >> "${chunk_file}" + elif [[ "${_mpmd_launcher}" == "mpiexec" ]]; then + echo "${line}" >> "${chunk_file}" + fi + err=$? + if [[ ${err} -ne 0 ]]; then + echo "ERROR: Failed to write line '${line}' to chunk file '${chunk_file}'" + return "${err}" + fi + fi + ((_counter = _counter + 1)) + done < "${mpmd_file}" + + return 0 +} + +cat_outputs() { + # This function concatenates the output files from the MPMD job and prints them to stdout. + # It also removes the individual output files after concatenation. + + # Optional argument to issue error if no output files are found. + _err_on_empty="${1:-false}" + out_files=$(find . -name 'mpmd.*.out') + if [[ -z "${out_files}" ]]; then + if [[ "${_err_on_empty}" == "true" ]]; then + echo "ERROR: No output files found for MPMD job" + return 1 + else + # Nothing to do, return success. + return 0 + fi + fi + for file in ${out_files}; do + { + echo "BEGIN OUTPUT FROM ${file}" + cat "${file}" + echo "END OUTPUT FROM ${file}" + } >> mpmd.out + rm -f "${file}" + done +} cat << EOF INFO: Executing MPMD job, STDOUT and STDERR redirected for each process separately INFO: On failure, logs for each job will be available in ${DATA}/mpmd.proc_num.out -INFO: The proc_num corresponds to the line in '${mpmd_cmdfile}' +INFO: The proc_num corresponds to the line in '${cmdfile}' EOF -if [[ "${launcher:-}" =~ ^srun.* ]]; then # srun-based system e.g. Hera, Orion, etc. - - # Slurm requires a counter in front of each line in the script - # Read the incoming cmdfile and create srun usable cmdfile - nm=0 - while IFS= read -r line; do - echo "${nm} ${line}" >> "${mpmd_cmdfile}" - ((nm = nm + 1)) - done < "${cmdfile}" +# Determine the number of MPMD processes from incoming ${cmdfile} +nm=$(wc -l < "${cmdfile}") + +# Test if the number of lines in the cmdfile is greater than the number of tasks per node ($max_tasks_per_node). + +if [[ ${nm} -gt ${max_tasks_per_node:-1} ]]; then + # If needed, split the cmdfile and run it in chunks. + # For now, keep all MPMD tasks on one node. + # TODO: consider running the MPMD job across multiple nodes. + echo "INFO: Number of MPMD tasks (${nm}) is greater than the maximum tasks per node (${max_tasks_per_node:-1})." + echo " Running MPMD job in chunks of ${max_tasks_per_node:-1} tasks per node." + chunk_size=${max_tasks_per_node:-1} +else + # Otherwise, we can run all MPMD tasks in one chunk. + chunk_size=${nm} +fi - unset_strict - # shellcheck disable=SC2086 - ${launcher:-} ${mpmd_opt:-} -n ${nprocs} "${mpmd_cmdfile}" +# Disable error checking for MPMD execution. +in_shellopts=${SHELLOPTS} +set +e +# Start chunking through the MPMD command file. +chunk_num=1 +err=0 +for ((i = 0; i < nm; i += chunk_size)); do + chunk_file="${mpmd_cmdfile}.chunk${chunk_num}" + chunk_mpmd "${cmdfile}" "${chunk_size}" "${chunk_num}" "${chunk_file}" err=$? - set_strict - -elif [[ "${launcher:-}" =~ ^mpiexec.* ]]; then # mpiexec - - # Redirect output from each process to its own stdout - # Read the incoming cmdfile and create mpiexec usable cmdfile - nm=0 - echo "#!/bin/bash" >> "${mpmd_cmdfile}" - while IFS= read -r line; do - echo "${line} > mpmd.${nm}.out 2>&1" >> "${mpmd_cmdfile}" - ((nm = nm + 1)) - done < "${cmdfile}" - chmod 755 "${mpmd_cmdfile}" - + if [[ ${err} -ne 0 ]]; then + echo "ERROR: Failed to create chunk file '${chunk_file}' from '${cmdfile}'" + break + fi + n_mpmd_tasks=$(wc -l < "${chunk_file}") # shellcheck disable=SC2086 - ${launcher:-} -np ${nprocs} ${mpmd_opt:-} "${mpmd_cmdfile}" + if [[ "${_mpmd_launcher}" == "srun" ]]; then + ${launcher:-} ${mpmd_opt:-} -n "${n_mpmd_tasks}" "${chunk_file}" + elif [[ "${_mpmd_launcher}" == "mpiexec" ]]; then + # shellcheck disable=SC2086 + ${launcher:-} ${mpmd_opt:-} -np "${n_mpmd_tasks}" "${chunk_file}" + fi err=$? - -else # Unsupported or empty launcher, run in serial mode - - echo "WARNING: CFP is not usable with launcher: '${launcher:-}', using serial mode instead" - chmod 755 "${cmdfile}" - bash +x "${cmdfile}" > mpmd.out 2>&1 + if [[ ${err} -ne 0 ]]; then + echo "ERROR: MPMD job failed for ${chunk_file}" + break + fi + # Call cat_outputs and error if no outputs are found. + cat_outputs "true" err=$? - + if [[ ${err} -ne 0 ]]; then + echo "ERROR: No output files found for MPMD job for chunk file '${chunk_file}'" + break + fi + ((chunk_num = chunk_num + 1)) +done + +# On success remove the command file and any chunk files. +if [[ ${err} -eq 0 ]]; then + rm -f "${mpmd_cmdfile}.chunk"* fi -# On success concatenate processor specific output into a single mpmd.out -if [[ ${err} -eq 0 ]]; then - rm -f "${mpmd_cmdfile}" - out_files=$(find . -name 'mpmd.*.out') - for file in ${out_files}; do - { - echo "BEGIN OUTPUT FROM ${file}" - cat "${file}" - echo "END OUTPUT FROM ${file}" - } >> mpmd.out - rm -f "${file}" - done +# Concatenate any remaining output files if they exist +cat_outputs +if [[ -s mpmd.out ]]; then cat mpmd.out +else + echo "WARNING: No output files found for MPMD job" fi +export SHELLOPTS="${in_shellopts}" + exit "${err}" diff --git a/ush/wave_domain_grid.sh b/ush/wave_domain_grid.sh old mode 100644 new mode 100755