diff --git a/dev/workflow/generate_workflows.sh b/dev/workflow/generate_workflows.sh
index 4a13c30ecb7..ae63779b5c5 100755
--- a/dev/workflow/generate_workflows.sh
+++ b/dev/workflow/generate_workflows.sh
@@ -187,6 +187,18 @@ function send_email() {
     echo "${_body}" | mail -s "${_subject}" "${_email}"
 }
 
+# Warn scrontab users who gave neither -e nor REPLYTO how to enable job failure notifications
+function mail_warning() {
+    if [[ "${_use_scron}" == true && "${_set_email}" == false && -z "${REPLYTO:-}" ]]; then
+        echo -e "\033[0;33mWARNING:\033[0m Set \033[0;32mexport REPLYTO=\"your_email\"\033[0m in your .bashrc or use generate_workflows.sh with \033[0;32m-e \"your_email\"\033[0m to receive job failure notifications."
+    fi
+}
+
+# Export REPLYTO when -e supplied a non-empty address (NOTE(review): assumes options are already parsed here - confirm)
+if [[ "${_set_email}" == "true" && -n "${_email}" ]]; then
+    export REPLYTO="${_email}"
+fi
+
 function delete_dir() {
     local dir_to_rm="${1:-}"
     if [[ -z "${dir_to_rm}" ]]; then
@@ -584,9 +596,9 @@ for _case in "${_yaml_list[@]}"; do
 
     if [[ "${_use_scron}" == true ]]; then
         {
-            grep "^#.*${_pslot}" "${_runtests}/EXPDIR/${_pslot}/${_pslot}.crontab"
+            grep "^####" "${cron_file}"
             grep "^#SCRON" "${cron_file}"
-            grep "${scron_sh_file}" "${_runtests}/EXPDIR/${_pslot}/${_pslot}.crontab"
+            grep "${scron_sh_file}" "${cron_file}"
         } >> tests.cron
     else
         grep "${_pslot}" "${_runtests}/EXPDIR/${_pslot}/${_pslot}.crontab" >> tests.cron
@@ -594,10 +606,25 @@ for _case in "${_yaml_list[@]}"; do
 done
 echo
 
+# Regular crontab only: put a MAILTO line at the top of tests.cron (-e wins over REPLYTO)
+if [[ "${_use_scron}" == false ]]; then
+    if [[ "${_set_email}" == "true" ]]; then
+        # Address given with the -e flag
+        sed -i "1i MAILTO=\"${_email}\"" tests.cron
+    elif [[ -n "${REPLYTO:-}" ]]; then
+        # Fall back to the REPLYTO environment variable
+        sed -i "1i MAILTO=\"${REPLYTO}\"" tests.cron
+    else
+        # No address known: write an explicitly empty MAILTO
+        sed -i "1i MAILTO=\"\"" tests.cron
+    fi
+fi
+
 # Update the cron
 if [[ "${_update_cron}" == "true" ]]; then
     printf "Updating the existing crontab\n\n"
     echo
+    mail_warning
     rm -f existing.cron final.cron "${_verbose_flag}"
     touch existing.cron final.cron
 
@@ -610,16 +637,42 @@ if [[ "${_update_cron}" == "true" ]]; then
         echo "#######################"
     fi
 
+    # Remember the first MAILTO line of the existing crontab before it is stripped below
+    existing_mailto=$(grep "^MAILTO=" existing.cron 2> /dev/null | head -1 || echo "")
+
+    # Remove ALL MAILTO lines from existing.cron and tests.cron to prevent duplicates
+    sed -i '/^MAILTO=/d' existing.cron 2> /dev/null || true
+    sed -i '/^MAILTO=/d' tests.cron 2> /dev/null || true
+
     if [[ "${_set_email}" == "true" ]]; then
-        # Replace the existing email in the crontab
+        # For scrontab, REPLYTO is already exported earlier; for crontab, set MAILTO
         if [[ "${_verbose}" == "true" ]]; then
             printf "Updating crontab/scrontab email to %s\n\n" "${_email}"
         fi
 
-        if [[ "${_use_scron}" == true ]]; then
-            sed -i "s/.*--mail-user.*/#SCRON --mail-user=\"${_email}\"/" tests.cron
-        else
-            sed -i "s/^MAILTO.*/MAILTO=\"${_email}\"/" existing.cron
+        if [[ "${_use_scron}" == false ]]; then
+            # For regular crontab, set MAILTO at the top of final.cron
+            echo "MAILTO=\"${_email}\"" > final.cron
+        fi
+    else
+        # No -e flag: choose the MAILTO for final.cron (regular crontab only)
+        if [[ "${_use_scron}" == false ]]; then
+            # Priority: non-empty MAILTO from the existing crontab, then REPLYTO, then empty
+            # The saved line may look like MAILTO="email", MAILTO='email' or MAILTO=email
+            if [[ -n "${existing_mailto}" ]]; then
+                # Strip optional surrounding quotes to get the bare address
+                existing_email=$(echo "${existing_mailto}" | sed -n 's/^MAILTO=["'\'']*\([^"'\'']*\)["'\'']*$/\1/p')
+            else
+                existing_email=""
+            fi
+
+            if [[ -n "${existing_email}" ]]; then
+                echo "${existing_mailto}" > final.cron
+            elif [[ -n "${REPLYTO:-}" ]]; then
+                echo "MAILTO=\"${REPLYTO}\"" > final.cron
+            else
+                echo "MAILTO=\"\"" > final.cron
+            fi
         fi
     fi
 
@@ -634,10 +687,12 @@ if [[ "${_update_cron}" == "true" ]]; then
 
     ${_crontab_cmd} final.cron
 else
+    mail_warning
     _message="Add the following to your crontab or scrontab to start running:"
     _cron_tests=$(cat tests.cron)
     _message="${_message}"$'\n'"${_cron_tests}"
     echo "${_message}"
+    echo
     if [[ "${_set_email}" == true ]]; then
         final_message="${final_message:-}"$'\n'"${_message}"
     fi
diff --git a/dev/workflow/rocoto/rocoto_scron.sh.j2 b/dev/workflow/rocoto/rocoto_scron.sh.j2
new file mode 100644
index 00000000000..eab9828b3c9
--- /dev/null
+++ b/dev/workflow/rocoto/rocoto_scron.sh.j2
@@ -0,0 +1,73 @@
+#! /usr/bin/env bash
+source "{{ HOMEgfs }}/dev/ush/gw_setup.sh"
+
+# Run rocotorun
+bash -c "{{ rocotorunstr }}"
+
+# Monitor for failed jobs using rocotostat; LOCKFILE holds the failures already reported
+LOCKFILE="{{ expdir }}/.failed_jobs.lock"
+ROCOTOSTAT=$(command -v rocotostat)
+
+if [[ -n "${ROCOTOSTAT}" ]]; then
+    FAILED_JOBS=$(${ROCOTOSTAT} -d "{{ expdir }}/{{ pslot }}.db" -w "{{ expdir }}/{{ pslot }}.xml" -c all 2> /dev/null | grep -E 'DEAD')
+
+    if [[ -n "${FAILED_JOBS}" ]]; then
+        # Read previously reported failures
+        PREV_FAILED=""
+        if [[ -f "${LOCKFILE}" ]]; then
+            PREV_FAILED=$(cat "${LOCKFILE}")
+        fi
+
+        # Keep only the DEAD lines that were not in the previous report
+        NEW_FAILURES=""
+        while IFS= read -r job; do
+            if [[ -n "${job}" ]] && ! echo "${PREV_FAILED}" | grep -qF "${job}"; then
+                NEW_FAILURES="${NEW_FAILURES}${job}"$'\n'
+            fi
+        done <<< "${FAILED_JOBS}"
+
+        # Send email only if there are NEW failures
+        if [[ -n "${NEW_FAILURES}" ]]; then
+            MSGFILE=$(mktemp "${TMPDIR:-/tmp}/rocoto_fail_msg.XXXXXX")
+            {
+                echo "The following jobs have failed in experiment {{ pslot }} on ${MACHINE_ID}:"
+                echo ""
+
+                # Format each failed job with detailed information
+                while IFS= read -r line; do
+                    if [[ -n "${line}" ]]; then
+                        # rocotostat columns: CYCLE TASK JOBID STATE EXIT_STATUS TRIES DURATION
+                        read -r cycle task jobid state exit_status tries duration <<< "${line}"
+                        # Extract YYYYMMDDHH from cycle (first 10 characters)
+                        cycle_short=${cycle:0:10}
+                        # Get current timestamp
+                        timestamp=$(date -u '+%m/%d/%y %H:%M:%S UTC')
+
+                        # One summary line per failed job, followed by the path to its log
+                        echo "${timestamp} :: {{ pslot }}.xml :: Cycle ${cycle}, Task ${task}, jobid=${jobid}, in state ${state}, exit status ${exit_status}, ran for ${duration} seconds, tries=${tries}"
+                        echo "Check log: {{ comroot }}/{{ pslot }}/logs/${cycle_short}/${task}.log"
+                        echo ""
+                    fi
+                done <<< "${NEW_FAILURES}"
+            } > "${MSGFILE}"
+
+            # Send the report unless no address was configured (rendered as empty or the literal "None")
+            EMAIL="{{ replyto }}"
+            hostname_domain=$(hostname -d)
+            FROM_EMAIL="no-reply@${hostname_domain:-localhost}"
+            if [[ -n "${EMAIL}" && "${EMAIL}" != "None" ]] && command -v mail &> /dev/null; then
+                # On Gaea, the mail utility requires the -v (verbose) flag to ensure delivery.
+                # To avoid receiving verbose output as an actual email, a spoofed 'from' address is used for notifications.
+                mail -r "${FROM_EMAIL}" -v -s "[{{ pslot }}] Workflow Job Failures Detected" "${EMAIL}" < "${MSGFILE}" 2>&1
+            fi
+
+            rm -f "${MSGFILE}"
+        fi
+
+        # Always update lockfile to reflect current failures
+        echo "${FAILED_JOBS}" > "${LOCKFILE}"
+    else
+        # No failures: clear the lockfile; plain rm -f succeeds when it is absent, so the script exits 0
+        rm -f "${LOCKFILE}"
+    fi
+fi
diff --git a/dev/workflow/rocoto/rocoto_xml.py b/dev/workflow/rocoto/rocoto_xml.py
index 176a6d644f1..68f2ce89adb 100644
--- a/dev/workflow/rocoto/rocoto_xml.py
+++ b/dev/workflow/rocoto/rocoto_xml.py
@@ -8,7 +8,7 @@
 from applications.applications import AppConfig
 from workflow_suite import WorkflowSuite
 from rocoto.workflow_tasks import get_wf_tasks
-from wxflow import which, mkdir
+from wxflow import which, mkdir, parse_j2tmpl
 import rocoto.rocoto as rocoto
 from abc import ABC, abstractmethod
 from logging import getLogger
@@ -123,6 +123,36 @@ def write(self, xml_file: str = None, crontab_file: str = None):
         if self._base["DO_ARCHCOM"] and self._base["ARCHCOM_TO"] == "globus_hpss":
             self._write_server_crontab()
 
+    def _get_scron_script_content(self, rocotorunstr: str, replyto: str) -> str:
+        """
+        Render the rocoto_scron.sh.j2 template with experiment-specific values.
+
+        Parameters
+        ----------
+        rocotorunstr : str
+            The rocotorun command string
+        replyto : str
+            Email address for notifications; empty or None disables them
+
+        Returns
+        -------
+        str
+            Formatted bash script content
+        """
+        template_path = os.path.join(os.path.dirname(__file__), 'rocoto_scron.sh.j2')
+
+        # An empty replyto is passed as None: the script skips mail when it renders as "None"
+        context = {
+            'HOMEgfs': self.HOMEgfs,
+            'rocotorunstr': rocotorunstr,
+            'expdir': self.expdir,
+            'pslot': self.pslot,
+            'replyto': replyto or None,
+            'comroot': self._base.get('COMROOT')
+        }
+        template_content = parse_j2tmpl(template_path, context)
+        return template_content
+
     def _write_xml(self, xml_file: str = None) -> None:
 
         if xml_file is None:
@@ -147,8 +177,7 @@ def _write_crontab(self, crontab_file: str = None, cronint: int = 5) -> None:
 
         rocotorunstr = f'{rocotoruncmd} -d {self.expdir}/{self.pslot}.db -w {self.expdir}/{self.pslot}.xml'
         cronintstr = f'*/{cronint} * * * *'
         replyto = os.environ.get('REPLYTO', "")
-
         crontab_strings = [
             '',
             f'#################### {self.pslot} ####################'
@@ -169,28 +198,23 @@ def _write_crontab(self, crontab_file: str = None, cronint: int = 5) -> None:
             crontab_strings.extend([
                 f'#SCRON --partition={partition}',
                 f'#SCRON --account={account}',
-                f'#SCRON --mail-user={replyto}',
                 f'#SCRON --job-name={self.pslot}_scron',
                 f'#SCRON --output={self.expdir}/logs/scron.log',
                 '#SCRON --time=00:10:00',
                 '#SCRON --dependency=singleton'
             ])
 
-            # Now write the script that actually runs rocotorun
+            # Now write the script that actually runs rocotorun and monitors for failures
             cron_cmd = f"{self.expdir}/{self.pslot}.scron.sh"
             with open(cron_cmd, "w") as script_fh:
-                script_fh.write(
-                    "#!/usr/bin/env bash\n" +
-                    "set -x\n" +
-                    f"source {self.HOMEgfs}/dev/ush/gw_setup.sh" + "\n" +
-                    rocotorunstr + "\n"
-                )
+                script_fh.write(self._get_scron_script_content(rocotorunstr, replyto))
 
             # Make the script executable
             mode = os.stat(cron_cmd)
             os.chmod(cron_cmd, mode.st_mode | stat.S_IEXEC)
         else:
-            cron_cmd = rocotorunstr
+            # For regular crontab, create a wrapper script with monitoring
+            cron_cmd = f"{self.expdir}/{self.pslot}.cron.sh"
 
             crontab_strings.extend([
                 'SHELL="/bin/bash"',
                 f'MAILTO="{replyto}"'