Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 14 additions & 27 deletions dev/scripts/exglobal_atmos_analysis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ if [[ "${GENDIAG}" == "YES" ]]; then
fi
# Make the gsidiags directory to house the GSI diagnostic data
GSIDIAGDIR=${GSIDIAGDIR:-"${pCOMOUT_ATMOS_ANALYSIS}/gsidiags"}
rm -rf "${GSIDIAGDIR}"
mkdir -p "${GSIDIAGDIR}"
fi

##############################################################
Expand Down Expand Up @@ -670,6 +672,18 @@ if [[ "${USE_SELECT}" == "YES" ]]; then
fi
fi

# When diagnostics are requested, pre-create one dir.NNNN directory per task
# under GSIDIAGDIR and link each into the run directory so the GSI writes
# its diagnostic files directly into GSIDIAGDIR.
if [[ "${GENDIAG}" = "YES" ]]; then
  # Task indices run 0..ntasks, i.e. one directory per task plus one extra;
  # the final directory receives no data but is still created/linked.
  for task in $(seq 0 "${ntasks}"); do
    # Zero-padded directory name, e.g. dir.0007
    printf -v dir 'dir.%04d' "${task}"
    mkdir -p "${GSIDIAGDIR}/${dir}"
    ${NLN} "${GSIDIAGDIR}/${dir}" "./${dir}"
  done
fi

##############################################################
# If requested, copy and de-tar guess radstat file
if [[ "${USE_RADSTAT}" == "YES" ]]; then
Expand Down Expand Up @@ -909,33 +923,6 @@ fi
# shellcheck disable=SC2312
echo "${rCDUMP} ${PDY}${cyc} atminc done at $(date)" > "${COMOUT_ATMOS_ANALYSIS}/${APREFIX}increment.done.txt"

# Diagnostic files
# if requested, GSI diagnostic file directories for use later
if [[ "${GENDIAG}" == "YES" ]]; then
# Move the gsidiags dir.* directories to pCOMOUT_ATMOS_ANALYSIS for diagnostic jobs
# First, check that the directories exist (we need at least one, so stop after the first match)
# shellcheck disable=SC2312
count_dirs=$(find . -maxdepth 1 -type d -name 'dir.????' -printf "." -quit | wc -c)
if [[ ${count_dirs:-0} -gt 0 ]]; then
mkdir -p "${GSIDIAGDIR}"
err=$?

if [[ ! -d "${GSIDIAGDIR}" || ${err} -ne 0 ]]; then
err_exit "Failed to create gsidiags directory at ${GSIDIAGDIR}"
fi

for dir in dir.????; do
mv "${dir}" "${GSIDIAGDIR}/"
export err=$?
if [[ ${err} -ne 0 ]]; then
err_exit "Failed to move ${dir} to ${GSIDIAGDIR}/"
fi
done
else
echo "WARNING: No gsidiags dir.* directories found to move."
fi
fi

################################################################################

exit "${err}"
70 changes: 63 additions & 7 deletions dev/workflow/generate_workflows.sh
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,18 @@ function send_email() {
echo "${_body}" | mail -s "${_subject}" "${_email}"
}

# Warn the user when scrontab failure notifications would be silently dropped:
# mail is only sent if REPLYTO is exported or an address was given via -e.
function mail_warning() {
  # Nothing to warn about unless we are using scrontab, no -e address was
  # supplied, and REPLYTO is unset/empty (guard clause is the negation of
  # the original three-way AND).
  if [[ "${_use_scron}" != true ]] || [[ "${_set_email}" != false ]] || [[ -n "${REPLYTO:-}" ]]; then
    return 0
  fi
  echo -e "\033[0;33mWARNING:\033[0m Set \033[0;32mexport REPLYTO=\"your_email\"\033[0m in your .bashrc or use generate_workflows.sh with \033[0;32m-e \"your_email\"\033[0m to receive job failure notifications."
}

# If the user supplied a non-empty address via the -e flag, export it as
# REPLYTO so the scrontab notification machinery can pick it up.
if [[ "${_set_email}" == "true" ]] && [[ -n "${_email}" ]]; then
  export REPLYTO="${_email}"
fi

function delete_dir() {
local dir_to_rm="${1:-}"
if [[ -z "${dir_to_rm}" ]]; then
Expand Down Expand Up @@ -577,20 +589,35 @@ for _case in "${_yaml_list[@]}"; do

if [[ "${_use_scron}" == true ]]; then
{
grep "^#.*${_pslot}" "${_runtests}/EXPDIR/${_pslot}/${_pslot}.crontab"
grep "^####" "${cron_file}"
grep "^#SCRON" "${cron_file}"
grep "${scron_sh_file}" "${_runtests}/EXPDIR/${_pslot}/${_pslot}.crontab"
grep "${scron_sh_file}" "${cron_file}"
} >> tests.cron
else
grep "${_pslot}" "${_runtests}/EXPDIR/${_pslot}/${_pslot}.crontab" >> tests.cron
fi
done
echo

# Prepend a MAILTO line to tests.cron so regular cron mails job failures.
if [[ "${_use_scron}" == false ]]; then
  # Resolve the address once: -e flag wins, then the REPLYTO environment
  # variable, otherwise an explicitly empty MAILTO (which disables cron
  # mail instead of defaulting to the local user account).
  _mailto=""
  if [[ "${_set_email}" == "true" ]]; then
    _mailto="${_email}"
  elif [[ -n "${REPLYTO:-}" ]]; then
    _mailto="${REPLYTO}"
  fi
  sed -i "1i MAILTO=\"${_mailto}\"" tests.cron
fi

# Update the cron
if [[ "${_update_cron}" == "true" ]]; then
printf "Updating the existing crontab\n\n"
echo
mail_warning
rm -f existing.cron final.cron "${_verbose_flag}"
touch existing.cron final.cron

Expand All @@ -603,16 +630,43 @@ if [[ "${_update_cron}" == "true" ]]; then
echo "#######################"
fi

# Save existing MAILTO before removing it
# shellcheck disable=SC2312
existing_mailto=$(grep "^MAILTO=" existing.cron 2> /dev/null | head -1 || echo "")

# Remove ALL MAILTO lines from existing.cron and tests.cron to prevent duplicates
sed -i '/^MAILTO=/d' existing.cron 2> /dev/null || true
sed -i '/^MAILTO=/d' tests.cron 2> /dev/null || true

if [[ "${_set_email}" == "true" ]]; then
# Replace the existing email in the crontab
# For scrontab, REPLYTO is already exported earlier; for crontab, set MAILTO
if [[ "${_verbose}" == "true" ]]; then
printf "Updating crontab/scrontab email to %s\n\n" "${_email}"
fi

if [[ "${_use_scron}" == true ]]; then
sed -i "s/.*--mail-user.*/#SCRON --mail-user=\"${_email}\"/" tests.cron
else
sed -i "s/^MAILTO.*/MAILTO=\"${_email}\"/" existing.cron
if [[ "${_use_scron}" == false ]]; then
# For regular crontab, set MAILTO at the top of final.cron
echo "MAILTO=\"${_email}\"" > final.cron
fi
else
# Preserve existing MAILTO if present with non-empty value (only for regular crontab)
if [[ "${_use_scron}" == false ]]; then
# Check if there was a MAILTO with a non-empty value in the original crontab
# Extract the email value from MAILTO="email" or MAILTO=email
if [[ -n "${existing_mailto}" ]]; then
# Extract email value between quotes or after =
existing_email=$(echo "${existing_mailto}" | sed -n 's/^MAILTO=["'\'']*\([^"'\'']*\)["'\'']*$/\1/p')
else
existing_email=""
fi

if [[ -n "${existing_email}" ]]; then
echo "${existing_mailto}" > final.cron
elif [[ -n "${REPLYTO:-}" ]]; then
echo "MAILTO=\"${REPLYTO}\"" > final.cron
else
echo "MAILTO=\"\"" > final.cron
fi
fi
fi

Expand All @@ -627,10 +681,12 @@ if [[ "${_update_cron}" == "true" ]]; then

${_crontab_cmd} final.cron
else
mail_warning
_message="Add the following to your crontab or scrontab to start running:"
_cron_tests=$(cat tests.cron)
_message="${_message}"$'\n'"${_cron_tests}"
echo "${_message}"
echo
if [[ "${_set_email}" == true ]]; then
final_message="${final_message:-}"$'\n'"${_message}"
fi
Expand Down
74 changes: 74 additions & 0 deletions dev/workflow/rocoto/rocoto_scron.sh.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#! /usr/bin/env bash
# Rendered per-experiment from rocoto_scron.sh.j2 by rocoto_xml.py.
# Advances the Rocoto workflow with rocotorun, then scans rocotostat output
# for DEAD jobs and emails the user about failures not previously reported.
source "{{ HOMEgfs }}/dev/ush/gw_setup.sh"

# Run rocotorun
bash -c "{{ rocotorunstr }}"

# Monitor for failed jobs using rocotostat.
# LOCKFILE records the failures already reported so repeated scron runs
# do not re-send the same notification.
LOCKFILE="{{ expdir }}/.failed_jobs.lock"
ROCOTOSTAT=$(command -v rocotostat)

if [[ -n "${ROCOTOSTAT}" ]]; then
    # shellcheck disable=SC2312
    FAILED_JOBS=$(${ROCOTOSTAT} -d "{{ expdir }}/{{ pslot }}.db" -w "{{ expdir }}/{{ pslot }}.xml" -c all 2> /dev/null | grep -E 'DEAD')

    if [[ -n "${FAILED_JOBS}" ]]; then
        # Read previously reported failures
        PREV_FAILED=""
        if [[ -f "${LOCKFILE}" ]]; then
            PREV_FAILED=$(cat "${LOCKFILE}")
        fi

        # Check for NEW failures only (not just changes)
        NEW_FAILURES=""
        while IFS= read -r job; do
            if [[ -n "${job}" ]] && ! echo "${PREV_FAILED}" | grep -qF "${job}"; then
                NEW_FAILURES="${NEW_FAILURES}${job}"$'\n'
            fi
        done <<< "${FAILED_JOBS}"

        # Send email only if there are NEW failures
        if [[ -n "${NEW_FAILURES}" ]]; then
            # Use mktemp rather than the predictable /tmp/..._$$ pattern to
            # avoid collisions and symlink attacks on shared systems.
            MSGFILE=$(mktemp "${TMPDIR:-/tmp}/rocoto_fail_msg_XXXXXX") || exit 1
            {
                echo "The following jobs have failed in experiment {{ pslot }} on ${MACHINE_ID}:"
                echo ""

                # Format each failed job with detailed information
                while IFS= read -r line; do
                    if [[ -n "${line}" ]]; then
                        # Parse rocotostat output: Cycle Task JobID State Try MaxTries Duration
                        read -r cycle task jobid state try maxtries duration <<< "${line}"
                        # Extract YYYYMMDDHH from cycle (first 10 characters)
                        cycle_short=${cycle:0:10}
                        # Get current timestamp
                        timestamp=$(date -u '+%m/%d/%y %H:%M:%S UTC')

                        echo "${timestamp} :: {{ pslot }}.xml :: Cycle ${cycle}, Task ${task}, jobid=${jobid}, in state ${state}, ran for ${duration} seconds, try=${try} (of ${maxtries})"
                        echo "Check log: {{ comroot }}/{{ pslot }}/logs/${cycle_short}/${task}.log"
                        echo ""
                    fi
                done <<< "${NEW_FAILURES}"
            } > "${MSGFILE}"

            # Try to send email; replyto renders as the literal string "None"
            # when no address was configured.
            EMAIL="{{ replyto }}"
            # NOTE(review): hostname -d may be empty on hosts without a
            # configured domain, yielding "no-reply@" -- verify on targets.
            hostname_domain=$(hostname -d)
            FROM_EMAIL="no-reply@${hostname_domain}"
            if [[ "${EMAIL}" != "None" ]] && command -v mail &> /dev/null; then
                # On Gaea, the mail utility requires the -v (verbose) flag to ensure delivery.
                # To avoid receiving verbose output as an actual email, a spoofed 'from' address is used for notifications.
                mail -r "${FROM_EMAIL}" -v -s "[{{ pslot }}] Workflow Job Failures Detected" "${EMAIL}" < "${MSGFILE}" 2>&1
            fi

            rm -f "${MSGFILE}"
        fi

        # Always update lockfile to reflect current failures
        echo "${FAILED_JOBS}" > "${LOCKFILE}"
    else
        # No failures, remove lockfile if it exists
        [[ -f "${LOCKFILE}" ]] && rm -f "${LOCKFILE}"
    fi
fi
50 changes: 37 additions & 13 deletions dev/workflow/rocoto/rocoto_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from applications.applications import AppConfig
from workflow_suite import WorkflowSuite
from rocoto.workflow_tasks import get_wf_tasks
from wxflow import which, mkdir
from wxflow import which, mkdir, parse_j2tmpl
import rocoto.rocoto as rocoto
from abc import ABC, abstractmethod
from logging import getLogger
Expand Down Expand Up @@ -123,6 +123,36 @@ def write(self, xml_file: str = None, crontab_file: str = None):
if self._base["DO_ARCHCOM"] and self._base["ARCHCOM_TO"] == "globus_hpss":
self._write_server_crontab()

def _get_scron_script_content(self, rocotorunstr: str, replyto: str) -> str:
    """
    Render the scron monitoring script template for this experiment.

    Parameters
    ----------
    rocotorunstr : str
        Full rocotorun command line to embed in the generated script
    replyto : str
        Email address used for failure notifications

    Returns
    -------
    str
        The rendered bash script content
    """
    # The Jinja2 template lives alongside this module.
    tmpl_file = os.path.join(os.path.dirname(__file__), 'rocoto_scron.sh.j2')

    # Substitution values; keys must match the template's {{ ... }} names.
    render_vars = {
        'HOMEgfs': self.HOMEgfs,
        'rocotorunstr': rocotorunstr,
        'expdir': self.expdir,
        'pslot': self.pslot,
        'replyto': replyto,
        'comroot': self._base.get('COMROOT'),
    }
    return parse_j2tmpl(tmpl_file, render_vars)

def _write_xml(self, xml_file: str = None) -> None:

if xml_file is None:
Expand All @@ -147,8 +177,7 @@ def _write_crontab(self, crontab_file: str = None, cronint: int = 5) -> None:
rocotorunstr = f'{rocotoruncmd} -d {self.expdir}/{self.pslot}.db -w {self.expdir}/{self.pslot}.xml'
cronintstr = f'*/{cronint} * * * *'

replyto = os.environ.get('REPLYTO', "")

replyto = os.environ.get('REPLYTO', None)
crontab_strings = [
'',
f'#################### {self.pslot} ####################'
Expand All @@ -169,27 +198,22 @@ def _write_crontab(self, crontab_file: str = None, cronint: int = 5) -> None:
crontab_strings.extend([
f'#SCRON --partition={partition}',
f'#SCRON --account={account}',
f'#SCRON --mail-user={replyto}',
f'#SCRON --job-name={self.pslot}_scron',
f'#SCRON --output={self.expdir}/logs/scron.log',
'#SCRON --time=00:10:00',
'#SCRON --dependency=singleton'
f'#SCRON --time=00:10:00',
f'#SCRON --dependency=singleton'
])

# Now write the script that actually runs rocotorun
# Now write the script that actually runs rocotorun and monitors for failures
cron_cmd = f"{self.expdir}/{self.pslot}.scron.sh"
with open(cron_cmd, "w") as script_fh:
script_fh.write(
"#!/usr/bin/env bash\n" +
"set -x\n" +
f"source {self.HOMEgfs}/dev/ush/gw_setup.sh" + "\n" +
rocotorunstr + "\n"
)
script_fh.write(self._get_scron_script_content(rocotorunstr, replyto))

# Make the script executable
mode = os.stat(cron_cmd)
os.chmod(cron_cmd, mode.st_mode | stat.S_IEXEC)
else:
# For regular crontab, create a wrapper script with monitoring
cron_cmd = rocotorunstr
crontab_strings.extend([
'SHELL="/bin/bash"',
Expand Down
Loading