From 703696cb8c8e25e9e65e0ffac249fd8049149338 Mon Sep 17 00:00:00 2001 From: David Huber Date: Thu, 4 Dec 2025 10:47:06 -0500 Subject: [PATCH 1/9] Correct metplus backwards checks --- dev/workflow/rocoto/gfs_tasks.py | 38 ++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/dev/workflow/rocoto/gfs_tasks.py b/dev/workflow/rocoto/gfs_tasks.py index 2eb76eea6cc..ceba16c66bf 100644 --- a/dev/workflow/rocoto/gfs_tasks.py +++ b/dev/workflow/rocoto/gfs_tasks.py @@ -2032,24 +2032,28 @@ def metp(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy'} deps.append(rocoto.add_dependency(dep_dict)) - if self._base["interval_gfs"] < to_timedelta("24H"): - n_lookback = self._base["interval_gfs"] // to_timedelta("6H") - for lookback in range(1, n_lookback + 1): - deps2 = [] + interval_gfs = self._base.get('interval_gfs') + + if interval_gfs < to_timedelta("24H"): + assim_freq=self._base['assim_freq'] + n_lookback = int(interval_gfs // to_timedelta(f"{assim_freq}H")) - 1 + # Check if the previous up to `n_lookback` arch_vrfy tasks have completed + # For interval=6, there are no lookbacks + # For interval=12, check lookback=1 + # For interval=18, check lookback=1,2 + # Only lookback if arch_vrfy is not valid for this cycle + if n_lookback > 0: dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_arch_vrfy', 'condition': 'not'} + deps2 = [] deps2.append(rocoto.add_dependency(dep_dict)) - for lookback2 in range(1, lookback): - offset = timedelta_to_HMS(-to_timedelta(f'{6 * lookback2}H')) - dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': offset} - deps2.append(rocoto.add_dependency(dep_dict)) - - edate_gfs = self._base['EDATE'] - edate_metp = edate_gfs.replace(hour=18) - edate_metp_diff = edate_metp - edate_gfs - offset = timedelta_to_HMS(-to_timedelta(f'{edate_metp_diff}H')) - dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy', 'offset': offset} - deps2.append(rocoto.add_dependency(dep_dict)) - deps.append(rocoto.create_dependency(dep_condition='and', dep=deps2)) + deps3 = [] + for lookback in range(0, n_lookback): + offset = timedelta_to_HMS(-to_timedelta(f'{assim_freq * (lookback+1)}H')) + dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy', 'offset': offset} + deps3.append(rocoto.add_dependency(dep_dict)) + + deps2.append(rocoto.create_dependency(dep=deps3, dep_condition='or')) + deps.append(rocoto.create_dependency(dep=deps2, dep_condition='and')) dependencies = rocoto.create_dependency(dep_condition='or', dep=deps) @@ -2071,7 +2075,7 @@ def metp(self): 'resources': resources, 'dependency': dependencies, 'envars': metpenvars, - 'cycledef': 'metp,last_gfs', + 'cycledef': 'metp', 'command': f'{self.HOMEgfs}/dev/jobs/metp.sh', 'job_name': f'{self.pslot}_{task_name}_@H', 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', From 2acd441df86d51f725f1e8dc87f272f197da3b4a Mon Sep 17 00:00:00 2001 From: David Huber Date: Thu, 4 Dec 2025 12:19:36 -0500 Subject: [PATCH 2/9] Check for end cycle --- dev/workflow/rocoto/gfs_cycled_xml.py | 1 - dev/workflow/rocoto/gfs_tasks.py | 24 ++++++++++++++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/dev/workflow/rocoto/gfs_cycled_xml.py b/dev/workflow/rocoto/gfs_cycled_xml.py index 6b4542251de..f3792ad43eb 100644 --- a/dev/workflow/rocoto/gfs_cycled_xml.py +++ b/dev/workflow/rocoto/gfs_cycled_xml.py @@ -100,7 +100,6 @@ def get_cycledefs(self): interval_metp_str = interval_gfs_str strings.append(f'\t{sdate_metp_str} {edate_metp_str} {interval_metp_str}') - strings.append(f'\t{edate_gfs_str} {edate_gfs_str} 24:00:00') strings.append('') strings.append('') diff --git a/dev/workflow/rocoto/gfs_tasks.py b/dev/workflow/rocoto/gfs_tasks.py index 9b1447ee843..cd442f0cff7 100644 --- a/dev/workflow/rocoto/gfs_tasks.py +++ b/dev/workflow/rocoto/gfs_tasks.py @@ -2016,7 +2016,7 @@ def metp(self): deps2 = [] deps2.append(rocoto.add_dependency(dep_dict)) deps3 = [] - for lookback in range(0, n_lookback): + for lookback in range(n_lookback): offset = timedelta_to_HMS(-to_timedelta(f'{assim_freq * (lookback+1)}H')) dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy', 'offset': offset} deps3.append(rocoto.add_dependency(dep_dict)) @@ -2024,6 +2024,26 @@ def metp(self): deps2.append(rocoto.create_dependency(dep=deps3, dep_condition='or')) deps.append(rocoto.create_dependency(dep=deps2, dep_condition='and')) + # Lastly, check that the last arch_vrfy job is done + # This only happens if the metp cycle is not aligned with the last_gfs cycle + sdate_gfs = self._base.get('SDATE_GFS') + edate = self._base.get('EDATE') + edate_metp = self._base.get('EDATE').replace(hour=(24 - assim_freq)) + n_intervals = int((edate - sdate_gfs) // interval_gfs) + edate_gfs = sdate_gfs + n_intervals * interval_gfs + metp_gfs_offset = edate_metp - edate_gfs + if metp_gfs_offset > to_timedelta("0H"): + deps2 = [] + dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_arch_vrfy', 'condition': 'not'} + deps2.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy', 'offset': timedelta_to_HMS(-metp_gfs_offset)} + deps2.append(rocoto.add_dependency(dep_dict)) + for i in range(1, int((metp_gfs_offset.seconds/3600) // assim_freq)): + dep_dict = {'type': 'cycleexist', 'offset': timedelta_to_HMS(-to_timedelta(f'{assim_freq * i}H')), 'condition': 'not'} + deps2.append(rocoto.add_dependency(dep_dict)) + + deps.append(rocoto.create_dependency(dep=deps2, dep_condition='and')) + dependencies = rocoto.create_dependency(dep_condition='or', dep=deps) metpenvars = self.envars.copy() @@ -2035,7 +2055,7 @@ def metp(self): varname1 = 'metpcase' varval1 = 'g2g1 g2o1 pcp1' - var_dict = {varname1: varval1} + var_dict = {varname1 : varval1} resources = self.get_resource('metp') From 451792d78e47eba507fea8a98195403267101e83 Mon Sep 17 00:00:00 2001 From: David Huber Date: Thu, 4 Dec 2025 14:43:29 -0500 Subject: [PATCH 3/9] Check for edge cases --- dev/workflow/rocoto/gfs_tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/workflow/rocoto/gfs_tasks.py b/dev/workflow/rocoto/gfs_tasks.py index cd442f0cff7..a9f994f9dc6 100644 --- a/dev/workflow/rocoto/gfs_tasks.py +++ b/dev/workflow/rocoto/gfs_tasks.py @@ -2002,9 +2002,9 @@ def metp(self): dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy'} deps.append(rocoto.add_dependency(dep_dict)) interval_gfs = self._base.get('interval_gfs') + assim_freq=self._base['assim_freq'] if interval_gfs < to_timedelta("24H"): - assim_freq=self._base['assim_freq'] n_lookback = int(interval_gfs // to_timedelta(f"{assim_freq}H")) - 1 # Check if the previous up to `n_lookback` arch_vrfy tasks have completed # For interval=6, there are no lookbacks @@ -2032,7 +2032,7 @@ def metp(self): n_intervals = int((edate - sdate_gfs) // interval_gfs) edate_gfs = sdate_gfs + n_intervals * interval_gfs metp_gfs_offset = edate_metp - edate_gfs - if metp_gfs_offset > to_timedelta("0H"): + if metp_gfs_offset > to_timedelta("0H") and metp_gfs_offset < to_timedelta("24H"): deps2 = [] dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_arch_vrfy', 'condition': 'not'} deps2.append(rocoto.add_dependency(dep_dict)) From 1403fb51bcfa42d13e5986afa859d74a31bbe8d4 Mon Sep 17 00:00:00 2001 From: David Huber Date: Thu, 4 Dec 2025 20:18:36 +0000 Subject: [PATCH 4/9] Pynorms --- dev/workflow/rocoto/gfs_tasks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/workflow/rocoto/gfs_tasks.py b/dev/workflow/rocoto/gfs_tasks.py index a9f994f9dc6..b5869dccbcb 100644 --- a/dev/workflow/rocoto/gfs_tasks.py +++ b/dev/workflow/rocoto/gfs_tasks.py @@ -2002,7 +2002,7 @@ def metp(self): dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy'} deps.append(rocoto.add_dependency(dep_dict)) interval_gfs = self._base.get('interval_gfs') - assim_freq=self._base['assim_freq'] + assim_freq = self._base['assim_freq'] if interval_gfs < to_timedelta("24H"): n_lookback = int(interval_gfs // to_timedelta(f"{assim_freq}H")) - 1 @@ -2038,7 +2038,7 @@ def metp(self): deps2.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy', 'offset': timedelta_to_HMS(-metp_gfs_offset)} deps2.append(rocoto.add_dependency(dep_dict)) - for i in range(1, int((metp_gfs_offset.seconds/3600) // assim_freq)): + for i in range(1, int((metp_gfs_offset.seconds / 3600) // assim_freq)): dep_dict = {'type': 'cycleexist', 'offset': timedelta_to_HMS(-to_timedelta(f'{assim_freq * i}H')), 'condition': 'not'} deps2.append(rocoto.add_dependency(dep_dict)) @@ -2055,7 +2055,7 @@ def metp(self): varname1 = 'metpcase' varval1 = 'g2g1 g2o1 pcp1' - var_dict = {varname1 : varval1} + var_dict = {varname1: varval1} resources = self.get_resource('metp') From b8416ce395c2d20996989db75368b666258dc346 Mon Sep 17 00:00:00 2001 From: David Huber Date: Mon, 8 Dec 2025 13:51:45 +0000 Subject: [PATCH 5/9] Test for invalid GFS intervals --- dev/workflow/applications/applications.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dev/workflow/applications/applications.py b/dev/workflow/applications/applications.py index 3d0ebc6e36c..3107db4fe5b 100644 --- a/dev/workflow/applications/applications.py +++ b/dev/workflow/applications/applications.py @@ -215,6 +215,14 @@ def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: if not AppConfig.is_monotonic(run_options[run]['fcst_segments']): raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}') + # Test if valid gfs options were specified + if run == "gfs": + interval = run_base.get('INTERVAL_GFS') + assim_freq = run_base.get('assim_freq') + if interval <= 0 or interval % assim_freq != 0: + raise ValueError(f'Invalid cycle interval specified for GFS forecasts: {interval}\n' + f'INTERVAL_GFS must be greater than 0 and a multiple of {assim_freq}') + if run_options[run]['do_globusarch'] and not globus_checked: self._check_globus(conf) globus_checked = True From c018b05e413141d4432af02ca1dbcf46cec7891d Mon Sep 17 00:00:00 2001 From: David Huber Date: Mon, 8 Dec 2025 13:52:07 +0000 Subject: [PATCH 6/9] Fix typo --- dev/workflow/rocoto/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/workflow/rocoto/tasks.py b/dev/workflow/rocoto/tasks.py index 5a892af4d21..b5d48e994bb 100644 --- a/dev/workflow/rocoto/tasks.py +++ b/dev/workflow/rocoto/tasks.py @@ -415,7 +415,7 @@ def get_resource(self, task_name): service_task = task_name in Tasks.SERVICE_TASKS if task_name not in Tasks.VALID_TASKS: - raise KeyError(f"ERROR {task_name} is not a valid tasks!") + raise KeyError(f"ERROR {task_name} is not a valid task!") # Combine the task configuration with the system configuration if service_task: From 02d27ef994eea9b2d3c2a096eea981898728aa7d Mon Sep 17 00:00:00 2001 From: David Huber Date: Mon, 8 Dec 2025 14:02:50 +0000 Subject: [PATCH 7/9] Remove error check --- dev/workflow/applications/applications.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/dev/workflow/applications/applications.py b/dev/workflow/applications/applications.py index 3107db4fe5b..0e4392502ef 100644 --- a/dev/workflow/applications/applications.py +++ b/dev/workflow/applications/applications.py @@ -166,7 +166,9 @@ def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: """ run_options = {run: {} for run in dict.fromkeys(self.runs)} globus_checked = False + print(self.runs) for run in self.runs: + print("here", run) # Read config.base with RUN specified run_base = conf.parse_config('config.base', RUN=run) @@ -215,14 +217,6 @@ def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: if not AppConfig.is_monotonic(run_options[run]['fcst_segments']): raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}') - # Test if valid gfs options were specified - if run == "gfs": - interval = run_base.get('INTERVAL_GFS') - assim_freq = run_base.get('assim_freq') - if interval <= 0 or interval % assim_freq != 0: - raise ValueError(f'Invalid cycle interval specified for GFS forecasts: {interval}\n' - f'INTERVAL_GFS must be greater than 0 and a multiple of {assim_freq}') - if run_options[run]['do_globusarch'] and not globus_checked: self._check_globus(conf) globus_checked = True From 0779b52ec538c7a8c86ea5fab549f934b0f770dc Mon Sep 17 00:00:00 2001 From: David Huber Date: Mon, 8 Dec 2025 14:06:51 +0000 Subject: [PATCH 8/9] Remove debug print statements --- dev/workflow/applications/applications.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dev/workflow/applications/applications.py b/dev/workflow/applications/applications.py index 0e4392502ef..3d0ebc6e36c 100644 --- a/dev/workflow/applications/applications.py +++ b/dev/workflow/applications/applications.py @@ -166,9 +166,7 @@ def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: """ run_options = {run: {} for run in dict.fromkeys(self.runs)} globus_checked = False - print(self.runs) for run in self.runs: - print("here", run) # Read config.base with RUN specified run_base = conf.parse_config('config.base', RUN=run) From c62c925d93546de126b29ad2a487c08dc994c9fb Mon Sep 17 00:00:00 2001 From: David Huber Date: Mon, 8 Dec 2025 15:41:53 +0000 Subject: [PATCH 9/9] Update scripts ignore path --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ac94c46ff30..f83211291ef 100644 --- a/.gitignore +++ b/.gitignore @@ -150,7 +150,7 @@ sorc/ocnicepost.fd #------------------------------ # jobs symlinks # scripts symlinks -scripts/exglobal_prep_ocean_obs.py +dev/scripts/exglobal_prep_ocean_obs.py # ush symlinks ush/bufr2ioda_insitu_profile_argo.py ush/bufr2ioda_insitu_profile_bathy.py