diff --git a/dev/jobs/JGLOBAL_ARCHIVE_VRFY b/dev/jobs/JGLOBAL_ARCHIVE_VRFY index e5965c22706..2e6e9a42d37 100755 --- a/dev/jobs/JGLOBAL_ARCHIVE_VRFY +++ b/dev/jobs/JGLOBAL_ARCHIVE_VRFY @@ -20,6 +20,12 @@ for grid in "0p25" "0p50" "1p00"; do "COMIN_ATMOS_GRIB_${grid}:COM_ATMOS_GRIB_GRID_TMPL" done +# GEFS-specific: Ensemble statistics path +if [[ "${RUN}" == "gefs" ]]; then + MEMDIR="ensstat" YMD=${PDY} HH=${cyc} GRID="1p00" declare_from_tmpl -rx \ + COMIN_ATMOS_ENSSTAT_1p00:COM_ATMOS_GRIB_GRID_TMPL +fi + ############################################################### # Run archive script ############################################################### diff --git a/dev/scripts/exglobal_archive_vrfy.py b/dev/scripts/exglobal_archive_vrfy.py index 5054908fc4d..634c1c1db34 100755 --- a/dev/scripts/exglobal_archive_vrfy.py +++ b/dev/scripts/exglobal_archive_vrfy.py @@ -3,6 +3,7 @@ import os from pygfs.task.archive import Archive +from pygfs.utils.archive_vars import ArchiveVrfyVars from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit, chdir # initialize root logger @@ -14,38 +15,17 @@ def main(): config = cast_strdict_as_dtypedict(os.environ) - # Instantiate the Archive object + # Instantiate the Archive task object archive = Archive(config) - # update these keys to be 3 digits if they are part of archive.task_config.keys - for key in ['OCNRES', 'ICERES']: - try: - archive.task_config[key] = f"{archive.task_config[key]:03d}" - except KeyError as ee: - logger.info(f"key ({key}) not found in archive.task_config!") - - # Pull out all the configuration keys needed to run the rest of archive steps - keys = ['current_cycle', 'RUN', 'PSLOT', 'ROTDIR', 'PARMgfs', - 'ARCDIR', 'MODE', 'DO_JEDIATMENS', 'DO_FIT2OBS', 'DO_JEDIATMVAR', 'FHMIN_GFS', - 'DO_JEDISNOWDA', 'DO_AERO_ANL', 'DO_PREP_OBS_AERO', 'NET', 'MODE', 'FHOUT_GFS', - 'FHMAX_HF_GFS', 'FHOUT_GFS', 'FHMAX_FITS', 'FHMAX', 'FHOUT', 'FHMAX_GFS', 'DO_GSISOILDA', 'DO_LAND_IAU'] - - archive_dict = AttrDict() - 
for key in keys: - try: - archive_dict[key] = archive.task_config[key] - except KeyError as ee: - logger.warning(f"WARNING: key ({key}) not found in archive.task_config!") - - # Also import all COMIN* and COMOUT* directory and template variables - for key in archive.task_config.keys(): - if key.startswith(("COM_", "COMIN_", "COMOUT_")): - archive_dict[key] = archive.task_config.get(key) + # Collect all archive variables in complete arch_dict for YAML templates + # Use static utility methods from ArchiveVrfyVars + arch_dict = ArchiveVrfyVars.get_all_yaml_vars(archive.task_config) - with chdir(config.ROTDIR): + # Pass arch_dict to configure_vrfy which will render the Jinja2 YAML + arcdir_set = archive.configure_vrfy(arch_dict) - # Determine which archives to create - arcdir_set = archive.configure_vrfy(archive_dict) + with chdir(config.ROTDIR): # Populate the product archive (ARCDIR) archive.execute_store_products(arcdir_set) diff --git a/dev/scripts/exglobal_enkf_earc_vrfy.py b/dev/scripts/exglobal_enkf_earc_vrfy.py index 973a4257b91..4315cf0ca69 100755 --- a/dev/scripts/exglobal_enkf_earc_vrfy.py +++ b/dev/scripts/exglobal_enkf_earc_vrfy.py @@ -3,6 +3,7 @@ import os from pygfs.task.archive import Archive +from pygfs.utils.archive_vars import ArchiveVrfyVars from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, chdir, logit # initialize root logger @@ -14,37 +15,20 @@ def main(): config = cast_strdict_as_dtypedict(os.environ) - # Instantiate the Archive object + # Instantiate the Archive task object archive = Archive(config) - # Pull out all the configuration keys needed to run the rest of archive steps - keys = ['current_cycle', 'RUN', 'PSLOT', 'ROTDIR', 'PARMgfs', - 'ARCDIR', 'MODE', 'DO_JEDIATMENS', 'DO_FIT2OBS', 'DO_JEDIATMVAR', - 'DO_JEDISNOWDA', 'DO_AERO_ANL', 'DO_PREP_OBS_AERO', 'NET', 'MODE', 'FHOUT_GFS', - 'FHMAX_HF_GFS', 'FHOUT_GFS', 'FHMAX_FITS', 'FHMAX', 'FHOUT', 'FHMAX_GFS', 'DO_GSISOILDA', 'DO_LAND_IAU'] + # Collect all archive variables in 
complete arch_dict for YAML templates + # Use static utility methods from ArchiveVrfyVars + arch_dict = ArchiveVrfyVars.get_all_yaml_vars(archive.task_config) - archive_dict = AttrDict() - for key in keys: - archive_dict[key] = archive.task_config.get(key) - if archive_dict[key] is None: - print(f"Warning: key ({key}) not found in task_config!") + # Pass arch_dict to configure_vrfy which will render the Jinja2 YAML + arcdir_set = archive.configure_vrfy(arch_dict) - # Also import all COMIN* directory and template variables - for key in archive.task_config.keys(): - if key.startswith("COMIN"): - archive_dict[key] = archive.task_config[key] + with chdir(config.ROTDIR): - cwd = os.getcwd() - - os.chdir(config.ROTDIR) - - # Determine which archives to create - arcdir_set = archive.configure_vrfy(archive_dict) - - # Populate the product archive (ARCDIR) - archive.execute_store_products(arcdir_set) - - os.chdir(cwd) + # Populate the product archive (ARCDIR) + archive.execute_store_products(arcdir_set) if __name__ == '__main__': diff --git a/parm/archive/enkf_arcdir.yaml.j2 b/parm/archive/enkf_arcdir.yaml.j2 new file mode 100644 index 00000000000..5219f035610 --- /dev/null +++ b/parm/archive/enkf_arcdir.yaml.j2 @@ -0,0 +1,25 @@ +# Variables provided by archive_vars.py: +# - cycle_HH, cycle_YMDH, cycle_YMD, head +# - ARCDIR +# - All COMIN_* paths + +# Ensemble (EnKF) archiving template +# Used for: enkfgdas, enkfgfs, enkfgcafs, enkfgcdas + +# Build head string (e.g., 'gfs.t00z.') +{% set head = RUN + ".t" + cycle_HH + "z." 
%} + +mkdir: + - "{{ ARCDIR }}" + +copy_opt: + # Ensemble analysis statistics + {% if DO_JEDIATMENS == True %} + - ["{{ COMIN_ATMOS_ANALYSIS_ENSSTAT }}/{{ head }}stat.atm.tar", + "{{ ARCDIR }}/atmensstat.{{ RUN }}.{{ cycle_YMDH }}"] + {% else %} + - ["{{ COMIN_ATMOS_ANALYSIS_ENSSTAT }}/{{ head }}enkfstat.txt", + "{{ ARCDIR }}/enkfstat.{{ RUN }}.{{ cycle_YMDH }}"] + - ["{{ COMIN_ATMOS_ANALYSIS_ENSSTAT }}/{{ head }}gsistat_ensmean.txt", + "{{ ARCDIR }}/gsistat.{{ RUN }}.{{ cycle_YMDH }}.ensmean"] + {% endif %} diff --git a/parm/archive/gcafs_arcdir.yaml.j2 b/parm/archive/gcafs_arcdir.yaml.j2 index 325ffba80e6..c7b3d8dff65 100644 --- a/parm/archive/gcafs_arcdir.yaml.j2 +++ b/parm/archive/gcafs_arcdir.yaml.j2 @@ -1,107 +1,58 @@ -{% set cycle_HH = current_cycle | strftime("%H") %} -{% set cycle_YMDH = current_cycle | to_YMDH %} -{% set cycle_YMD = current_cycle | to_YMD %} -{% set head = RUN + ".t" + cycle_HH + "z." %} - -# Select data to store in the ARCDIR and VFYARC from deterministic runs -# This file set will contain all source-destination pairs to send to the FileHandler for copying -{% set file_set = [] %} - -# Declare the VFYARC where Fit2Obs data will be sent -{% set VFYARC = ROTDIR ~ "/vrfyarch" %} +# Variables provided by archive_vars.py: +# - cycle_HH, cycle_YMDH, cycle_YMD, head +# - VFYARC +# - All COMIN_* paths -# Deterministic files -{% if "enkf" not in RUN %} - # Common files to be added to both the gcafs and gcdas keys below - {% set det_files = [] %} +# Deterministic GCAFS/GCDAS archiving template +# Used for: gcafs, gcdas - # Deterministic analysis files (generated for cycled experiments) - {% set det_anl_files = [] %} - - {% if DO_AERO_ANL %} - {% do det_anl_files.append([COMIN_CHEM_ANALYSIS ~ "/" ~ head ~ "aerostat.tgz", - ARCDIR ~ "/aerostat." ~ RUN ~ "." ~ cycle_YMDH ~ ".tgz"]) %} - {% endif %} +# Build head string (e.g., 'gfs.t00z.') +{% set head = RUN + ".t" + cycle_HH + "z." 
%} - {% if DO_PREP_OBS_AERO == True %} - {% do det_anl_files.append([COMIN_OBS ~ "/" ~ head ~ "aeroobs", - ARCDIR ~ "/aeroobs." ~ RUN ~ "." ~ cycle_YMDH]) %} - {% do det_anl_files.append([COMIN_OBS ~ "/" ~ head ~ "aeroawobs", - ARCDIR ~ "/aeroawobs." ~ RUN ~ "." ~ cycle_YMDH]) %} - {% endif %} +mkdir: + - "{{ ARCDIR }}" +{% if DO_FIT2OBS == True %} + - "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}" +{% endif %} - # GCAFS-specific files - {% set gfs_files = [] %} +copy_req: +{% if RUN == "gcafs" %} + # GCAFS forecast files - REQUIRED {% for fhr in range(0, FHMAX_GFS + 1, FHOUT_GFS) %} - {% do gfs_files.append([COMIN_ATMOS_GRIB_1p00 ~ "/" ~ head ~ "pres_a.1p00.f" ~ '%03d'|format(fhr) ~ ".grib2", - ARCDIR ~ "/pgbf" ~ '%02d'|format(fhr) ~ "." ~ RUN ~ "." ~ cycle_YMDH ~ ".grib2"]) %} + - ["{{ COMIN_ATMOS_GRIB_1p00 }}/{{ head }}pres_a.1p00.f{{ '%03d'|format(fhr) }}.grib2", + "{{ ARCDIR }}/pgbf{{ '%02d'|format(fhr) }}.{{ RUN }}.{{ cycle_YMDH }}.grib2"] {% endfor %} - # GCAFS Fit2Obs data - {% set fit2obs_files = [] %} - {% for fhr in range(0, FHMAX_FITS + 1, 6) %} - {% set sfcfile = "/" + head + "sfc.f" + '%03d'|format(fhr) + ".nc" %} - {% set sigfile = "/" + head + "atm.f" + '%03d'|format(fhr) + ".nc" %} - {% do fit2obs_files.append([COMIN_ATMOS_HISTORY ~ "/" ~ sfcfile, - VFYARC ~ "/" ~ RUN ~ "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/" ~ sfcfile ]) %} - {% do fit2obs_files.append([COMIN_ATMOS_HISTORY ~ "/" ~ sigfile, - VFYARC ~ "/" ~ RUN ~ "." 
~ cycle_YMD ~ "/" ~ cycle_HH ~ "/" ~ sigfile ]) %} - {% endfor %} + {% if DO_FIT2OBS == True %} + # GCAFS Fit2Obs data - REQUIRED if DO_FIT2OBS is enabled + {% for fhr in range(0, FHMAX_FITS + 1, 6) %} + - ["{{ COMIN_ATMOS_HISTORY }}/{{ head }}sfc.f{{ '%03d'|format(fhr) }}.nc", + "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}/{{ head }}sfc.f{{ '%03d'|format(fhr) }}.nc"] + - ["{{ COMIN_ATMOS_HISTORY }}/{{ head }}atm.f{{ '%03d'|format(fhr) }}.nc", + "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}/{{ head }}atm.f{{ '%03d'|format(fhr) }}.nc"] + {% endfor %} + {% endif %} +{% endif %} - # GCDAS-specific files - {% set gdas_files = [] %} +{% if RUN == "gcdas" %} + # GCDAS forecast files - REQUIRED {% for fhr in range(0, FHMAX + 1, FHOUT) %} - {% do gdas_files.append([COMIN_ATMOS_GRIB_1p00 ~ "/" ~ head ~ "pres_a.1p00.f" ~ '%03d'|format(fhr) ~ ".grib2", - ARCDIR ~ "/pgbf" ~ '%02d'|format(fhr) ~ "." ~ RUN ~ "." ~ cycle_YMDH ~ ".grib2"]) %} + - ["{{ COMIN_ATMOS_GRIB_1p00 }}/{{ head }}pres_a.1p00.f{{ '%03d'|format(fhr) }}.grib2", + "{{ ARCDIR }}/pgbf{{ '%02d'|format(fhr) }}.{{ RUN }}.{{ cycle_YMDH }}.grib2"] {% endfor %} +{% endif %} - # Now append the necessary file pairs to file_set - # Common deterministic files - {% set file_set = file_set + det_files %} - {% if MODE == "cycled" %} - {% set file_set = file_set + det_anl_files %} - {% endif %} - - # Run-specific deterministic files - {% if RUN == "gcafs" %} - {% set file_set = file_set + gfs_files %} - # Fit2Obs files - {% if DO_FIT2OBS == True %} - {% set file_set = file_set + fit2obs_files %} - {% endif %} - {% elif RUN == "gcdas" %} - {% set file_set = file_set + gdas_files %} +{% if MODE == "cycled" %} + # Deterministic analysis files (cycled mode only) - REQUIRED + {% if DO_AERO_ANL %} + - ["{{ COMIN_CHEM_ANALYSIS }}/{{ head }}aerostat.tgz", + "{{ ARCDIR }}/aerostat.{{ RUN }}.{{ cycle_YMDH }}.tgz"] {% endif %} -{% else %} # End of deterministic files - - # Ensemble analysis files - {% set enkf_files = 
[] %} - {% if DO_JEDIATMENS == True %} - {% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "atmensstat", - ARCDIR ~ "/atmensstat." ~ RUN ~ "." ~ cycle_YMDH ]) %} - {% else %} - {% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "enkfstat.txt", - ARCDIR ~ "/enkfstat." ~ RUN ~ "." ~ cycle_YMDH ]) %} - {% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "gsistat_ensmean.txt", - ARCDIR ~ "/gsistat." ~ RUN ~ "." ~ cycle_YMDH ~ ".ensmean"]) %} + {% if DO_PREP_OBS_AERO == True %} + - ["{{ COMIN_OBS }}/{{ head }}aeroobs", + "{{ ARCDIR }}/aeroobs.{{ RUN }}.{{ cycle_YMDH }}"] + - ["{{ COMIN_OBS }}/{{ head }}aeroawobs", + "{{ ARCDIR }}/aeroawobs.{{ RUN }}.{{ cycle_YMDH }}"] {% endif %} - - # Construct the final file set - {% set file_set = file_set + enkf_files %} - {% endif %} - - -# Actually write the yaml -mkdir: - - "{{ ARCDIR }}" - - {% if DO_FIT2OBS == True %} - - "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}" - {% endif %} - -copy: - {% for source_dest_pair in file_set %} - - {{ source_dest_pair }} - {% endfor %} diff --git a/parm/archive/gefs_arcdir.yaml.j2 b/parm/archive/gefs_arcdir.yaml.j2 index 2f8da8bd444..8be329861ec 100644 --- a/parm/archive/gefs_arcdir.yaml.j2 +++ b/parm/archive/gefs_arcdir.yaml.j2 @@ -1,36 +1,20 @@ -{% set cycle_HH = current_cycle | strftime("%H") %} -{% set cycle_YMDH = current_cycle | to_YMDH %} -{% set cycle_YMD = current_cycle | to_YMD %} -{% set head = RUN + ".t" + cycle_HH + "z." 
%} - -# Declare the GEFS_ARCH where atmos data will be sent -{% set GEFS_ARCH = ROTDIR ~ "/gefsarch" %} - -{% set file_set = [] %} +# Variables provided by archive_vars.py: +# - cycle_HH, cycle_YMDH, cycle_YMD, head +# - VFYARC (archive directory) +# - COMIN_ATMOS_ENSSTAT_1p00 (calculated in Python with MEMDIR='ensstat') -{% set tmpl_dict = ({ '${ROTDIR}':ROTDIR, - '${RUN}':RUN, - '${YMD}':cycle_YMD, - '${HH}':cycle_HH, - '${GRID}': '1p00', - '${MEMDIR}': 'ensstat' }) %} +# Build head string (e.g., 'gfs.t00z.') +{% set head = RUN + ".t" + cycle_HH + "z." %} -{% set COMIN_ATMOS_ENSSTAT_1p00 = COM_ATMOS_GRIB_GRID_TMPL | replace_tmpl(tmpl_dict) %} +# Create directories first +mkdir: + - "{{ VFYARC }}" -# Select ensstat files to copy to the arcdir -{% set ensstat_files = [] %} -{% if path_exists(COMIN_ATMOS_ENSSTAT_1p00) %} +# Define all source-destination pairs for archiving +# Use copy_req for files that MUST exist (raise error if missing) +copy_req: + # GEFS ensemble mean forecast files - REQUIRED {% for fhr in range(FHMIN_GFS, FHMAX_GFS + FHOUT_GFS, FHOUT_GFS) %} - {% do ensstat_files.append([COMIN_ATMOS_ENSSTAT_1p00 ~ "/" ~ head ~ "mean.pres_." 
~ - "1p00" ~ ".f" ~ '%03d'|format(fhr) ~ ".grib2", - GEFS_ARCH]) %} + - ["{{ COMIN_ATMOS_ENSSTAT_1p00 }}/{{ head }}mean.pres_.1p00.f{{ '%03d'|format(fhr) }}.grib2", + "{{ VFYARC }}/{{ head }}mean.pres_.1p00.f{{ '%03d'|format(fhr) }}.grib2"] {% endfor %} -{% endif %} -{% set file_set = ensstat_files %} -# Actually write the yaml -mkdir: - - "{{ GEFS_ARCH }}" -copy: - {% for source_dest_pair in file_set %} - - {{ source_dest_pair }} - {% endfor %} diff --git a/parm/archive/gfs_arcdir.yaml.j2 b/parm/archive/gfs_arcdir.yaml.j2 index ed873b21c8b..9fffe5a1b7e 100644 --- a/parm/archive/gfs_arcdir.yaml.j2 +++ b/parm/archive/gfs_arcdir.yaml.j2 @@ -1,164 +1,108 @@ -{% set cycle_HH = current_cycle | strftime("%H") %} -{% set cycle_YMDH = current_cycle | to_YMDH %} -{% set cycle_YMD = current_cycle | to_YMD %} -{% set head = RUN + ".t" + cycle_HH + "z." %} - -# Select data to store in the ARCDIR and VFYARC from deterministic runs -# This file set will contain all source-destination pairs to send to the FileHandler for copying -{% set file_set = [] %} +# Variables provided by archive_vars.py: +# - cycle_HH, cycle_YMDH, cycle_YMD, head +# - VFYARC, GEFS_ARCH +# - All COMIN_* paths -# Declare the VFYARC where Fit2Obs data will be sent -{% set VFYARC = ROTDIR ~ "/vrfyarch" %} +# Deterministic GFS/GDAS archiving template +# Used for: gfs, gdas -# Deterministic files -{% if "enkf" not in RUN %} - # Common files to be added to both the gfs and gdas keys below - {% set det_files = [] %} - # Cyclone forecasts, produced for both gdas and gfs cycles - ## Only created if tracking is on and there were systems to track - {% if path_exists(COMIN_ATMOS_TRACK ~ "/atcfunix." ~ RUN ~ "." ~ cycle_YMDH) %} - {% do det_files.append([COMIN_ATMOS_TRACK ~ "/atcfunix." ~ RUN ~ "." ~ cycle_YMDH, - ARCDIR ~"/atcfunix." ~ RUN ~ "." ~ cycle_YMDH ]) %} - {% do det_files.append([COMIN_ATMOS_TRACK ~ "/atcfunixp." ~ RUN ~ "." ~ cycle_YMDH, - ARCDIR ~ "/atcfunixp." ~ RUN ~ "." 
~ cycle_YMDH]) %} - {% endif %} +# Build head string (e.g., 'gfs.t00z.') +{% set head = RUN + ".t" + cycle_HH + "z." %} - # Cyclone tracking data - {% for basin in ["epac", "natl"] %} - {% if path_exists(COMIN_ATMOS_TRACK + "/" + basin) %} - {% do det_files.append([COMIN_ATMOS_TRACK ~ "/" ~ basin, - ARCDIR ~ "/" ~ basin ]) %} - {% endif %} - {% endfor %} +mkdir: + - "{{ ARCDIR }}" +{% if DO_FIT2OBS == True %} + - "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}" +{% endif %} - # Deterministic analysis files (generated for cycled experiments) - {% set det_anl_files = [] %} - # Analysis data (if we are running in cycled mode) - {% do det_anl_files.append([COMIN_ATMOS_GRIB_1p00 ~ "/" ~ head ~ "pres_a.1p00.analysis.grib2", - ARCDIR ~ "/pgbanl." ~ RUN ~ "." ~ cycle_YMDH ~ ".grib2"]) %} +copy_opt: +# Cyclone tracking files (optional - only exist when storms are tracked) + - ["{{ COMIN_ATMOS_TRACK }}/atcfunix.{{ RUN }}.{{ cycle_YMDH }}", + "{{ ARCDIR }}/atcfunix.{{ RUN }}.{{ cycle_YMDH }}"] + - ["{{ COMIN_ATMOS_TRACK }}/atcfunixp.{{ RUN }}.{{ cycle_YMDH }}", + "{{ ARCDIR }}/atcfunixp.{{ RUN }}.{{ cycle_YMDH }}"] + +# Cyclone tracking data by basin +{% for basin in ["epac", "natl"] %} + - ["{{ COMIN_ATMOS_TRACK }}/{{ basin }}", + "{{ ARCDIR }}/{{ basin }}"] +{% endfor %} + +{% if MODE == "cycled" %} + # Deterministic analysis files (cycled mode only) + - ["{{ COMIN_ATMOS_GRIB_1p00 }}/{{ head }}pres_a.1p00.analysis.grib2", + "{{ ARCDIR }}/pgbanl.{{ RUN }}.{{ cycle_YMDH }}.grib2"] {% if DO_JEDIATMVAR == True %} - {% do det_anl_files.append([COMIN_ATMOS_ANALYSIS ~ "/" ~ head ~ "stat.atm.tar", - ARCDIR ~ "/atmstat." ~ RUN ~ "." ~ cycle_YMDH ]) %} + - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}stat.atm.tar", + "{{ ARCDIR }}/atmstat.{{ RUN }}.{{ cycle_YMDH }}"] {% else %} - {% do det_anl_files.append([COMIN_ATMOS_ANALYSIS ~ "/" ~ head ~ "gsistat.txt", - ARCDIR ~ "/gsistat." ~ RUN ~ "." 
~ cycle_YMDH ]) %} + - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}gsistat.txt", + "{{ ARCDIR }}/gsistat.{{ RUN }}.{{ cycle_YMDH }}"] {% endif %} {% if DO_JEDISNOWDA == True %} - {% do det_anl_files.append([COMIN_SNOW_ANALYSIS ~ "/" ~ head ~ "snow_analysis.ioda_hofx.tar", - ARCDIR ~ "/snowstat." ~ RUN ~ "." ~ cycle_YMDH ~ ".tar"]) %} + - ["{{ COMIN_SNOW_ANALYSIS }}/{{ head }}snow_analysis.ioda_hofx.tar", + "{{ ARCDIR }}/snowstat.{{ RUN }}.{{ cycle_YMDH }}.tar"] {% endif %} {% if DO_AERO_ANL %} - {% do det_anl_files.append([COMIN_CHEM_ANALYSIS ~ "/" ~ head ~ "aerostat.tgz", - ARCDIR ~ "/aerostat." ~ RUN ~ "." ~ cycle_YMDH ~ ".tgz"]) %} + - ["{{ COMIN_CHEM_ANALYSIS }}/{{ head }}aerostat.tgz", + "{{ ARCDIR }}/aerostat.{{ RUN }}.{{ cycle_YMDH }}.tgz"] {% endif %} {% if DO_PREP_OBS_AERO == True %} - {% do det_anl_files.append([COMIN_OBS ~ "/" ~ head ~ "aeroobs", - ARCDIR ~ "/aeroobs." ~ RUN ~ "." ~ cycle_YMDH]) %} - {% do det_anl_files.append([COMIN_OBS ~ "/" ~ head ~ "aeroawobs", - ARCDIR ~ "/aeroawobs." ~ RUN ~ "." ~ cycle_YMDH]) %} + - ["{{ COMIN_OBS }}/{{ head }}aeroobs", + "{{ ARCDIR }}/aeroobs.{{ RUN }}.{{ cycle_YMDH }}"] + - ["{{ COMIN_OBS }}/{{ head }}aeroawobs", + "{{ ARCDIR }}/aeroawobs.{{ RUN }}.{{ cycle_YMDH }}"] {% endif %} +{% endif %} - # GFS-specific files - {% set gfs_files = [] %} +{% if RUN == "gfs" %} + # GFS forecast files {% for fhr in range(0, FHMAX_GFS + 1, FHOUT_GFS) %} - {% do gfs_files.append([COMIN_ATMOS_GRIB_1p00 ~ "/" ~ head ~ "pres_a.1p00.f" ~ '%03d'|format(fhr) ~ ".grib2", - ARCDIR ~ "/pgbf" ~ '%02d'|format(fhr) ~ "." ~ RUN ~ "." ~ cycle_YMDH ~ ".grib2"]) %} + - ["{{ COMIN_ATMOS_GRIB_1p00 }}/{{ head }}pres_a.1p00.f{{ '%03d'|format(fhr) }}.grib2", + "{{ ARCDIR }}/pgbf{{ '%02d'|format(fhr) }}.{{ RUN }}.{{ cycle_YMDH }}.grib2"] {% endfor %} - # Cyclone genesis data (only present if there are storms) - {% if path_exists(COMIN_ATMOS_GENESIS ~ "/storms.gfso.atcf_gen." 
~ cycle_YMDH) %} - {% do gfs_files.append([COMIN_ATMOS_GENESIS ~ "/storms.gfso.atcf_gen." ~ cycle_YMDH, - ARCDIR ~ "/storms.gfso.atcf_gen." ~ cycle_YMDH ]) %} - {% do gfs_files.append([COMIN_ATMOS_GENESIS ~ "/storms.gfso.atcf_gen.altg." ~ cycle_YMDH, - ARCDIR ~ "/storms.gfso.atcf_gen.altg." ~ cycle_YMDH ]) %} - {% endif %} - - {% if path_exists(COMIN_ATMOS_GENESIS ~ "/trak.gfso.atcfunix." ~ cycle_YMDH) %} - {% do gfs_files.append([COMIN_ATMOS_GENESIS ~ "/trak.gfso.atcfunix." ~ cycle_YMDH, - ARCDIR ~ "/trak.gfso.atcfunix." ~ cycle_YMDH ]) %} - {% do gfs_files.append([COMIN_ATMOS_GENESIS ~ "/trak.gfso.atcfunix.altg." ~ cycle_YMDH, - ARCDIR ~ "/trak.gfso.atcfunix.altg." ~ cycle_YMDH ]) %} - {% endif %} + - ["{{ COMIN_ATMOS_GENESIS }}/storms.gfso.atcf_gen.{{ cycle_YMDH }}", + "{{ ARCDIR }}/storms.gfso.atcf_gen.{{ cycle_YMDH }}"] + - ["{{ COMIN_ATMOS_GENESIS }}/storms.gfso.atcf_gen.altg.{{ cycle_YMDH }}", + "{{ ARCDIR }}/storms.gfso.atcf_gen.altg.{{ cycle_YMDH }}"] + - ["{{ COMIN_ATMOS_GENESIS }}/trak.gfso.atcfunix.{{ cycle_YMDH }}", + "{{ ARCDIR }}/trak.gfso.atcfunix.{{ cycle_YMDH }}"] + - ["{{ COMIN_ATMOS_GENESIS }}/trak.gfso.atcfunix.altg.{{ cycle_YMDH }}", + "{{ ARCDIR }}/trak.gfso.atcfunix.altg.{{ cycle_YMDH }}"] + {% if DO_FIT2OBS == True %} # GFS Fit2Obs data - {% set fit2obs_files = [] %} - {% for fhr in range(0, FHMAX_FITS + 1, 6) %} - {% set sfcfile = "/" + head + "sfc.f" + '%03d'|format(fhr) + ".nc" %} - {% set sigfile = "/" + head + "atm.f" + '%03d'|format(fhr) + ".nc" %} - {% do fit2obs_files.append([COMIN_ATMOS_HISTORY ~ "/" ~ sfcfile, - VFYARC ~ "/" ~ RUN ~ "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/" ~ sfcfile ]) %} - {% do fit2obs_files.append([COMIN_ATMOS_HISTORY ~ "/" ~ sigfile, - VFYARC ~ "/" ~ RUN ~ "." 
~ cycle_YMD ~ "/" ~ cycle_HH ~ "/" ~ sigfile ]) %} - {% endfor %} + {% for fhr in range(0, FHMAX_FITS + 1, 6) %} + {% set sfcfile = head + "sfc.f" + '%03d'|format(fhr) + ".nc" %} + {% set sigfile = head + "atm.f" + '%03d'|format(fhr) + ".nc" %} + - ["{{ COMIN_ATMOS_HISTORY }}/{{ sfcfile }}", + "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}/{{ sfcfile }}"] + - ["{{ COMIN_ATMOS_HISTORY }}/{{ sigfile }}", + "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}/{{ sigfile }}"] + {% endfor %} + {% endif %} - # GDAS-specific files - {% set gdas_files = [] %} +{% elif RUN == "gdas" %} + # GDAS forecast files {% for fhr in range(0, FHMAX + 1, FHOUT) %} - {% do gdas_files.append([COMIN_ATMOS_GRIB_1p00 ~ "/" ~ head ~ "pres_a.1p00.f" ~ '%03d'|format(fhr) ~ ".grib2", - ARCDIR ~ "/pgbf" ~ '%02d'|format(fhr) ~ "." ~ RUN ~ "." ~ cycle_YMDH ~ ".grib2"]) %} + - ["{{ COMIN_ATMOS_GRIB_1p00 }}/{{ head }}pres_a.1p00.f{{ '%03d'|format(fhr) }}.grib2", + "{{ ARCDIR }}/pgbf{{ '%02d'|format(fhr) }}.{{ RUN }}.{{ cycle_YMDH }}.grib2"] {% endfor %} - {% do gdas_files.append([COMIN_ATMOS_ANALYSIS ~ "/" ~ head ~ "abias.txt", - ARCDIR ~ "/abias." ~ RUN ~ "." ~ cycle_YMDH ]) %} - {% do gdas_files.append([COMIN_ATMOS_ANALYSIS ~ "/" ~ head ~ "abias_pc.txt", - ARCDIR ~ "/abias_pc." ~ RUN ~ "." ~ cycle_YMDH ]) %} - {% do gdas_files.append([COMIN_ATMOS_ANALYSIS ~ "/" ~ head ~ "abias_air.txt", - ARCDIR ~ "/abias_air." ~ RUN ~ "." ~ cycle_YMDH ]) %} - {% do gdas_files.append([COMIN_ATMOS_ANALYSIS ~ "/" ~ head ~ "abias_int.txt", - ARCDIR ~ "/abias_int." ~ RUN ~ "." ~ cycle_YMDH ]) %} - {% do gdas_files.append([COMIN_ATMOS_ANALYSIS ~ "/" ~ head ~ "analysis.dtf.a006.nc", - ARCDIR ~ "/dtfanl." ~ RUN ~ "." 
~ cycle_YMDH ~ ".nc"]) %} - - # Now append the necessary file pairs to file_set - # Common deterministic files - {% set file_set = file_set + det_files %} - {% if MODE == "cycled" %} - {% set file_set = file_set + det_anl_files %} - {% endif %} - - # Run-specific deterministic files - {% if RUN == "gfs" %} - {% set file_set = file_set + gfs_files %} - # Fit2Obs files - {% if DO_FIT2OBS == True %} - {% set file_set = file_set + fit2obs_files %} - {% endif %} - {% elif RUN == "gdas" %} - {% set file_set = file_set + gdas_files %} - {% endif %} - -{% else %} # End of deterministic files - - # Ensemble analysis files - {% set enkf_files = [] %} - {% if DO_JEDIATMENS == True %} - {% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "stat.atm.tar", - ARCDIR ~ "/atmensstat." ~ RUN ~ "." ~ cycle_YMDH ]) %} - {% else %} - {% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "enkfstat.txt", - ARCDIR ~ "/enkfstat." ~ RUN ~ "." ~ cycle_YMDH ]) %} - {% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "gsistat_ensmean.txt", - ARCDIR ~ "/gsistat." ~ RUN ~ "." 
~ cycle_YMDH ~ ".ensmean"]) %} - {% endif %} - - # Construct the final file set - {% set file_set = file_set + enkf_files %} + # GDAS bias correction files + - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}abias.txt", + "{{ ARCDIR }}/abias.{{ RUN }}.{{ cycle_YMDH }}"] + - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}abias_pc.txt", + "{{ ARCDIR }}/abias_pc.{{ RUN }}.{{ cycle_YMDH }}"] + - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}abias_air.txt", + "{{ ARCDIR }}/abias_air.{{ RUN }}.{{ cycle_YMDH }}"] + - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}abias_int.txt", + "{{ ARCDIR }}/abias_int.{{ RUN }}.{{ cycle_YMDH }}"] + - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}analysis.dtf.a006.nc", + "{{ ARCDIR }}/dtfanl.{{ RUN }}.{{ cycle_YMDH }}.nc"] {% endif %} - - -# Actually write the yaml -mkdir: - - "{{ ARCDIR }}" - - {% if DO_FIT2OBS == True %} - - "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}" - {% endif %} - -copy_opt: - {% for source_dest_pair in file_set %} - - {{ source_dest_pair }} - {% endfor %} diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index d6a60364041..389075472e2 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -72,7 +72,11 @@ def configure_vrfy(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any]): archive_parm = os.path.join(arch_dict.PARMgfs, "archive") # Collect the dataset to archive locally - arcdir_j2yaml = os.path.join(archive_parm, f"{arch_dict.NET}_arcdir.yaml.j2") + # Select template based on RUN type: ensemble (enkf) or deterministic (NET) + if "enkf" in arch_dict.RUN: + arcdir_j2yaml = os.path.join(archive_parm, "enkf_arcdir.yaml.j2") + else: + arcdir_j2yaml = os.path.join(archive_parm, f"{arch_dict.NET}_arcdir.yaml.j2") # Add the glob.glob function for capturing log filenames arch_dict['glob'] = glob.glob diff --git a/ush/python/pygfs/utils/archive_vars.py b/ush/python/pygfs/utils/archive_vars.py new file mode 100644 index 00000000000..182d918e996 --- /dev/null +++ 
#!/usr/bin/env python3
"""
Archive Variables Utility Module (ush/python/pygfs/utils/archive_vars.py)

Overview
--------
This module collects the variables needed by the YAML templates that archive
verification (vrfy) data for the GFS, GEFS, and GCAFS systems. File set
generation logic (loops, conditionals, path construction) is handled by the
YAML templates themselves.

Architecture
------------
Python provides VARIABLES -> YAML templates build FILE SETS

Python Code Responsibilities:
    - Compute cycle-specific variables (cycle_HH, cycle_YMDH, cycle_YMD, head)
    - Compute the verification archive directory (VFYARC)
    - Pass through the COM_*/COMIN_*/COMOUT_* variables already present in
      the configuration (they are not computed here)
    - Extract configuration keys (RUN, DO_* flags, FHMAX*, etc.)

YAML Template Responsibilities (parm/archive/*_arcdir.yaml.j2):
    - Build file sets with source -> destination mappings
    - Handle loops (forecast hours, grids, basins)
    - Apply conditionals (DO_* flags, MODE, RUN type)
    - Create mkdir lists for directory creation

Key Functions
-------------
get_all_yaml_vars(config_dict):
    Main entry point - collects all variables for YAML templates

add_config_vars(config_dict):
    Extracts configuration keys and COM* variables

_get_cycle_vars(config_dict):
    Computes cycle-specific variables and VFYARC

Logging
-------
All public operational functions are decorated with @logit(logger).
"""
import os
from logging import getLogger
from typing import Any, Dict

from wxflow import AttrDict, logit, to_YMD, to_YMDH

logger = getLogger(__name__.split('.')[-1])


class ArchiveVrfyVars:
    """
    Utility class for collecting archive verification variables.

    This class provides variables for the YAML templates
    (parm/archive/*_arcdir.yaml.j2) used for GFS (Global Forecast System),
    GEFS (Global Ensemble Forecast System), and GCAFS archiving.

    The YAML templates contain all file set generation logic. This class
    only provides the variables they need. All methods are static; the class
    is used as a namespace and is never instantiated.
    """

    @staticmethod
    @logit(logger)
    def get_all_yaml_vars(config_dict: AttrDict) -> Dict[str, Any]:
        """Collect all variables needed for YAML templates.

        Parameters
        ----------
        config_dict : AttrDict
            Configuration dictionary from Archive.task_config. Note that
            ``add_config_vars`` zero-pads OCNRES/ICERES in this dictionary
            in place.

        Returns
        -------
        Dict[str, Any]
            Dictionary (an AttrDict) containing variables for Jinja2 templates:
            - Config keys: RUN, PSLOT, ROTDIR, DO_* flags, FHMAX*, etc.
            - COMIN_*, COMOUT_*, COM_*: all COM variables found in config_dict
            - cycle_HH, cycle_YMDH, cycle_YMD, VFYARC, and head (when RUN is set)
        """
        arch_dict = AttrDict()

        # Config variables first, then cycle variables (same order as before,
        # so cycle variables win on any key collision).
        arch_dict.update(ArchiveVrfyVars.add_config_vars(config_dict))
        arch_dict.update(ArchiveVrfyVars._get_cycle_vars(config_dict))

        logger.info("Collected %d variables for YAML templates", len(arch_dict))
        logger.debug("arch_dict keys: %s", list(arch_dict.keys()))

        return arch_dict

    @staticmethod
    def _zero_pad_resolution(value: Any) -> str:
        """Return a resolution value as a string padded with zeros to 3 digits.

        Integers are formatted exactly as before (``f"{value:03d}"``). Any other
        value (typically a string that was already padded by an earlier call)
        is converted with ``str(value).zfill(3)`` instead of raising
        ``ValueError``, which makes the padding idempotent.
        """
        if isinstance(value, int):
            return f"{value:03d}"
        return str(value).zfill(3)

    @staticmethod
    @logit(logger)
    def add_config_vars(config_dict: AttrDict) -> Dict[str, Any]:
        """Collect configuration keys and COM* variables for archive operations.

        Side effect: OCNRES and ICERES, when present, are rewritten in
        ``config_dict`` itself as 3-digit zero-padded strings. They are not
        copied into the returned dictionary. Calling this method repeatedly
        on the same ``config_dict`` is safe.

        Parameters
        ----------
        config_dict : AttrDict
            Configuration dictionary from Archive.task_config

        Returns
        -------
        Dict[str, Any]
            Dictionary with the config keys that were found and every key of
            ``config_dict`` starting with COM_, COMIN_, or COMOUT_
        """
        general_dict = {}

        # Update resolution keys to be 3 digits if they are part of config_dict
        for key in ('OCNRES', 'ICERES'):
            if key in config_dict:
                config_dict[key] = ArchiveVrfyVars._zero_pad_resolution(config_dict[key])

        # Configuration keys to extract (if present)
        config_keys = ['current_cycle', 'RUN', 'PSLOT', 'ROTDIR', 'PARMgfs',
                       'ARCDIR', 'MODE', 'DO_JEDIATMENS', 'DO_FIT2OBS', 'DO_JEDIATMVAR',
                       'DO_JEDISNOWDA', 'DO_AERO_ANL', 'DO_PREP_OBS_AERO', 'NET',
                       'FHOUT_GFS', 'FHMAX_HF_GFS', 'FHMAX_FITS', 'FHMAX', 'FHOUT',
                       'FHMAX_GFS', 'DO_GSISOILDA', 'DO_LAND_IAU']

        # FHMIN_GFS is only requested for non-ensemble (non-'enkf') runs, so
        # ensemble runs do not emit a spurious "not found" warning for it.
        if 'enkf' not in config_dict.get('RUN', ''):
            config_keys.append('FHMIN_GFS')

        # Extract keys if they exist in config_dict; warn about the rest
        for key in config_keys:
            if key in config_dict:
                general_dict[key] = config_dict[key]
            else:
                logger.warning(f"Config key '{key}' not found in config_dict; skipping.")

        # Pass through COM* directory and template variables. These are
        # expected to already be in config_dict (the job scripts export them
        # with declare_from_tmpl -rx before the Python script starts).
        for key in config_dict.keys():
            if key.startswith(("COM_", "COMIN_", "COMOUT_")):
                general_dict[key] = config_dict.get(key)

        logger.info("Collected %d general archive variables", len(general_dict))
        logger.debug("General variables: %s", list(general_dict.keys()))

        return general_dict

    @staticmethod
    @logit(logger)
    def _get_cycle_vars(config_dict: AttrDict) -> Dict[str, Any]:
        """Calculate cycle-specific variables.

        Parameters
        ----------
        config_dict : AttrDict
            Configuration dictionary from Archive.task_config. Must provide
            ``current_cycle`` and ``ROTDIR``; ``RUN`` is optional.

        Returns
        -------
        Dict[str, Any]
            Dictionary containing:
            - cycle_HH: Cycle hour (e.g., '00', '06')
            - cycle_YMDH: Full cycle timestamp (YYYYMMDDHH)
            - cycle_YMD: Cycle date (YYYYMMDD)
            - VFYARC: Verification archive directory (ROTDIR/vrfyarch)
            - head: File name prefix '<RUN>.t<HH>z.' (e.g., 'gfs.t00z.');
              only present when RUN is available in config_dict
        """
        current_cycle = config_dict.current_cycle
        cycle_HH = current_cycle.strftime("%H")

        cycle_vars = {
            'cycle_HH': cycle_HH,
            'cycle_YMDH': to_YMDH(current_cycle),
            'cycle_YMD': to_YMD(current_cycle),
            # Archive directory (used by all systems)
            'VFYARC': os.path.join(config_dict.ROTDIR, "vrfyarch"),
        }

        # The docstrings and template headers advertise 'head' as coming from
        # this module, so supply it. It is built exactly like the templates'
        # own `RUN + ".t" + cycle_HH + "z."` expression; RUN is optional here
        # so configurations without it behave as before.
        run = config_dict.get('RUN')
        if run is not None:
            cycle_vars['head'] = f"{run}.t{cycle_HH}z."

        return cycle_vars