diff --git a/dev/jobs/JGLOBAL_ANALYSIS_STATS b/dev/jobs/JGLOBAL_ANALYSIS_STATS index 488392112be..21d624d4f8e 100755 --- a/dev/jobs/JGLOBAL_ANALYSIS_STATS +++ b/dev/jobs/JGLOBAL_ANALYSIS_STATS @@ -11,20 +11,34 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "anlstat" -c "base anlstat" ############################################## # Generate COM variables from templates -YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COMIN_OBS:COM_OBS_TMPL \ + +YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ + COMIN_OBS:COM_OBS_TMPL \ + COMOUT_CONF:COM_CONF_TMPL \ COMIN_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL \ - COMIN_OCEAN_ANALYSIS:COM_OCEAN_ANALYSIS_TMPL \ - COMIN_AERO_ANALYSIS:COM_CHEM_ANALYSIS_TMPL \ - COMIN_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL \ COMOUT_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL \ - COMOUT_OCEAN_ANALYSIS:COM_OCEAN_ANALYSIS_TMPL \ - COMOUT_AERO_ANALYSIS:COM_CHEM_ANALYSIS_TMPL \ - COMOUT_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL \ - COMOUT_CONF:COM_CONF_TMPL \ - COMOUT_ATMOS_ANLMON:COM_ATMOS_ANLMON_TMPL \ - COMOUT_OCEAN_ANLMON:COM_OCEAN_ANLMON_TMPL \ - COMOUT_AERO_ANLMON:COM_CHEM_ANLMON_TMPL \ - COMOUT_SNOW_ANLMON:COM_SNOW_ANLMON_TMPL + COMOUT_ATMOS_ANLMON:COM_ATMOS_ANLMON_TMPL +mkdir -m 755 -p "${COMOUT_ATMOS_ANALYSIS}" +mkdir -m 755 -p "${COMOUT_ATMOS_ANLMON}" +mkdir -m 755 -p "${COMOUT_CONF}" + +if [[ "${DO_AERO_ANL:-NO}" == "YES" ]]; then + YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ + COMIN_AERO_ANALYSIS:COM_CHEM_ANALYSIS_TMPL \ + COMOUT_AERO_ANALYSIS:COM_CHEM_ANALYSIS_TMPL \ + COMOUT_AERO_ANLMON:COM_CHEM_ANLMON_TMPL + mkdir -m 755 -p "${COMOUT_AERO_ANALYSIS}" + mkdir -m 755 -p "${COMOUT_AERO_ANLMON}" +fi + +if [[ "${DO_JEDISNOWDA:-NO}" == "YES" ]]; then + YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ + COMIN_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL \ + COMOUT_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL \ + COMOUT_SNOW_ANLMON:COM_SNOW_ANLMON_TMPL + mkdir -m 755 -p "${COMOUT_SNOW_ANALYSIS}" + mkdir -m 755 -p "${COMOUT_SNOW_ANLMON}" +fi ############################################################### # Run relevant script diff --git a/dev/parm/config/gcafs/config.aeroanl.j2 b/dev/parm/config/gcafs/config.aeroanl.j2 index fb4a3de649f..09ca53ee0d8 100644 --- a/dev/parm/config/gcafs/config.aeroanl.j2 +++ b/dev/parm/config/gcafs/config.aeroanl.j2 @@ -23,7 +23,6 @@ export STATICB_TYPE='diffusion' export TASK_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_det_config.yaml.j2" export OBS_LIST_YAML="${PARMgfs}/gdas/aero/aero_obs_list.yaml.j2" -export BIAS_FILES_YAML="${PARMgfs}/gdas/aero/aero_bias_files.yaml.j2" export io_layout_x="{{ IO_LAYOUT_X }}" export io_layout_y="{{ IO_LAYOUT_Y }}" diff --git a/dev/parm/config/gcafs/config.aeroanlgenb b/dev/parm/config/gcafs/config.aeroanlgenb index d9c8dbc2862..87353213761 100644 --- a/dev/parm/config/gcafs/config.aeroanlgenb +++ b/dev/parm/config/gcafs/config.aeroanlgenb @@ -10,7 +10,6 @@ source "${EXPDIR}/config.resources" aeroanlgenb export TASK_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_bmat_config.yaml.j2" export OBS_LIST_YAML="${PARMgfs}/gdas/aero/aero_obs_list.yaml.j2" -export BIAS_FILES_YAML="${PARMgfs}/gdas/aero/aero_bias_files.yaml.j2" export aero_diffusion_iter=200 export aero_diffusion_horiz_len=300e3 diff --git a/dev/parm/config/gcafs/config.anlstat b/dev/parm/config/gcafs/config.anlstat index 2eb8bcabb1b..c8afee9d95c 100644 --- a/dev/parm/config/gcafs/config.anlstat +++ b/dev/parm/config/gcafs/config.anlstat @@ -8,7 +8,6 @@ echo "BEGIN: config.anlstat" # Get task specific resources source "${EXPDIR}/config.resources" anlstat -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/anlstat/anlstat_jedi_config.yaml.j2" -export BASE_CONFIG_YAML="${PARMgfs}/gdas/anlstat/anlstat_base_config.yaml.j2" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/anlstat/anlstat_config.yaml.j2" echo "END: config.anlstat" diff --git a/dev/parm/config/gfs/config.aeroanl.j2 b/dev/parm/config/gfs/config.aeroanl.j2 index ae4efd8bcc4..9ab7d302a7e 100644 --- a/dev/parm/config/gfs/config.aeroanl.j2 +++ b/dev/parm/config/gfs/config.aeroanl.j2 @@ -23,7 +23,6 @@ export STATICB_TYPE='diffusion' export TASK_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_det_config.yaml.j2" export OBS_LIST_YAML="${PARMgfs}/gdas/aero/aero_obs_list.yaml.j2" -export BIAS_FILES_YAML="${PARMgfs}/gdas/aero/aero_bias_files.yaml.j2" export io_layout_x="{{ IO_LAYOUT_X }}" export io_layout_y="{{ IO_LAYOUT_Y }}" diff --git a/dev/parm/config/gfs/config.aeroanlgenb b/dev/parm/config/gfs/config.aeroanlgenb index d9c8dbc2862..87353213761 100644 --- a/dev/parm/config/gfs/config.aeroanlgenb +++ b/dev/parm/config/gfs/config.aeroanlgenb @@ -10,7 +10,6 @@ source "${EXPDIR}/config.resources" aeroanlgenb export TASK_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_bmat_config.yaml.j2" export OBS_LIST_YAML="${PARMgfs}/gdas/aero/aero_obs_list.yaml.j2" -export BIAS_FILES_YAML="${PARMgfs}/gdas/aero/aero_bias_files.yaml.j2" export aero_diffusion_iter=200 export aero_diffusion_horiz_len=300e3 diff --git a/dev/parm/config/gfs/config.anlstat b/dev/parm/config/gfs/config.anlstat index 2eb8bcabb1b..c8afee9d95c 100644 --- a/dev/parm/config/gfs/config.anlstat +++ b/dev/parm/config/gfs/config.anlstat @@ -8,7 +8,6 @@ echo "BEGIN: config.anlstat" # Get task specific resources source "${EXPDIR}/config.resources" anlstat -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/anlstat/anlstat_jedi_config.yaml.j2" -export BASE_CONFIG_YAML="${PARMgfs}/gdas/anlstat/anlstat_base_config.yaml.j2" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/anlstat/anlstat_config.yaml.j2" echo "END: config.anlstat" diff --git a/dev/parm/config/gfs/config.atmanl.j2 b/dev/parm/config/gfs/config.atmanl.j2 index 29e7279bd63..fd396fe7538 100644 --- a/dev/parm/config/gfs/config.atmanl.j2 +++ b/dev/parm/config/gfs/config.atmanl.j2 @@ -15,7 +15,6 @@ export io_layout_x="{{ IO_LAYOUT_X }}" export io_layout_y="{{ IO_LAYOUT_Y }}" export TASK_CONFIG_YAML="${PARMgfs}/gdas/atm/atm_det_config.yaml.j2" -export BIAS_FILES_YAML="${PARMgfs}/gdas/atm/atm_bias_files.yaml.j2" export LOCALIZATION_TYPE="bump" diff --git a/dev/parm/config/gfs/config.atmensanl.j2 b/dev/parm/config/gfs/config.atmensanl.j2 index d6b869151ab..adb1108267f 100644 --- a/dev/parm/config/gfs/config.atmensanl.j2 +++ b/dev/parm/config/gfs/config.atmensanl.j2 @@ -16,6 +16,5 @@ export io_layout_x="{{ IO_LAYOUT_X }}" export io_layout_y="{{ IO_LAYOUT_Y }}" export TASK_CONFIG_YAML="${PARMgfs}/gdas/atm/atm_ens_config.yaml.j2" -export BIAS_FILES_YAML="${PARMgfs}/gdas/atm/atm_bias_files.yaml.j2" echo "END: config.atmensanl" diff --git a/dev/scripts/exglobal_analysis_stats.py b/dev/scripts/exglobal_analysis_stats.py index 8500bf98a00..3497a8afc0f 100755 --- a/dev/scripts/exglobal_analysis_stats.py +++ b/dev/scripts/exglobal_analysis_stats.py @@ -20,23 +20,24 @@ # Take configuration from environment and cast it as python dictionary config = cast_strdict_as_dtypedict(os.environ) - # Instantiate the atm analysis task - AnlStats = AnalysisStats(config) - # Create list based on DA components - AnlStats.task_config['STAT_ANALYSES'] = [] - if AnlStats.task_config.DO_AERO_ANL: - AnlStats.task_config['STAT_ANALYSES'].append('aero') - if AnlStats.task_config.DO_JEDISNOWDA: - AnlStats.task_config['STAT_ANALYSES'].append('snow') - if AnlStats.task_config.DO_JEDIATMVAR: - AnlStats.task_config['STAT_ANALYSES'].append('atmos') + config.STAT_ANALYSES = [] + if config.DO_AERO_ANL: + config.STAT_ANALYSES.append('aero') + if config.DO_JEDISNOWDA: + config.STAT_ANALYSES.append('snow') + if config.DO_JEDIATMVAR: + config.STAT_ANALYSES.append('atmos') else: - AnlStats.task_config['STAT_ANALYSES'].append('atmos_gsi') - AnlStats.convert_gsi_diags() + config.STAT_ANALYSES.append('atmos_gsi') + + # Instantiate the analysis stats task + AnlStats = AnalysisStats(config) # Initialize JEDI variational analysis + if not config.DO_JEDIATMVAR: + AnlStats.convert_gsi_diags() AnlStats.initialize() - for anl in AnlStats.task_config.STAT_ANALYSES: + for anl in config.STAT_ANALYSES: AnlStats.execute(anl) AnlStats.finalize(anl) diff --git a/parm/archive/enkf.yaml.j2 b/parm/archive/enkf.yaml.j2 index 73c92aeb806..b5ced5e5898 100644 --- a/parm/archive/enkf.yaml.j2 +++ b/parm/archive/enkf.yaml.j2 @@ -83,7 +83,7 @@ enkf: "correction_increment.yaml", "ensemble_recenter.yaml"] %} {% else %} - {% set da_stat_files = ["stat.atm.tar"]%} + {% set da_stat_files = ["atmos_analysis.ioda_hofx.ens_mean.tar.gz"]%} {% set da_conf_files = ["atmensanlletkf.yaml", "atmensanlfv3inc.yaml", "correction_increment.yaml", @@ -98,7 +98,7 @@ enkf: {% endfor %} {% if DO_JEDISNOWDA %} - - "{{ COMIN_SNOW_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/{{ head }}snow_analysis.ioda_hofx.ensmean.tar" + - "{{ COMIN_SNOW_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/{{ head }}snow_analysis.ioda_hofx.ensmean.tar.gz" {% for itile in range(1,7) %} # Snow analysis is 3dvar - "{{ COMIN_SNOW_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/{{ cycle_YMD }}.{{ cycle_HH }}0000.snow_increment.sfc_data.tile{{ itile }}.nc" diff --git a/parm/archive/gcafs_arcdir.yaml.j2 b/parm/archive/gcafs_arcdir.yaml.j2 index c7b3d8dff65..1216755bf65 100644 --- a/parm/archive/gcafs_arcdir.yaml.j2 +++ b/parm/archive/gcafs_arcdir.yaml.j2 @@ -45,7 +45,7 @@ copy_req: {% if MODE == "cycled" %} # Deterministic analysis files (cycled mode only) - REQUIRED {% if DO_AERO_ANL %} - - ["{{ COMIN_CHEM_ANALYSIS }}/{{ head }}aerostat.tgz", + - ["{{ COMIN_CHEM_ANALYSIS }}/{{ head }}aero_analysis.ioda_hofx.tar.gz", "{{ ARCDIR }}/aerostat.{{ RUN }}.{{ cycle_YMDH }}.tgz"] {% endif %} diff --git a/parm/archive/gdas.yaml.j2 b/parm/archive/gdas.yaml.j2 index c0be51bad2a..44299fdd945 100644 --- a/parm/archive/gdas.yaml.j2 +++ b/parm/archive/gdas.yaml.j2 @@ -85,7 +85,7 @@ gdas: {% if DO_JEDIATMVAR %} - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}anlvar.atm.yaml" - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}anlvar.fv3.atm.yaml" - - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}stat.atm.tar" + - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmos_analysis.ioda_hofx.tar.gz" - "{{ COMIN_ATMOS_ANLMON | relpath(ROTDIR) }}/{{ head }}atmos_analysis.ioda_hofx_stats.tar.gz" {% else %} - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}gsistat.txt" @@ -96,7 +96,8 @@ gdas: - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}abias_int.txt" {% endif %} {% if DO_AERO_ANL %} - - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/{{ head }}aerostat.tgz" + - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/{{ head }}aero_analysis.ioda_hofx.tar.gz" + - "{{ COMIN_CHEM_ANLMON | relpath(ROTDIR) }}/{{ head }}aero_analysis.ioda_hofx_stats.tar.gz" - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}aerovar.yaml" {% for itile in range(1,7) %} - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/aeroinc.{{ cycle_YMD }}.{{ cycle_HH }}0000.fv_tracer.res.tile{{ itile }}.nc" @@ -106,13 +107,16 @@ gdas: - "{{ COMIN_OBS | relpath(ROTDIR) }}/{{ head }}aeroobs" - "{{ COMIN_OBS | relpath(ROTDIR) }}/{{ head }}aerorawobs" {% endif %} + {% if DO_JEDIOCNVAR %} + - "{{ COMIN_OCEAN_ANALYSIS | relpath(ROTDIR) }}/{{ head }}marine_analysis.ioda_hofx.tar.gz" + {% endif %} # Snow analysis data {% if DO_JEDISNOWDA %} {% for itile in range(1,7) %} - "{{ COMIN_SNOW_ANALYSIS | relpath(ROTDIR) }}/{{ current_cycle | to_fv3time }}.snow_increment.sfc_data.tile{{ itile }}.nc" {% endfor %} - - "{{ COMIN_SNOW_ANALYSIS | relpath(ROTDIR) }}/{{ head }}snow_analysis.ioda_hofx.tar" + - "{{ COMIN_SNOW_ANALYSIS | relpath(ROTDIR) }}/{{ head }}snow_analysis.ioda_hofx.tar.gz" - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}snowanlvar.yaml" - "{{ COMIN_SNOW_ANLMON | relpath(ROTDIR) }}/{{ head }}snow_analysis.ioda_hofx_stats.tar.gz" {% endif %} diff --git a/parm/archive/gfs_arcdir.yaml.j2 b/parm/archive/gfs_arcdir.yaml.j2 index 9fffe5a1b7e..a61fd19b7d7 100644 --- a/parm/archive/gfs_arcdir.yaml.j2 +++ b/parm/archive/gfs_arcdir.yaml.j2 @@ -34,7 +34,7 @@ copy_opt: "{{ ARCDIR }}/pgbanl.{{ RUN }}.{{ cycle_YMDH }}.grib2"] {% if DO_JEDIATMVAR == True %} - - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}stat.atm.tar", + - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}atmos_analysis.ioda_hofx.tar.gz", "{{ ARCDIR }}/atmstat.{{ RUN }}.{{ cycle_YMDH }}"] {% else %} - ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}gsistat.txt", @@ -47,10 +47,15 @@ copy_opt: {% endif %} {% if DO_AERO_ANL %} - - ["{{ COMIN_CHEM_ANALYSIS }}/{{ head }}aerostat.tgz", + - ["{{ COMIN_CHEM_ANALYSIS }}/{{ head }}aero_analysis.ioda_hofx.tar.gz", "{{ ARCDIR }}/aerostat.{{ RUN }}.{{ cycle_YMDH }}.tgz"] {% endif %} + {% if DO_JEDIOCNVAR %} + - ["{{ COMIN_OCEAN_ANALYSIS }}/{{ head }}marine_analysis.ioda_hofx.tar.gz", + "{{ ARCDIR }}/marinestat.{{ RUN }}.{{ cycle_YMDH }}.tgz"] + {% endif %} + {% if DO_PREP_OBS_AERO == True %} - ["{{ COMIN_OBS }}/{{ head }}aeroobs", "{{ ARCDIR }}/aeroobs.{{ RUN }}.{{ cycle_YMDH }}"] diff --git a/parm/archive/gfsa.yaml.j2 b/parm/archive/gfsa.yaml.j2 index 20196f68f5a..b64687b5b89 100644 --- a/parm/archive/gfsa.yaml.j2 +++ b/parm/archive/gfsa.yaml.j2 @@ -37,12 +37,15 @@ gfsa: {% if DO_JEDIATMVAR %} - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}anlvar.atm.yaml" - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}anlvar.fv3.atm.yaml" - - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}stat.atm.tar" + - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmos_analysis.ioda_hofx.tar.gz" {% else %} - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}gsistat.txt" {% endif %} {% if DO_AERO_ANL %} - - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/{{ head }}aerostat.tgz" + - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/{{ head }}aero_analysis.ioda_hofx.tar.gz" + {% endif %} + {% if DO_JEDIOCNVAR %} + - "{{ COMIN_OCEAN_ANALYSIS | relpath(ROTDIR) }}/{{ head }}marine_analysis.ioda_hofx.tar.gz" {% endif %} {% if DO_PREP_OBS_AERO %} - "{{ COMIN_OBS | relpath(ROTDIR) }}/{{ head }}aeroobs" @@ -54,7 +57,7 @@ gfsa: {% for itile in range(1,7) %} - "{{ COMIN_SNOW_ANALYSIS | relpath(ROTDIR) }}/{{ current_cycle | to_fv3time }}.snow_increment.sfc_data.tile{{ itile }}.nc" {% endfor %} - - "{{ COMIN_SNOW_ANALYSIS | relpath(ROTDIR) }}/{{ head }}snow_analysis.ioda_hofx.tar" + - "{{ COMIN_SNOW_ANALYSIS | relpath(ROTDIR) }}/{{ head }}snow_analysis.ioda_hofx.tar.gz" - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}snowanlvar.yaml" {% endif %} diff --git a/parm/archive/master_gcdas.yaml.j2 b/parm/archive/master_gcdas.yaml.j2 index 88c704c5ef6..32814f4aeb3 100644 --- a/parm/archive/master_gcdas.yaml.j2 +++ b/parm/archive/master_gcdas.yaml.j2 @@ -14,7 +14,7 @@ datasets: - "logs/{{ cycle_YMDH }}/{{ RUN }}_aeroanlinit.log" - "logs/{{ cycle_YMDH }}/{{ RUN }}_aeroanlvar.log" - "logs/{{ cycle_YMDH }}/{{ RUN }}_aeroanlfinal.log" - - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/{{ head }}aerostat.tgz" + - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/{{ head }}aero_analysis.ioda_hofx.tar.gz" - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}aerovar.yaml" {% for itile in range(1,7) %} - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/aeroinc.{{ cycle_YMD }}.{{ cycle_HH }}0000.fv_tracer.res.tile{{ itile }}.nc" diff --git a/sorc/gdas.cd b/sorc/gdas.cd index 58da1e3bcc8..ab8fadd714f 160000 --- a/sorc/gdas.cd +++ b/sorc/gdas.cd @@ -1 +1 @@ -Subproject commit 58da1e3bcc863fddaf32319efe6b409565dd568b +Subproject commit ab8fadd714fb891cc016a2112af131da94829951 diff --git a/ush/python/pygfs/jedi/jedi.py b/ush/python/pygfs/jedi/jedi.py index 2cb1d5a1aff..2762de36584 100644 --- a/ush/python/pygfs/jedi/jedi.py +++ b/ush/python/pygfs/jedi/jedi.py @@ -1,21 +1,22 @@ #!/usr/bin/env python3 +import glob +import gzip import os import tarfile from logging import getLogger from typing import List, Dict, Any, Optional -from pprint import pformat from jcb import render from wxflow import (AttrDict, FileHandler, Task, Executable, WorkflowException, WorkflowKeyError, WorkflowTypeError, - chdir, rm_p, + chdir, parse_j2yaml, save_as_yaml, logit) logger = getLogger(__name__.split('.')[-1]) -required_jedi_keys = ['rundir', 'exe_src', 'mpi_cmd'] -optional_jedi_keys = ['jedi_args', 'jcb_base_yaml', 'jcb_algo', 'jcb_algo_yaml'] +required_jedi_keys = ['jedi_app_name', 'rundir', 'exe_src', 'mpi_cmd', 'jcb_base_yaml'] +optional_jedi_keys = ['jedi_args', 'jcb_algo', 'jcb_algo_yaml', 'obs_list_yaml', 'bias_files_yaml'] class Jedi: @@ -24,63 +25,146 @@ class Jedi: """ @logit(logger, name="Jedi") - def __init__(self, config: Dict[str, Any]) -> None: + def __init__(self, config: Dict[str, Any], task_config: AttrDict) -> None: """Constructor for JEDI objects This method will construct a Jedi object. This includes: - create the jedi_config AttrDict and extend it with additional required entries - - save a copy of jedi_config + - create the jcb_config AttrDict by parsing the jcb_base_yaml and jcb_algo_yaml files + - save a copy of jedi_config and jcb_config Parameters ---------- - config: AttrDict - Attribute-dictionary of all configuration variables required for the Jedi class + config: Dict[str, Any] + Dictionary of all configuration variables required for the Jedi class + task_config: AttrDict + Attribute-dictionary of all configuration variables associated with a GDAS task. Returns ---------- None """ - # Make sure input dictionary for Jedi class constructor has the required keys - if 'yaml_name' not in config: - raise WorkflowKeyError(f"Required key 'yaml_name' not found in config") - for key in required_jedi_keys: - if key not in config: - raise WorkflowKeyError(f"Required key '{key}' not found in config") + # Parse inputs + # ------------ # Create the configuration dictionary for JEDI object local_dict = AttrDict( { - 'exe': config.exe_src, - 'yaml': os.path.join(config.rundir, config.yaml_name + '.yaml'), - 'input_config': None + 'exe_config_yaml': os.path.join(config.rundir, config.jedi_app_name + '.yaml'), } ) self.jedi_config = AttrDict(**config, **local_dict) + # Make sure input dictionary for Jedi class constructor has the required keys + for key in required_jedi_keys: + if key not in self.jedi_config: + raise WorkflowKeyError(f"Required key '{key}' not found in jedi_config") + # Set optional keys in jedi_config to None if not already present for key in optional_jedi_keys: if key not in self.jedi_config: self.jedi_config[key] = None - # Save a copy of jedi_config + # Makes sure either jcb_algo or jcb_algo_yaml is specified, but not both + if 'jcb_algo' not in config and 'jcb_algo_yaml' not in config: + raise WorkflowKeyError("Either 'jcb_algo' or 'jcb_algo_yaml' must be specified in JEDI config") + if 'jcb_algo' in config and 'jcb_algo_yaml' in config: + raise WorkflowKeyError("Either 'jcb_algo' or 'jcb_algo_yaml' must be specified in JEDI config, but not both") + + # Construct JCB config dictionary + # ------------------------------- + + # Render JCB base config YAML + self._jcb_base_config = parse_j2yaml(self.jedi_config.jcb_base_yaml, task_config) + + # Construct JCB config dictionary + if self.jedi_config.jcb_algo_yaml is not None: + # Render JCB algorithm config YAML if specified + self._jcb_algo_config = parse_j2yaml(self.jedi_config.jcb_algo_yaml, task_config) + if 'algorithm' not in self._jcb_algo_config: + raise WorkflowKeyError("JCB algorithm not specified in jcb_algo_yaml") + + self.jcb_config = AttrDict({**self._jcb_base_config, **self._jcb_algo_config}) + else: + self.jcb_config = AttrDict(self._jcb_base_config) + + # Set JCB algorithm if not already specified in JCB algorithm config YAML + if self.jedi_config.jcb_algo is not None: + self.jcb_config.algorithm = self.jedi_config.jcb_algo + + # Set observations list in JCB config if obs_list_yaml specified + if self.jedi_config.obs_list_yaml is not None: + self.jcb_config['observations'] = parse_j2yaml(self.jedi_config.obs_list_yaml, task_config)['observations'] + + # Set object attributes + # --------------------- + + # Set model attribute, checking that "app_path_observations" is present in jcb_config + if 'app_path_model' in self.jcb_config: + self.component = self.jcb_config['app_path_model'].split('/')[-1] + elif 'app_path_observations' in self.jcb_config: + self.component = self.jcb_config['app_path_observations'].split('/')[-1] + else: + raise WorkflowKeyError(f"Required key 'app_path_model' or 'app_path_observations' not found in JCB config") + + # Initialize JEDI application configuration dictionary to None + self.exe_config = None + + # Save a copy of jedi_config and jcb_config self._jedi_config = self.jedi_config.deepcopy() + self._jcb_config = self.jcb_config.deepcopy() + @staticmethod @logit(logger) - def initialize(self, task_config: AttrDict, clean_empty_obsspaces=False) -> None: + def get_jedi_dict(jedi_config_dict: dict, task_config: AttrDict, expected_block_names: Optional[list] = None): + """Get dictionary of Jedi objects from YAML specifying their configuration dictionaries + + Parameters + ---------- + jedi_config_dict : dict + dictionary parsed from a Jinja2-YAML file specifying configuration dictionaries for JEDI objects + expected_block_names (optional) : str + list of names of blocks expected to be in jedi_config_yaml YAML file + + Returns + ---------- + None + """ + + # Initialize dictionary of Jedi objects + jedi_dict = AttrDict() + + # Loop through dictionary of Jedi configuration dictionaries + for block_name in jedi_config_dict: + # jedi_app_name key is set to name for this block + jedi_config_dict[block_name]['jedi_app_name'] = block_name + + # Construct JEDI object + jedi_dict[block_name] = Jedi(jedi_config_dict[block_name], task_config) + + # Make sure jedi_dict has the blocks we expect + if expected_block_names: + for block_name in expected_block_names: + if block_name not in jedi_dict: + raise WorkflowKeyError(f"Expected block key {block_name} not present {jedi_config_yaml}") + + # Return dictionary of JEDI objects + return jedi_dict + + @logit(logger) + def initialize(self, clean_empty_obsspaces: Optional[bool] = False) -> None: """Initialize JEDI application This method will initialize a JEDI application. This includes: - generating JEDI input YAML config + - cleaning empty observation spaces from JEDI input config dictionary - saving JEDI input YAML config to run directory - - linking the JEDI executable to run directory Parameters ---------- - task_config: AttrDict - Attribute-dictionary of all configuration variables associated with a GDAS task. clean_empty_obsspaces: bool Flag to clean empty observation spaces from JEDI input configuration dictionary. Default is False. @@ -90,19 +174,19 @@ def initialize(self, task_config: AttrDict, clean_empty_obsspaces=False) -> None None """ - # Render JEDI config dictionary - logger.info(f"Generating JEDI YAML config: {self.jedi_config.yaml}") - self.jedi_config.input_config = self.render_jcb(task_config) - logger.debug(f"JEDI config:\n{pformat(self.jedi_config.input_config)}") + # Render JEDI executable config dictionary + logger.info(f"Generating JEDI YAML config: {self.jedi_config.exe_config_yaml}") + self.exe_config = self.render_jcb_template() + logger.debug(f"JEDI config:\n{self.exe_config}") - # Remove obs spaces from JEDI config dictionary with missing obs files + # Remove obs spaces from JEDI executable config dictionary with missing obs files if clean_empty_obsspaces: - logger.info(f"Clean empty obs spaces from JEDI YAML config: {self.jedi_config.yaml}") + logger.info(f"Clean empty obs spaces from JEDI YAML config: {self.jedi_config.exe_config_yaml}") self.clean_empty_obsspaces() - # Save JEDI config dictionary to YAML in run directory - logger.debug(f"Writing JEDI YAML config to: {self.jedi_config.yaml}") - save_as_yaml(self.jedi_config.input_config, self.jedi_config.yaml) + # Save JEDI exectuable config dictionary to YAML in run directory + logger.debug(f"Writing JEDI YAML config to: {self.jedi_config.exe_config_yaml}") + save_as_yaml(self.exe_config, self.jedi_config.exe_config_yaml) @logit(logger) def execute(self) -> None: @@ -117,14 +201,15 @@ def execute(self) -> None: None """ + # TODO: not sure if this chdir does anyththing chdir(self.jedi_config.rundir) exec_cmd = Executable(self.jedi_config.mpi_cmd) - exec_cmd.add_default_arg(self.jedi_config.exe) + exec_cmd.add_default_arg(self.jedi_config.exe_src) if self.jedi_config.jedi_args is not None: for arg in self.jedi_config.jedi_args: exec_cmd.add_default_arg(arg) - exec_cmd.add_default_arg(self.jedi_config.yaml) + exec_cmd.add_default_arg(self.jedi_config.exe_config_yaml) logger.info(f"Executing {exec_cmd}") try: @@ -133,102 +218,36 @@ def execute(self) -> None: raise WorkflowException(f"An error occurred during execution of {exec_cmd}:\n{e}") from e @logit(logger) - def render_jcb(self, task_config: AttrDict, algorithm_in: Optional[str] = None) -> AttrDict: + def render_jcb_template(self, algorithm_in: Optional[str] = None) -> AttrDict: """Compile a JEDI configuration dictionary from a template file and save to a YAML file Parameters ---------- - task_config : AttrDict - Dictionary of all configuration variables associated with a GDAS task. algorithm (optional) : str Name of the algorithm used to generate the JEDI configuration dictionary. It will override the algorithm set in the jedi_config.jcb_algo_yaml file. Returns ---------- - jedi_input_config: AttrDict + exe_config: AttrDict Attribute-dictionary of JEDI configuration rendered from a template. """ - # Fill JCB base YAML template and build JCB config dictionary - if self.jedi_config.jcb_base_yaml is not None: - jcb_config = parse_j2yaml(self.jedi_config.jcb_base_yaml, task_config) - else: - raise WorkflowKeyError("JCB base YAML not specified as key 'jcb_base_yaml' in JEDI-class config dictionary") - - # Add JCB algorithm YAML, if it exists, to JCB config dictionary - if self.jedi_config.jcb_algo_yaml is not None: - jcb_config.update(parse_j2yaml(self.jedi_config.jcb_algo_yaml, task_config)) - - # Set algorithm in JCB config dictionary (method input algorithm takes precedence) + # Set algorithm (method input algorithm takes precedence) if algorithm_in is not None: algorithm = algorithm_in - elif self.jedi_config.jcb_algo is not None: - algorithm = self.jedi_config.jcb_algo - elif 'algorithm' in jcb_config: - algorithm = jcb_config.algorithm + elif 'algorithm' in self.jcb_config: + algorithm = self.jcb_config['algorithm'] else: raise WorkflowKeyError("JCB algorithm not specified") - jcb_config['algorithm'] = algorithm # Generate JEDI YAML config by rendering JCB config dictionary try: - jedi_input_config = render(jcb_config) + exe_config = render({**self.jcb_config, **{'algorithm': algorithm}}) except Exception as e: raise WorkflowException(f"An error occurred while rendering JCB template for algorithm {algorithm}:\n{e}") from e - return jedi_input_config - - @staticmethod - @logit(logger) - def get_jedi_dict(jedi_config_dict: dict, task_config: AttrDict, expected_block_names: Optional[list] = None): - """Get dictionary of Jedi objects from YAML specifying their configuration dictionaries - - Parameters - ---------- - jedi_config_dict : dict - dictionary parsed from a J2-YAML file specifying configuration dictionaries for JEDI objects - task_config : str - attribute-dictionary of all configuration variables associated with a GDAS task - expected_block_names (optional) : str - list of names of blocks expected to be in jedi_config_yaml YAML file - - Returns - ---------- - None - """ - - # Initialize dictionary of Jedi objects - jedi_dict = AttrDict() - - # Loop through dictionary of Jedi configuration dictionaries - for block_name in jedi_config_dict: - # yaml_name key is set to name for this block - jedi_config_dict[block_name]['yaml_name'] = block_name - - # Make sure all required keys present - for key in required_jedi_keys: - if key not in jedi_config_dict[block_name]: - raise WorkflowKeyError(f"Required key {key} not found in {jedi_config_yaml} for block {block_name}.") - - # Set optional keys to None - for key in optional_jedi_keys: - if key not in jedi_config_dict[block_name]: - jedi_config_dict[block_name][key] = None - - # Construct JEDI object - jedi_dict[block_name] = Jedi(jedi_config_dict[block_name]) - - # Make sure jedi_dict has the blocks we expect - if expected_block_names: - for block_name in expected_block_names: - if block_name not in jedi_dict: - raise WorkflowKeyError(f"Expected block key {block_name} not present {jedi_config_yaml}") - if len(jedi_dict) > len(expected_block_names): - raise WorkflowException(f"{jedi_config_yaml} specifies more Jedi objects than expected.") - - # Return dictionary of JEDI objects - return jedi_dict + return exe_config @logit(logger) def clean_empty_obsspaces(self): @@ -246,7 +265,7 @@ def clean_empty_obsspaces(self): """ # Get observers from JEDI input config - observers = find_value_in_nested_dict(self.jedi_config.input_config, 'observers') + observers = find_value_in_nested_dict(self.exe_config, 'observers') # Check if observers list actually present if observers: @@ -257,7 +276,7 @@ def clean_empty_obsspaces(self): if os.path.isfile(fname): cleaned_observers.append(obs_space) else: - logger.warning(f"WARNING: {fname} does not exist, removing obs space") + logger.warning(f"{fname} does not exist, removing obs space") # Clear observers list in dictionary and replace with new list observers.clear() @@ -267,88 +286,242 @@ def clean_empty_obsspaces(self): if observers == []: logger.warning(f"No observers found in JEDI input config") - @staticmethod @logit(logger) - def remove_redundant(input_list: List) -> List: - """Remove reduncancies from list with possible redundant, non-mutable elements + def stage_obsdatain(self, comin) -> None: + """Stage observation input files specified in JCB configuration dictionary + + This method will stage observation data files specified in the JCB configuration + dictionary Parameters ---------- - input_list : List - List with possible redundant, non-mutable elements + comin: str + path to COM input directory Returns ---------- - output_list : List - Input list but with redundancies removed + None """ - output_list = [] - for item in input_list: - if item not in output_list: - output_list.append(item) + # Check that other required keys are present in jcb_config + for stem in ['obsdatain_path', 'obsdataout_path', 'obsdatain_prefix', 'obsdatain_suffix']: + key = f'{self.component}_{stem}' + if key not in self.jcb_config: + raise WorkflowKeyError(f"Required key {key} not found in JCB config") - return output_list + # Initialize FileHandler input dictionary + fh_dict = {'mkdir': [], 'copy_opt': []} + + # Make directories + fh_dict['mkdir'].append(self.jcb_config[f'{self.component}_obsdatain_path']) + fh_dict['mkdir'].append(self.jcb_config[f'{self.component}_obsdataout_path']) + + # Copy files + ob_dest = self.jcb_config[f'{self.component}_obsdatain_path'] + for observation_from_jcb in self.jcb_config['observations']: + # Observations + ob_src = os.path.join(comin, + self.jcb_config[f'{self.component}_obsdatain_prefix'] + + observation_from_jcb + + self.jcb_config[f'{self.component}_obsdatain_suffix']) + + fh_dict['copy_opt'].append([ob_src, ob_dest]) + + # Execute FileHandler sync + FileHandler(fh_dict).sync() + + @logit(logger) + def save_obsdataout(self, comout: str, archive_name: str) -> None: + """Archive observation output files and compress archive into COM directory + + Parameters + ---------- + comout: str + path to COM output directory + archive_name: str + name of output tar file + + Returns + ---------- + None + """ + + # Check that other required keys are present in jcb_config + for stem in ['obsdataout_path', 'obsdataout_prefix', 'obsdataout_suffix']: + key = f'{self.component}_{stem}' + if key not in self.jcb_config: + raise WorkflowKeyError(f"Required key {key} not found in JCB config") + + # Set paths of output tar files + tarball = os.path.join(self.jcb_config[f"{self.component}_obsdataout_path"], f"{archive_name}.tar.gz") + + # Create compressed tarball of obs output files in COM + logger.info(f"Archiving observation output files to {tarball}") + with tarfile.open(tarball, "w:gz") as archive: + for observation_from_jcb in self.jcb_config['observations']: + obsdataout_file = os.path.join(self.jcb_config[f"{self.component}_obsdataout_path"], + self.jcb_config[f"{self.component}_obsdataout_prefix"] + + observation_from_jcb + + self.jcb_config[f"{self.component}_obsdataout_suffix"]) + if os.path.exists(obsdataout_file): + logger.info(f"Adding observation output file {obsdataout_file} to {tarball}") + archive.add(obsdataout_file, arcname=os.path.basename(obsdataout_file)) + else: + logger.warning(f"Observation output file {obsdataout_file} does not exist and will be skipped") + + # Copy files to COM + FileHandler({'copy_opt': [[tarball, comout]]}).sync() - @staticmethod @logit(logger) - def extract_tar_from_filehandler_dict(filehandler_dict) -> None: - """Extract tarballs from FileHandler input dictionary + def stage_obsbiasin(self, comin) -> None: + """Stage bias correction files specified in JEDI input configuration dictionary - This method extracts files from tarballs specified in a FileHander - input dictionary for the 'copy' action. + This method will stage bias correction files specified in the JEDI input + configuration dictionary using a FileHandler object, and then extract the + bias correction files from the tar files. Parameters ---------- - filehandler_dict - Input dictionary for FileHandler + comin: str + path to COMIN directory Returns ---------- None """ - for item in filehandler_dict['copy']: - # Use the filename from the destination entry if it's a file path - # Otherwise, it's a directory, so use the source entry filename - if os.path.isfile(item[1]): - filename = os.path.basename(item[1]) - else: - filename = os.path.basename(item[0]) + # Check that other required keys are present in jcb_config + for stem in ['obsbiasin_path', 'obsbiasout_path', 'obsbiasin_prefix']: + key = f'{self.component}_{stem}' + if key not in self.jcb_config: + raise WorkflowKeyError(f"Required key {key} not found in JCB config") + + # Initialize FileHandler input dictionary + fh_dict = {'mkdir': [], 'copy_opt': []} + + # Make directories + fh_dict['mkdir'].append(self.jcb_config[f'{self.component}_obsbiasin_path']) + fh_dict['mkdir'].append(self.jcb_config[f'{self.component}_obsbiasout_path']) + + # Copy files + files_already_copied = [] + bias_dest = self.jcb_config[f'{self.component}_obsbiasin_path'] + for observation_from_jcb in self.jcb_config['observations']: + if observation_from_jcb in self.jcb_config.bias_files_dict and observation_from_jcb not in files_already_copied: + bias_src = os.path.join(comin, self.jcb_config[f'{self.component}_obsbiasin_prefix'] + self.jcb_config.bias_files_dict[observation_from_jcb]) + + fh_dict['copy_opt'].append([bias_src, bias_dest]) + + # Don't copy same file multiple times + files_already_copied.append(observation_from_jcb) + + # Execute FileHandler sync + FileHandler(fh_dict).sync() + + # Untar bias corrections + bias_file_list = [] + for ob in self.jcb_config['observations']: + if ob in self.jcb_config.bias_files_dict and not self.jcb_config.bias_files_dict[ob] in bias_file_list: + bias_file_list.append(self.jcb_config.bias_files_dict[ob]) + bias_file_path = os.path.join(self.jcb_config[f"{self.component}_obsbiasin_path"], + self.jcb_config[f"{self.component}_obsbiasin_prefix"] + self.jcb_config.bias_files_dict[ob]) + if os.path.exists(bias_file_path): + Jedi.extract_tar(bias_file_path) + else: + logger.warning(f"Bias correction file {bias_file_path} does not exist and will be skipped") - # Check if file is a tar ball - if os.path.splitext(filename)[1] == '.tar': - tar_file = f"{os.path.dirname(item[1])}/{filename}" + @logit(logger) + def save_obsbiasout(self, comout: str, archive_name: str) -> None: + """Tar bias correction files and into COM directory - # Extract tarball - logger.info(f"Extract files from {tar_file}") - extract_tar(tar_file) + Parameters + ---------- + comout: str + path to COM output directory + archive_name: str + name of output tar file + Returns + ---------- + None + """ -@logit(logger) -def extract_tar(tar_file: str) -> None: - """Extract files from a tarball + # Check that other required keys are present in jcb_config + for stem in ['obsbiasin_path', 'obsbiasout_path', 'obsbiasin_prefix', + 'obsbiasout_prefix', 'obsbiasout_suffix', 'obsbiascovout_suffix', + 'obstlapsein_suffix']: + key = f'{self.component}_{stem}' + if key not in self.jcb_config: + raise WorkflowKeyError(f"Required key {key} not found in JCB config") + + # Set paths of output tar files + tarball = f"{archive_name}.tar" + + # Get lists of files to put in tarballs + satlist = [] + satcovlist = [] + tlaplist = [] + for ob in self.jcb_config['observations']: + # Sat bias file + satfile = os.path.join(self.jcb_config[f"{self.component}_obsbiasout_path"], + self.jcb_config[f"{self.component}_obsbiasout_prefix"] + ob + self.jcb_config[f"{self.component}_obsbiasout_suffix"]) + if os.path.exists(satfile): + satlist.append(satfile) + + # Sat bias cov file + satcovfile = os.path.join(self.jcb_config[f"{self.component}_obsbiasout_path"], + self.jcb_config[f"{self.component}_obsbiasout_prefix"] + ob + self.jcb_config[f"{self.component}_obsbiascovout_suffix"]) + if os.path.exists(satcovfile): + satcovlist.append(satcovfile) + + # Temperature lapse rate file + tlapfile = os.path.join(self.jcb_config[f"{self.component}_obsbiasin_path"], + self.jcb_config[f"{self.component}_obsbiasin_prefix"] + ob + self.jcb_config[f"{self.component}_obstlapsein_suffix"]) + if os.path.exists(tlapfile): + tlaplist.append(tlapfile) + + # Create tarball of bias correction files + logger.info(f"Creating bias correction tarball {tarball}") + with tarfile.open(tarball, 'w') as bcor: + logger.info(f"Adding {bcor.getnames()}") + for satfile in satlist + satcovlist: + logger.info(f"Adding satellite bias correction file {satfile} to {tarball}") + bcor.add(satfile, arcname=os.path.basename(satfile)) + for tlapfile in tlaplist: + # Change GPREFIX to APREFIX in tlapse file name when adding to tarball + tlapfile_rename = tlapfile.replace(self.jcb_config[f"{self.component}_obsbiasin_prefix"], + self.jcb_config[f"{self.component}_obsbiasout_prefix"]) + logger.info(f"Adding temperature lapse rate file {tlapfile_rename} to {tarball}") + bcor.add(tlapfile, arcname=os.path.basename(tlapfile_rename)) + + # Copy files to COM + FileHandler({'copy_opt': [[tarball, comout]]}).sync() - This method extract files from a tarball + @staticmethod + @logit(logger) + def extract_tar(tar_file: str) -> None: + """Extract files from a tarball - Parameters - ---------- - tar_file - path/name of tarball + This method extract files from a tarball - Returns - ---------- - None - """ + Parameters + ---------- + tar_file + path/name of tarball + + Returns + ---------- + None + """ - # extract files from tar file - tar_path = os.path.dirname(tar_file) - try: - with tarfile.open(tar_file, "r") as tarball: - tarball.extractall(path=tar_path) - logger.info(f"Extract {tarball.getnames()}") - except Exception as e: - raise WorkflowException(f"An error occurred while extracting {tar_file}:\n{e}") from e + # extract files from tar file + tar_path = os.path.dirname(tar_file) + try: + with tarfile.open(tar_file, "r") as tarball: + tarball.extractall(path=tar_path) + logger.info(f"Extract {tarball.getnames()}") + except Exception as e: + raise WorkflowException(f"An error occurred while extracting {tar_file}:\n{e}") from e @logit(logger) diff --git a/ush/python/pygfs/task/aero_analysis.py b/ush/python/pygfs/task/aero_analysis.py index d5408e34e68..1b9e7b4926f 100644 --- a/ush/python/pygfs/task/aero_analysis.py +++ b/ush/python/pygfs/task/aero_analysis.py @@ -77,7 +77,7 @@ def __init__(self, config): # Create dictionary of Jedi objects expected_keys = ['aeroanlvar'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, expected_keys) + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) def initialize(self) -> None: @@ -86,7 +86,8 @@ def initialize(self) -> None: This method will initialize a global aerosol analysis using JEDI. This includes: - stage input files from COM and create output directories - - extract bias corrections from tar files + - stage observation files + - stage bias correction files - initialize JEDI application """ @@ -94,13 +95,17 @@ def initialize(self) -> None: logger.info(f"Staging files from COM") FileHandler(self.task_config.data_in).sync() - # Extract bias corrections from tar files - logger.info(f"Extracting bias corrections from tar files") - self.untar_bias_corrections() + # Stage observation files + logger.info(f"Staging observation files") + self.jedi_dict['aeroanlvar'].stage_obsdatain(f"{self.task_config.COMIN_OBS}/chem") - # initialize JEDI variational application + # Stage bias correction files + logger.info(f"Staging bias correction files") + self.jedi_dict['aeroanlvar'].stage_obsbiasin(self.task_config.COMIN_CHEM_ANALYSIS_PREV) + + # Initialize JEDI variational application logger.info(f"Initializing JEDI variational DA application") - self.jedi_dict['aeroanlvar'].initialize(self.task_config, clean_empty_obsspaces=True) + self.jedi_dict['aeroanlvar'].initialize(clean_empty_obsspaces=True) @logit(logger) def execute(self, jedi_dict_key: str) -> None: @@ -125,23 +130,24 @@ def finalize(self) -> None: This method will finalize a global aerosol analysis using JEDI. This includes: - apply increments to the original RESTART files - - compress and tar output diag files in COM - - tar radiative bias correction files in COM + - archive, compress, and save diag files to COM + - archive and save radiative bias correction files to COM - save output files and YAMLs to COM - """ # ---- add increments to RESTART files logger.info('Adding increments to RESTART files') self._add_fms_cube_sphere_increments() - # Compress and tar diag files in COM directory - self.tar_diag_files(self.task_config.COMOUT_CHEM_ANALYSIS, - f"{self.task_config['APREFIX']}aerostat.tgz") + # Archive, compress, and save diag files in COM directory + logger.info(f"Saving observation diag files to COM") + self.jedi_dict['aeroanlvar'].save_obsdataout(self.task_config.COMOUT_CHEM_ANALYSIS, + f"{self.task_config.APREFIX}aero_analysis.ioda_hofx") - # Tar radiative bias correction files into COM directory - self.tar_radiative_bias_corrections(self.task_config.COMOUT_CHEM_ANALYSIS, - f"{self.task_config.APREFIX}aero_varbc_params.tar") + # Archive and save radiative bias correction files into COM directory + logger.info(f"Saving radiative bias correction files to COM") + self.jedi_dict['aeroanlvar'].save_obsbiasout(self.task_config.COMOUT_CHEM_ANALYSIS, + f"{self.task_config.APREFIX}aero_varbc_params") # Save files from COM logger.info(f"Saving files to COM") diff --git a/ush/python/pygfs/task/aero_bmatrix.py b/ush/python/pygfs/task/aero_bmatrix.py index 511b7dcdcf5..2b8332deb73 100644 --- a/ush/python/pygfs/task/aero_bmatrix.py +++ b/ush/python/pygfs/task/aero_bmatrix.py @@ -83,9 +83,9 @@ def initialize(self) -> None: # initialize JEDI applications logger.info(f"Initializing JEDI applications") - self.jedi_dict['aero_interpbkg'].initialize(self.task_config) - self.jedi_dict['aero_diagb'].initialize(self.task_config) - self.jedi_dict['aero_diffusion'].initialize(self.task_config) + self.jedi_dict['aero_interpbkg'].initialize() + self.jedi_dict['aero_diagb'].initialize() + self.jedi_dict['aero_diffusion'].initialize() @logit(logger) def execute(self) -> None: diff --git a/ush/python/pygfs/task/analysis.py b/ush/python/pygfs/task/analysis.py index 0111fca0444..4286b28f756 100644 --- a/ush/python/pygfs/task/analysis.py +++ b/ush/python/pygfs/task/analysis.py @@ -1,12 +1,9 @@ #!/usr/bin/env python3 -import glob -import gzip -from logging import getLogger import os -import tarfile +from logging import getLogger from typing import Any, Dict -from wxflow import (AttrDict, Task, WorkflowException, +from wxflow import (AttrDict, Task, add_to_datetime, to_timedelta, to_isotime, parse_j2yaml, logit) @@ -20,9 +17,9 @@ class Analysis(Task): """ @logit(logger, name="Analysis") def __init__(self, config: Dict[str, Any]): - """Constructor global atm analysis task + """Constructor global analysis task - This method will construct a global atm analysis task. + This method will construct a global analysis task. This includes: - extending the task_config attribute AttrDict to include parameters required for this task @@ -47,18 +44,6 @@ def __init__(self, config: Dict[str, Any]): for hour in self.task_config.IAUFHRS: _iau_times_iso.append(to_isotime(_window_begin + to_timedelta(f"{str(hour)}H") - to_timedelta(f"{self.task_config.assim_freq}H") / 2)) - # Get observations list from obs list yaml - if 'OBS_LIST_YAML' in self.task_config: - _observations = parse_j2yaml(self.task_config.OBS_LIST_YAML, self.task_config)['observations'] - else: - _observations = [] - - # Get bias correction dict from bias files yaml - if 'BIAS_FILES_YAML' in self.task_config: - _bias_files = parse_j2yaml(self.task_config.BIAS_FILES_YAML, self.task_config)['bias_files'] - else: - _bias_files = AttrDict - # Set prefix needed for GPREFIX, depedning on the model if self.task_config.NET == 'gcafs': _da_prefix = 'gcdas' @@ -80,8 +65,7 @@ def __init__(self, config: Dict[str, Any]): 'GPREFIX_ENS': f"enkf{_da_prefix}.t{self.task_config.previous_cycle.hour:02d}z.", 'OCNRES': f"{self.task_config.OCNRES:03d}", 'iau_times_iso': _iau_times_iso, - 'observations': _observations, - 'bias_files': _bias_files, + 'snow_bkg_path': os.path.join('.', 'bkg/'), # TODO: remove this line } )) @@ -96,121 +80,3 @@ def finalize(self) -> None: def clean(self) -> None: super().clean() - - @logit(logger) - def untar_bias_corrections(self) -> None: - """Extract bias correction files from tarballs - This method will extract bias correction files from tarballs - - Parameters - ---------- - None - - Returns - ---------- - None - """ - - bias_file_list = [] - for ob in self.task_config.observations: - if ob in self.task_config.bias_files and not self.task_config.bias_files[ob] in bias_file_list: - bias_file_list.append(self.task_config.bias_files[ob]) - bias_file_path = f'{self.task_config.DATA}/obs/{self.task_config.GPREFIX}{self.task_config.bias_files[ob]}' - if os.path.exists(bias_file_path): - extract_tar(bias_file_path) - else: - logger.warning(f"Bias correction file {bias_file_path} does not exist and will be skipped") - - @logit(logger) - def tar_diag_files(self, comout: str, tarball_name: str) -> None: - """Compress and tar diag files into COM directory - - Parameters - ---------- - comout: str - path to COM output directory - tarball_name: str - name of output tar file - - Returns - ---------- - None - """ - - # Set paths of output tar files - diagtar = os.path.join(comout, tarball_name) - - # Get lists of files to put in tarballs - diaglist = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc')) - - # Compress diag files - logger.info(f"Compressing {len(diaglist)} diag files") - for diagfile in diaglist: - with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out: - f_out.writelines(f_in) - - # Create tarball of compressed diag files in COM - logger.debug(f"Creating tarball {diagtar} with {len(diaglist)} compressed diag files") - with tarfile.open(diagtar, "w") as archive: - for diagfile in diaglist: - diaggzip = f"{diagfile}.gz" - archive.add(diaggzip, arcname=os.path.basename(diaggzip)) - - @logit(logger) - def tar_radiative_bias_corrections(self, comout: str, tarball_name: str) -> None: - """Tar radiative bias correction files and into COM directory - - Parameters - ---------- - comout: str - path to COM output directory - tarball_name: str - name of output tar file - - Returns - ---------- - None - """ - - # Set paths of output tar files - radtar = os.path.join(comout, tarball_name) - - # Get lists of files to put in tarballs - satlist = glob.glob(os.path.join(self.task_config.DATA, 'bc', '*satbias*nc')) - tlaplist = glob.glob(os.path.join(self.task_config.DATA, 'obs', '*tlapse.txt')) - - # Create tarball of radiance bias correction files - logger.info(f"Creating radiance bias correction tarball {radtar}") - with tarfile.open(radtar, 'w') as radbcor: - logger.info(f"Adding {radbcor.getnames()}") - for satfile in satlist: - radbcor.add(satfile, arcname=os.path.basename(satfile)) - for tlapfile in tlaplist: - # Change OPREFIX to APREFIX in tlapse file name when adding to tarball - radbcor.add(tlapfile, arcname=os.path.basename(tlapfile.replace(self.task_config.GPREFIX, self.task_config.APREFIX))) - - -@logit(logger) -def extract_tar(tar_file: str) -> None: - """Extract files from a tarball - - This method extract files from a tarball - - Parameters - ---------- - tar_file - path/name of tarball - - Returns - ---------- - None - """ - - # extract files from tar file - tar_path = os.path.dirname(tar_file) - try: - with tarfile.open(tar_file, "r") as tarball: - tarball.extractall(path=tar_path) - logger.info(f"Extract {tarball.getnames()}") - except Exception as e: - raise WorkflowException(f"An error occurred while extracting {tar_file}:\n{e}") from e diff --git a/ush/python/pygfs/task/analysis_stats.py b/ush/python/pygfs/task/analysis_stats.py index f13ad8a6af4..44c4f9cfcad 100644 --- a/ush/python/pygfs/task/analysis_stats.py +++ b/ush/python/pygfs/task/analysis_stats.py @@ -11,16 +11,16 @@ from wxflow import (AttrDict, FileHandler, - add_to_datetime, to_timedelta, - Task, + add_to_datetime, to_timedelta, to_YMDH, parse_j2yaml, logit) from pygfs.jedi import Jedi +from pygfs.task.analysis import Analysis logger = getLogger(__name__.split('.')[-1]) -class AnalysisStats(Task): +class AnalysisStats(Analysis): """ Class for JEDI-based global analysis stats tasks """ @@ -36,26 +36,44 @@ def __init__(self, config: Dict[str, Any]): ---------- config: Dict dictionary object containing task configuration + analysis: str + type of analysis stats to be performed Returns ---------- None """ super().__init__(config) - _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config.assim_freq}H") / 2) + _outdir = { + 'atmos': self.task_config.COMOUT_ATMOS_ANLMON, + 'atmos_gsi': self.task_config.COMOUT_ATMOS_ANLMON, + } + _anldir = { + 'atmos': self.task_config.COMOUT_ATMOS_ANALYSIS, + 'atmos_gsi': self.task_config.COMOUT_ATMOS_ANALYSIS, + } + if self.task_config.DO_AERO_ANL: + _outdir['aero'] = self.task_config.COMOUT_AERO_ANLMON + _anldir['aero'] = self.task_config.COMOUT_AERO_ANALYSIS + if self.task_config.DO_JEDISNOWDA: + _outdir['snow'] = self.task_config.COMOUT_SNOW_ANLMON + _anldir['snow'] = self.task_config.COMOUT_SNOW_ANALYSIS # Create a local dictionary that is repeatedly used across this class - local_dict = AttrDict( + self.task_config.update(AttrDict( { - 'STAT_WINDOW_BEGIN': _window_begin, - 'STAT_WINDOW_LENGTH': f"PT{self.task_config.assim_freq}H", - 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z." + # + 'outdir': _outdir, + 'anldir': _anldir, } - ) - # Extend task_config with local_dict - self.task_config = AttrDict(**self.task_config, **local_dict) + )) + + # Extend task_config with content of config yaml for this task + self.task_config.update(parse_j2yaml(self.task_config.TASK_CONFIG_YAML, self.task_config)) + + # Create dictionary of Jedi objects + expected_keys = self.task_config.STAT_ANALYSES + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) def initialize(self) -> None: @@ -71,76 +89,33 @@ def initialize(self) -> None: ---------- None """ - # Create dictionary of Jedi objects - # Expected keys are what must be included from the JEDI config file. We can - # then loop through ob space list from scripts/exglobal_analysis_stats.py - expected_keys = ['aero', 'atmos', 'atmos_gsi', 'snow'] - jedi_config_dict = parse_j2yaml(self.task_config.JEDI_CONFIG_YAML, self.task_config) - self.jedi_dict = Jedi.get_jedi_dict(jedi_config_dict, self.task_config, expected_keys) - - logger.info(f"Copying files to {self.task_config.DATA}/stats") - - # Extract info from stat config file - analysis_config_dict = parse_j2yaml(self.task_config.BASE_CONFIG_YAML, self.task_config) - # Loop through a copy of ob space list - for analysis in self.task_config.STAT_ANALYSES[:]: + for analysis in self.task_config.STAT_ANALYSES: + # Loop through a copy of ob space list logger.info(f"Working on analysis type: {analysis}") - # Copy stat files to DATA path - input_tar = os.path.join(analysis_config_dict[analysis]['stat_file_path'], - f"{self.task_config['APREFIX']}{analysis_config_dict[analysis]['stat_file_name']}") - diag_dir_path = os.path.join(self.task_config.DATA, analysis) - - dest = os.path.join(diag_dir_path, analysis_config_dict[analysis]['stat_file_name']) - logger.info(f"Copying {input_tar} to {dest} ...") - tarball_list = [[input_tar, dest]] - FileHandler({'mkdir': [diag_dir_path], 'copy': tarball_list}).sync() - - # Open tar file - logger.info(f"Open tarred diagnostic files in {dest}") - with tarfile.open(dest, "r") as tar: - # Check if tar file is empty - if not tar.getnames(): - logger.warning(f"WARNING. The tar file {dest} is empty. No files to extract.") - logger.warning("Moving to next analysis ...") - # Remove analysis from STAT_ANALYSES and move to next - self.task_config.STAT_ANALYSES.remove(analysis) - logger.info(f"current analysis list: {self.task_config.STAT_ANALYSES}") - continue - # Extract all files to the current directory - tar.extractall(path=diag_dir_path) + # Stage files from COM + logger.info(f"Staging files from COM and creating output directories") + FileHandler(self.task_config.data_in).sync() + + # Extract diag tar file + jcb_config = self.jedi_dict[analysis].jcb_config + component = self.jedi_dict[analysis].component + diag_archive = os.path.join(jcb_config[f"{component}_obsdatain_path"], + f"{self.task_config.APREFIX}{analysis}_analysis.ioda_hofx.tar.gz") + Jedi.extract_tar(diag_archive) - self.task_config.OBSSPACES_LIST = [] - for obsspace_dict in analysis_config_dict[analysis]['obs spaces']: - # Gunzip .nc files - gz_file = os.path.join(diag_dir_path, (obsspace_dict['input file'] + ".gz")) - - # Check if the file exists - if os.path.exists(gz_file): - logger.info(f"Now processing {gz_file}") - output_file = os.path.join(diag_dir_path, obsspace_dict['input file']) - # Open the .gz file - with gzip.open(gz_file, 'rb') as f_in: - with open(output_file, 'wb') as f_out: - f_out.write(f_in.read()) - self.task_config.OBSSPACES_LIST.append(obsspace_dict['name']) - else: - logger.warning(f"WARNING. {gz_file} does not exist to extract.") - logger.warning("Moving to next obs space ...") - continue # Skip current obs space and move to next - - # initialize JEDI application + # Initialize JEDI application logger.info(f"Initializing JEDI ioda-stats extraction application") - self.jedi_dict[analysis].initialize(self.task_config) + self.jedi_dict[analysis].initialize(clean_empty_obsspaces=True) @logit(logger) - def execute(self, jedi_dict_key: str) -> None: + def execute(self, analysis: str) -> None: """Execute JEDI application of analysis stats Parameters ---------- - jedi_dict_key + analysis key specifying particular Jedi object in self.jedi_dict Returns @@ -148,10 +123,10 @@ def execute(self, jedi_dict_key: str) -> None: None """ - self.jedi_dict[jedi_dict_key].execute() + self.jedi_dict[analysis].execute() @logit(logger) - def finalize(self, jedi_dict_key: str) -> None: + def finalize(self, analysis: str) -> None: """Finalize the analysis statistics job. This method will finalize the analysis statistics job using JEDI. @@ -161,7 +136,7 @@ def finalize(self, jedi_dict_key: str) -> None: Parameters ---------- - jedi_dict_key + analysis key specifying particular Jedi object in self.jedi_dict Returns @@ -169,52 +144,28 @@ def finalize(self, jedi_dict_key: str) -> None: None """ - analysis_config_dict = parse_j2yaml(self.task_config.BASE_CONFIG_YAML, self.task_config) - - if jedi_dict_key == 'atmos_gsi': - outdir = self.task_config['COMOUT_ATMOS_ANLMON'] - anldir = self.task_config['COMOUT_ATMOS_ANALYSIS'] - else: - outdir = self.task_config['COMOUT_' + jedi_dict_key.upper() + '_ANLMON'] - anldir = self.task_config['COMOUT_' + jedi_dict_key.upper() + '_ANALYSIS'] - # Check if the directory exists; if not, create it - if not os.path.exists(outdir): - FileHandler({'mkdir': [outdir]}).sync() - - copy_list = [] - for obsspace_dict in analysis_config_dict[jedi_dict_key]['obs spaces']: - statfile = os.path.join(self.task_config.DATA, obsspace_dict['output file']) - dest = os.path.join(outdir, f"{obsspace_dict['output file']}") - copy_list.append((statfile, dest)) - FileHandler({'copy_opt': copy_list}).sync() - - # path of output tar statfile - iodastatzipfile = os.path.join(outdir, f"{self.task_config.APREFIX}{jedi_dict_key}_analysis.ioda_hofx_stats.tar.gz") - - logger.info(f"Compressing ioda-stats generated files to {iodastatzipfile}") - - # get list of iodastat files to put in tarball - iodastatfiles = glob.glob(os.path.join(outdir, '*output*nc')) - - logger.info(f"Gathering {len(iodastatfiles)} ioda-stat files to {iodastatzipfile}") - - with tarfile.open(iodastatzipfile, "w|gz") as archive: - for targetfile in iodastatfiles: - archive.add(targetfile, arcname=os.path.basename(targetfile)) - - # concatenate text files into one summary file - summaryfile = os.path.join(anldir, f"{self.task_config.APREFIX}{jedi_dict_key}_stats.txt") - with open(summaryfile, 'w') as outfile: - for obsspace_dict in analysis_config_dict[jedi_dict_key]['obs spaces']: - obsspace_name = obsspace_dict['name'] - textfile = os.path.join(self.task_config.DATA, f"{obsspace_name}_ioda_stats.txt") - if os.path.exists(textfile): - logger.info(f"Concatenating {textfile} to {summaryfile}") - with open(textfile, 'r') as infile: - outfile.write(infile.read()) - else: - logger.warning(f"WARNING: {textfile} does not exist to concatenate.") - logger.warning("Skipping this file ...") + for analysis in self.task_config.STAT_ANALYSES: + self.jedi_dict[analysis].save_obsdataout(self.task_config.outdir[analysis], + f"{self.task_config.APREFIX}{analysis}_analysis.ioda_hofx_stats") + + # concatenate text files into one summary file + jcb_config = self.jedi_dict[analysis].jcb_config + component = self.jedi_dict[analysis].component + summaryfile = os.path.join(jcb_config[f"{component}_obsdataout_path"], f"{self.task_config.APREFIX}{analysis}_stats.txt") + with open(summaryfile, 'w') as outfile: + for ob in self.jedi_dict[analysis].jcb_config.observations: + textfile = os.path.join(jcb_config[f"{component}_obsdataout_path"], f"{ob}_ioda_stats.txt") + if os.path.exists(textfile): + logger.info(f"Concatenating {textfile} to {summaryfile}") + with open(textfile, 'r') as infile: + outfile.write(infile.read()) + else: + logger.warning(f"{textfile} does not exist to concatenate.") + logger.warning("Skipping this file ...") + + # Save files from COM + logger.info(f"Saving files to COM") + FileHandler(self.task_config.data_out).sync() @logit(logger) def convert_gsi_diags(self) -> None: @@ -237,19 +188,19 @@ def convert_gsi_diags(self) -> None: logger.info("Converting GSI diag files to IODA files for analysis stats") # copy GSI diag files to DATA path diag_tars = ['cnvstat', 'radstat', 'oznstat'] - diag_dir_ges_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ges') - diag_dir_anl_path = os.path.join(self.task_config.DATA, 'atmos_gsi_anl') - diag_dir_path = os.path.join(self.task_config.DATA, 'atmos_gsi_diags') + diag_dir_ges_path = os.path.join(self.task_config.DATA, 'atmos_gsi', 'atmos_gsi_ges') + diag_dir_anl_path = os.path.join(self.task_config.DATA, 'atmos_gsi', 'atmos_gsi_anl') + diag_dir_path = os.path.join(self.task_config.DATA, 'atmos_gsi', 'atmos_gsi_diags') FileHandler({'mkdir': [diag_dir_path, diag_dir_ges_path, diag_dir_anl_path]}).sync() - diag_ioda_dir_ges_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda_ges') - diag_ioda_dir_anl_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda_anl') - output_dir_path = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda') + diag_ioda_dir_ges_path = os.path.join(self.task_config.DATA, 'atmos_gsi', 'atmos_gsi_ioda_ges') + diag_ioda_dir_anl_path = os.path.join(self.task_config.DATA, 'atmos_gsi', 'atmos_gsi_ioda_anl') + output_dir_path = os.path.join(self.task_config.DATA, 'atmos_gsi', 'atmos_gsi_ioda') FileHandler({'mkdir': [diag_ioda_dir_ges_path, diag_ioda_dir_anl_path, output_dir_path]}).sync() diag_tar_copy_list = [] for diag in diag_tars: input_tar_basename = f"{self.task_config.APREFIX}{diag}" input_tar = os.path.join(self.task_config.COMIN_ATMOS_ANALYSIS, - input_tar_basename) + f"{input_tar_basename}.tar") dest = os.path.join(diag_dir_path, input_tar_basename) if os.path.exists(input_tar): diag_tar_copy_list.append([input_tar, dest]) @@ -296,12 +247,12 @@ def convert_gsi_diags(self) -> None: out_ioda_file = os.path.join(output_dir_path, os.path.basename(ges_ioda_file).replace('_ges_', '_gsi_')) gsid.combine_ges_anl_ioda(ges_ioda_file, anl_ioda_file, out_ioda_file) else: - logger.warning(f"WARNING: {anl_ioda_file} does not exist to combine with {ges_ioda_file}") + logger.warning(f"{anl_ioda_file} does not exist to combine with {ges_ioda_file}") logger.warning("Skipping this file ...") # Tar up the ioda files - iodastatzipfile = os.path.join(self.task_config.DATA, 'atmos_gsi_ioda', - f"{self.task_config.APREFIX}atmos_gsi_ioda_diags.tar.gz") + iodastatzipfile = os.path.join(self.task_config.DATA, 'atmos_gsi', 'atmos_gsi_ioda', + f"{self.task_config.APREFIX}atmos_gsi_analysis.ioda_hofx.tar.gz") logger.info(f"Compressing GSI IODA files to {iodastatzipfile}") # get list of iodastat files to put in tarball iodastatfiles = glob.glob(os.path.join(output_dir_path, '*nc4')) diff --git a/ush/python/pygfs/task/atm_analysis.py b/ush/python/pygfs/task/atm_analysis.py index 13d1069b620..d2fd0f820a5 100644 --- a/ush/python/pygfs/task/atm_analysis.py +++ b/ush/python/pygfs/task/atm_analysis.py @@ -74,7 +74,8 @@ def initialize(self) -> None: This method will initialize a global atm analysis. This includes: - stage input files from COM and create output directories - - extract bias corrections from tar files + - stage observation files + - stage bias correction files - initialize JEDI applications Parameters @@ -90,14 +91,18 @@ def initialize(self) -> None: logger.info(f"Staging files from COM and creating output directories") FileHandler(self.task_config.data_in).sync() - # Extract bias corrections from tar files - logger.info(f"Extracting bias corrections from tar files") - self.untar_bias_corrections() + # Stage observation files + logger.info(f"Staging observation files") + self.jedi_dict['atmanlvar'].stage_obsdatain(f"{self.task_config.COMIN_OBS}/atmos") + + # Stage bias correction files + logger.info(f"Staging bias correction files") + self.jedi_dict['atmanlvar'].stage_obsbiasin(self.task_config.COMIN_ATMOS_ANALYSIS_PREV) # Initialize JEDI variational application logger.info(f"Initializing JEDI applications") - self.jedi_dict['atmanlvar'].initialize(self.task_config, clean_empty_obsspaces=True) - self.jedi_dict['atmanlfv3inc'].initialize(self.task_config) + self.jedi_dict['atmanlvar'].initialize(clean_empty_obsspaces=True) + self.jedi_dict['atmanlfv3inc'].initialize() @logit(logger) def execute(self, jedi_dict_key: str) -> None: @@ -121,8 +126,8 @@ def finalize(self) -> None: This method will finalize a global atm analysis using JEDI. This includes: - - compress and tar output diag files in COM - - tar radiative bias correction files and place in COM + - archive, compress, and save diag files to COM directory + - tar radiative bias correction files to COM directory - save output files and YAMLs to COM Parameters @@ -134,13 +139,15 @@ def finalize(self) -> None: None """ - # Compress and tar diag files in COM directory - self.tar_diag_files(self.task_config.COMOUT_ATMOS_ANALYSIS, - f"{self.task_config.APREFIX}stat.atm.tar") + # Archive, compress, and save diag files to COM directory + logger.info(f"Saving observation diag files to COM") + self.jedi_dict['atmanlvar'].save_obsdataout(self.task_config.COMOUT_ATMOS_ANALYSIS, + f"{self.task_config.APREFIX}atmos_analysis.ioda_hofx") - # Tar radiative bias correction files into COM directory - self.tar_radiative_bias_corrections(self.task_config.COMOUT_ATMOS_ANALYSIS, - f"{self.task_config.APREFIX}rad_varbc_params.tar") + # Tar radiative bias correction files to COM directory + logger.info(f"Saving radiative bias correction files to COM") + self.jedi_dict['atmanlvar'].save_obsbiasout(self.task_config.COMOUT_ATMOS_ANALYSIS, + f"{self.task_config.APREFIX}rad_varbc_params") # Save files from COM logger.info(f"Saving files to COM") diff --git a/ush/python/pygfs/task/atmens_analysis.py b/ush/python/pygfs/task/atmens_analysis.py index 99cc210408c..f6534b1b7df 100644 --- a/ush/python/pygfs/task/atmens_analysis.py +++ b/ush/python/pygfs/task/atmens_analysis.py @@ -60,7 +60,8 @@ def initialize(self) -> None: This method will initialize a global atmens analysis. This includes: - stage input files from COM and create output directories - - extract bias corrections from tar files + - stage observation files + - stage bias correction files - initialize JEDI applications Parameters @@ -76,15 +77,19 @@ def initialize(self) -> None: logger.info(f"Staging files from COM") FileHandler(self.task_config.data_in).sync() - # Extract bias corrections from tar files - logger.info(f"Extracting bias corrections from tar files") - self.untar_bias_corrections() + # Stage observation files + logger.info(f"Staging observation files") + self.jedi_dict['atmensanlobs'].stage_obsdatain(f"{self.task_config.COMIN_OBS}/atmos") + + # Stage bias correction files + logger.info(f"Staging bias correction files") + self.jedi_dict['atmensanlobs'].stage_obsbiasin(self.task_config.COMIN_ATMOS_ANALYSIS_PREV) # initialize JEDI applications logger.info(f"Initializing JEDI LETKF observer application") - self.jedi_dict['atmensanlobs'].initialize(self.task_config, clean_empty_obsspaces=True) - self.jedi_dict['atmensanlsol'].initialize(self.task_config) - self.jedi_dict['atmensanlfv3inc'].initialize(self.task_config) + self.jedi_dict['atmensanlobs'].initialize(clean_empty_obsspaces=True) + self.jedi_dict['atmensanlsol'].initialize() + self.jedi_dict['atmensanlfv3inc'].initialize() @logit(logger) def initialize_letkf(self) -> None: @@ -126,7 +131,7 @@ def finalize(self) -> None: This method will finalize a global atmens analysis using JEDI. This includes: - - compress and tar output diag files and place in COM + - archive, compress, and save diag files in COM directory - save output files and YAMLs to COM Parameters @@ -138,9 +143,10 @@ def finalize(self) -> None: None """ - # Compress and tar diag files in COM directory - self.tar_diag_files(self.task_config.COMOUT_ATMOS_ANALYSIS_ENS, - f"{self.task_config.APREFIX_ENS}stat.atm.tar") + # Archive, compress, and save diag files in COM directory + logger.info(f"Saving observation diag files to COM") + self.jedi_dict['atmensanlobs'].save_obsdataout(self.task_config.COMOUT_ATMOS_ANALYSIS_ENS, + f"{self.task_config.APREFIX_ENS}atmos_analysis.ens_mean.ioda_hofx") # Save files from COM logger.info(f"Saving files to COM") diff --git a/ush/python/pygfs/task/ensemble_recenter.py b/ush/python/pygfs/task/ensemble_recenter.py index 54ba1d12339..69c98c7fd9c 100644 --- a/ush/python/pygfs/task/ensemble_recenter.py +++ b/ush/python/pygfs/task/ensemble_recenter.py @@ -80,8 +80,8 @@ def initialize(self) -> None: # Initialize JEDI ensemble increment recentering application logger.info(f"Initializing JEDI applications") - self.jedi_dict['correction_increment'].initialize(self.task_config) - self.jedi_dict['ensemble_recenter'].initialize(self.task_config) + self.jedi_dict['correction_increment'].initialize() + self.jedi_dict['ensemble_recenter'].initialize() @logit(logger) def execute(self) -> None: diff --git a/ush/python/pygfs/task/fv3_analysis_calc.py b/ush/python/pygfs/task/fv3_analysis_calc.py index 2dca788ce29..863c97e0a32 100644 --- a/ush/python/pygfs/task/fv3_analysis_calc.py +++ b/ush/python/pygfs/task/fv3_analysis_calc.py @@ -87,11 +87,11 @@ def initialize(self) -> None: # Initialize GDASApp JEDI addincrement application logger.info(f"Initializing GDASApp JEDI addincrement applications") - self.jedi_dict['atm_addincrement'].initialize(self.task_config) + self.jedi_dict['atm_addincrement'].initialize() if self.task_config.DO_AERO_ANL: - self.jedi_dict['aero_addincrement'].initialize(self.task_config) + self.jedi_dict['aero_addincrement'].initialize() if self.task_config.DO_JEDISNOWDA: - self.jedi_dict['snow_addincrement'].initialize(self.task_config) + self.jedi_dict['snow_addincrement'].initialize() @logit(logger) def execute(self) -> None: diff --git a/ush/python/pygfs/task/marine_analysis.py b/ush/python/pygfs/task/marine_analysis.py index 872be06d2ab..04f6156b463 100644 --- a/ush/python/pygfs/task/marine_analysis.py +++ b/ush/python/pygfs/task/marine_analysis.py @@ -89,6 +89,7 @@ def initialize(self) -> None: This method will initialize the marine analysis. This includes: - staging input files from COM and create output directories + - staging observation files - preparing the namelists for deterministic MOM6 and analysis geometry - asserting that dates of the history files are correct - initializing all the JEDI applications required for the marine analysis @@ -107,6 +108,10 @@ def initialize(self) -> None: logger.info(f"Staging files from COM and creating input/output directories") FileHandler(self.task_config.data_in).sync() + # Stage observation files + logger.info(f"Staging observations") + self.jedi_dict['var'].stage_obsdatain(self.task_config.COMIN_OBS) + # prepare the deterministic MOM6 input.nml logger.info(f"Preparing deterministic MOM6 input namelist") mdau.prep_input_nml(self.task_config) @@ -124,8 +129,8 @@ def initialize(self) -> None: # initialize JEDI applications logger.info(f"Initializing JEDI applications") - self.jedi_dict['var'].initialize(self.task_config, clean_empty_obsspaces=True) - self.jedi_dict['soca_incpostproc'].initialize(self.task_config) + self.jedi_dict['var'].initialize(clean_empty_obsspaces=True) + self.jedi_dict['soca_incpostproc'].initialize() # This method is a bit of a hack that will be removed in the future when the anlstat # job fully replaces the SOCA obs_diag_stats application @@ -157,7 +162,8 @@ def finalize(self) -> None: This method will finalize a global marine analysis. This includes: - Saving output files to COM - - Saving observation statistics to COM + - Archiving, compressing, and saving diag files in COM directory + - Saving (legacy) observation statistics to COM Parameters ---------- @@ -172,10 +178,15 @@ def finalize(self) -> None: logger.info(f"Saving files to COM") FileHandler(self.task_config.data_out).sync() - # Save obs diag statistics to COM (success is optional) - logger.info(f"Copy observation statistics from {self.task_config.DATA} to {self.task_config.COMOUT_OCEAN_ANALYSIS}") + # Archive, compress, and save diag files in COM directory + logger.info(f"Saving observation diag files to COM") + self.jedi_dict['var'].save_obsdataout(self.task_config.COMOUT_OCEAN_ANALYSIS, + f"{self.task_config.APREFIX}marine_analysis.ioda_hofx") + + # Save obs diag statistics to COM (this is for legacy obs monitoring) + logger.info(f"Copy (legacy) observation statistics from {self.task_config.DATA} to {self.task_config.COMOUT_OCEAN_ANALYSIS}") try: - diags_list = self.jedi_dict['soca_diag_stats'].render_jcb(self.task_config, 'soca_diags_finalize') + diags_list = self.jedi_dict['soca_diag_stats'].render_jcb_template(algorithm_in='soca_diags_finalize') except Exception as e: logger.warning(f"Failed to render JCB template, 'soca_diags_finalize': {e}") FileHandler(diags_list).sync() diff --git a/ush/python/pygfs/task/marine_bmat.py b/ush/python/pygfs/task/marine_bmat.py index 297d5e608a4..057d2acef89 100644 --- a/ush/python/pygfs/task/marine_bmat.py +++ b/ush/python/pygfs/task/marine_bmat.py @@ -98,19 +98,19 @@ def initialize(self) -> None: simple_geom=True, mom_input="./anl_geom/MOM_input") # initialize vtscales python script - vtscales_config = self.jedi_dict['soca_parameters_diffusion_vt'].render_jcb(self.task_config, 'soca_vtscales') + vtscales_config = self.jedi_dict['soca_parameters_diffusion_vt'].render_jcb_template('soca_vtscales') save_as_yaml(vtscales_config, os.path.join(self.task_config.DATA, 'soca_vtscales.yaml')) # initialize JEDI applications - self.jedi_dict['gridgen'].initialize(self.task_config) - self.jedi_dict['soca_diagb'].initialize(self.task_config) - self.jedi_dict['soca_chgres'].initialize(self.task_config) - self.jedi_dict['soca_parameters_diffusion_vt'].initialize(self.task_config) - self.jedi_dict['soca_setcorscales'].initialize(self.task_config) - self.jedi_dict['soca_parameters_diffusion_hz'].initialize(self.task_config) + self.jedi_dict['gridgen'].initialize() + self.jedi_dict['soca_diagb'].initialize() + self.jedi_dict['soca_chgres'].initialize() + self.jedi_dict['soca_parameters_diffusion_vt'].initialize() + self.jedi_dict['soca_setcorscales'].initialize() + self.jedi_dict['soca_parameters_diffusion_hz'].initialize() if self.task_config.DOHYBVAR_OCN == "YES" or self.task_config.NMEM_ENS >= 2: - self.jedi_dict['soca_ensb'].initialize(self.task_config) - self.jedi_dict['soca_ensweights'].initialize(self.task_config) + self.jedi_dict['soca_ensb'].initialize() + self.jedi_dict['soca_ensweights'].initialize() @logit(logger) def execute(self) -> None: diff --git a/ush/python/pygfs/task/marine_letkf.py b/ush/python/pygfs/task/marine_letkf.py index 79f87fa1e72..8acd81493a7 100644 --- a/ush/python/pygfs/task/marine_letkf.py +++ b/ush/python/pygfs/task/marine_letkf.py @@ -69,6 +69,7 @@ def initialize(self): This method will initialize the marine analysis. This includes: - staging input files from COM and create output directories + - staging observation files - preparing the namelist for MOM6 - initializing all the JEDI applications required for marine LETKF @@ -84,14 +85,18 @@ def initialize(self): logger.info(f"Staging files from COM and creating input/output directories") FileHandler(self.task_config.data_in).sync() + # Stage observation files + logger.info(f"Staging observations") + self.jedi_dict['letkf'].stage_obsdatain(self.task_config.COMIN_OBS) + # prepare the ensemble MOM6 input.nml logger.info(f"Preparing ensemble MOM6 input namelist") mdau.prep_input_nml(self.task_config) # initialize JEDI applications logger.info(f"Initializing JEDI applications") - self.jedi_dict['gridgen'].initialize(self.task_config) - self.jedi_dict['letkf'].initialize(self.task_config, clean_empty_obsspaces=True) + self.jedi_dict['gridgen'].initialize() + self.jedi_dict['letkf'].initialize(clean_empty_obsspaces=True) @logit(logger) def execute(self) -> None: @@ -116,6 +121,7 @@ def finalize(self): This method will finalize a global marine analysis. This includes: - Saving output files to COM + - Archive, compress, and save diag files in COM directory Parameters: ------------ @@ -128,3 +134,8 @@ def finalize(self): # Save files from COM logger.info(f"Saving files to COM") FileHandler(self.task_config.data_out).sync() + + # Archive, compress, and save diag files in COM directory + logger.info(f"Saving observation diag files to COM") + self.jedi_dict['letkf'].save_obsdataout(self.task_config.COMOUT_OCEAN_LETKF, + f"{self.task_config.APREFIX}marine_analysis.ioda_hofx.ens_mean") diff --git a/ush/python/pygfs/task/marine_recenter.py b/ush/python/pygfs/task/marine_recenter.py index d66a47e7202..5039960c9b4 100644 --- a/ush/python/pygfs/task/marine_recenter.py +++ b/ush/python/pygfs/task/marine_recenter.py @@ -93,8 +93,8 @@ def initialize(self): # initialize JEDI applications logger.info(f"Initializing JEDI applications") - self.jedi_dict['gridgen'].initialize(self.task_config) - self.jedi_dict['ens_handler'].initialize(self.task_config) + self.jedi_dict['gridgen'].initialize() + self.jedi_dict['ens_handler'].initialize() @logit(logger) def execute(self, jedi_dict_key: str) -> None: diff --git a/ush/python/pygfs/task/snow_analysis.py b/ush/python/pygfs/task/snow_analysis.py index a854696bda6..5109070f2b5 100644 --- a/ush/python/pygfs/task/snow_analysis.py +++ b/ush/python/pygfs/task/snow_analysis.py @@ -57,14 +57,6 @@ def __init__(self, config: Dict[str, Any]): else: _DO_IMS_SCF = False - # Check if SNOCVR or SNOMAD file exists, do SNOCVR_SNOMAD preprocessing - _snocvr_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}snocvr.tm00.bufr_d') - _snomad_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}snomad.tm00.bufr_d') - _DO_SNOCVR_SNOMAD = ( - "snocvr_snomad" in self.task_config.observations and - (os.path.exists(_snocvr_file) or os.path.exists(_snomad_file)) - ) - # Extend task_config with variables repeatedly used across this class self.task_config.update(AttrDict( { @@ -76,7 +68,6 @@ def __init__(self, config: Dict[str, Any]): 'snow_prepobs_path': os.path.join(self.task_config.DATA, 'prep'), 'ims_file': _ims_file, 'DO_IMS_SCF': _DO_IMS_SCF, # Boolean to decide if IMS snow cover processing is done - 'DO_SNOCVR_SNOMAD': _DO_SNOCVR_SNOMAD, # Boolean to decide if SNOCVR_SNOMAD processing is done } )) @@ -87,12 +78,21 @@ def __init__(self, config: Dict[str, Any]): expected_keys = ['scf_to_ioda', 'snowanlvar'] self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) + # Boolean to decide if SNOCVR_SNOMAD processing is done + _snocvr_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}snocvr.tm00.bufr_d') + _snomad_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}snomad.tm00.bufr_d') + self.task_config.DO_SNOCVR_SNOMAD = ( + "snocvr_snomad" in self.jedi_dict.snowanlvar.jcb_config.observations and + (os.path.exists(_snocvr_file) or os.path.exists(_snomad_file)) + ) + @logit(logger) def initialize(self) -> None: """Initialize a global snow analysis This method will initialize a global snow analysis. This includes: + - stage observation files - stage input files from COM and create output directories - initialize JEDI applications @@ -105,15 +105,19 @@ def initialize(self) -> None: None """ + # Stage observation files + logger.info(f"Staging observation files") + self.jedi_dict['snowanlvar'].stage_obsdatain(self.task_config.COMIN_OBS) + # Stage files from COM logger.info(f"Staging files from COM and creating output directories") FileHandler(self.task_config.data_in).sync() # initialize JEDI variational application logger.info(f"Initializing JEDI applications") - self.jedi_dict['snowanlvar'].initialize(self.task_config, clean_empty_obsspaces=False) + self.jedi_dict['snowanlvar'].initialize(clean_empty_obsspaces=False) if self.task_config.DO_IMS_SCF: - self.jedi_dict['scf_to_ioda'].initialize(self.task_config) + self.jedi_dict['scf_to_ioda'].initialize() @logit(logger) def execute(self, jedi_dict_key: str) -> None: @@ -137,7 +141,7 @@ def execute(self, jedi_dict_key: str) -> None: def finalize(self) -> None: """Performs closing actions of the Snow analysis task This method: - - compress and tar output diag files in COM + - archive, compress, and save diag files in COM directory - save output files and YAMLs to COM Parameters @@ -146,9 +150,10 @@ def finalize(self) -> None: Instance of the SnowAnalysis object """ - # Compress and tar diag files into COM directory - self.tar_diag_files(self.task_config.COMOUT_SNOW_ANALYSIS, - f"{self.task_config.APREFIX}snow_analysis.ioda_hofx.tar") + # Archive, compress, and save diag files in COM directory + logger.info(f"Saving observation diag files to COM") + self.jedi_dict['snowanlvar'].save_obsdataout(self.task_config.COMOUT_SNOW_ANALYSIS, + f"{self.task_config.APREFIX}snow_analysis.ioda_hofx") # Save files to COM logger.info(f"Saving files to COM") diff --git a/ush/python/pygfs/task/snowens_analysis.py b/ush/python/pygfs/task/snowens_analysis.py index 8016576779b..c876d94f290 100644 --- a/ush/python/pygfs/task/snowens_analysis.py +++ b/ush/python/pygfs/task/snowens_analysis.py @@ -60,14 +60,6 @@ def __init__(self, config: Dict[str, Any]): else: _DO_IMS_SCF = False - # Check if SNOCVR or SNOMAD file exists, do SNOCVR_SNOMAD preprocessing - _snocvr_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}snocvr.tm00.bufr_d') - _snomad_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}snomad.tm00.bufr_d') - _DO_SNOCVR_SNOMAD = ( - "snocvr_snomad" in self.task_config.observations and - (os.path.exists(_snocvr_file) or os.path.exists(_snomad_file)) - ) - # Extend task_config with variables repeatedly used across this class self.task_config.update(AttrDict( { @@ -79,7 +71,6 @@ def __init__(self, config: Dict[str, Any]): 'snow_bkg_path': os.path.join('.', 'bkg', 'ensmean/'), 'ims_file': _ims_file, 'DO_IMS_SCF': _DO_IMS_SCF, # Boolean to decide if IMS snow cover processing is done - 'DO_SNOCVR_SNOMAD': _DO_SNOCVR_SNOMAD, # Boolean to decide if SNOCVR_SNOMAD processing is done } )) @@ -90,12 +81,21 @@ def __init__(self, config: Dict[str, Any]): expected_keys = ['scf_to_ioda', 'snowanlvar', 'esnowanlensmean'] self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) + # Boolean to decide if SNOCVR_SNOMAD processing is done + _snocvr_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}snocvr.tm00.bufr_d') + _snomad_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}snomad.tm00.bufr_d') + self.task_config.DO_SNOCVR_SNOMAD = ( + "snocvr_snomad" in self.jedi_dict.snowanlvar.jcb_config.observations and + (os.path.exists(_snocvr_file) or os.path.exists(_snomad_file)) + ) + @logit(logger) def initialize(self) -> None: """Initialize a global snow ensemble analysis This method will initialize a global snow ensemble analysis. This includes: + - stage observation files - stage input files from COM and create output directories - initialize JEDI applications @@ -108,16 +108,20 @@ def initialize(self) -> None: None """ + # Stage observation files + logger.info(f"Staging observation files") + self.jedi_dict['snowanlvar'].stage_obsdatain(self.task_config.COMIN_OBS) + # Stage files from COM logger.info(f"Staging files from COM and creating output directories") FileHandler(self.task_config.data_in).sync() # Initialize JEDI applications logger.info(f"Initializing JEDI applications") - self.jedi_dict['snowanlvar'].initialize(self.task_config, clean_empty_obsspaces=False) - self.jedi_dict['esnowanlensmean'].initialize(self.task_config) + self.jedi_dict['snowanlvar'].initialize(clean_empty_obsspaces=False) + self.jedi_dict['esnowanlensmean'].initialize() if self.task_config.DO_IMS_SCF: - self.jedi_dict['scf_to_ioda'].initialize(self.task_config) + self.jedi_dict['scf_to_ioda'].initialize() @logit(logger) def execute(self, jedi_dict_key: str) -> None: @@ -141,7 +145,7 @@ def execute(self, jedi_dict_key: str) -> None: def finalize(self) -> None: """Performs closing actions of the Snow analysis task This method: - - compress and tar output diag files in COM + - archive, compress, and save diag files in COM directory - save output files and YAMLs to COM Parameters @@ -150,9 +154,10 @@ def finalize(self) -> None: Instance of the SnowEnsAnalysis object """ - # Compress and tar diag files into COM directory - self.tar_diag_files(self.task_config.COMOUT_SNOW_ANALYSIS, - f"{self.task_config.APREFIX_ENS}snow_analysis.ioda_hofx.ensmean.tar") + # Archive, compress, and save diag files in COM directory + logger.info(f"Saving observation diag files to COM") + self.jedi_dict['snowanlvar'].save_obsdataout(self.task_config.COMOUT_SNOW_ANALYSIS, + f"{self.task_config.APREFIX_ENS}snow_analysis.ioda_hofx.ensmean") # Save files to COM logger.info(f"Saving files to COM")