Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ sorc/ocnicepost.fd
#------------------------------
# jobs symlinks
# scripts symlinks
scripts/exglobal_prep_ocean_obs.py
dev/scripts/exglobal_prep_ocean_obs.py
# ush symlinks
ush/bufr2ioda_insitu_profile_argo.py
ush/bufr2ioda_insitu_profile_bathy.py
Expand Down
1 change: 0 additions & 1 deletion dev/workflow/rocoto/gfs_cycled_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def get_cycledefs(self):
interval_metp_str = interval_gfs_str

strings.append(f'\t<cycledef group="metp">{sdate_metp_str} {edate_metp_str} {interval_metp_str}</cycledef>')
strings.append(f'\t<cycledef group="last_gfs">{edate_gfs_str} {edate_gfs_str} 24:00:00</cycledef>')

strings.append('')
strings.append('')
Expand Down
54 changes: 39 additions & 15 deletions dev/workflow/rocoto/gfs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2001,24 +2001,48 @@ def metp(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy'}
deps.append(rocoto.add_dependency(dep_dict))
if self._base["interval_gfs"] < to_timedelta("24H"):
n_lookback = self._base["interval_gfs"] // to_timedelta("6H")
for lookback in range(1, n_lookback + 1):
deps2 = []
interval_gfs = self._base.get('interval_gfs')
assim_freq = self._base['assim_freq']

if interval_gfs < to_timedelta("24H"):
n_lookback = int(interval_gfs // to_timedelta(f"{assim_freq}H")) - 1
# Check if the previous up to `n_lookback` arch_vrfy tasks have completed
# For interval=6, there are no lookbacks
# For interval=12, check lookback=1
# For interval=18, check lookback=1,2
# Only lookback if arch_vrfy is not valid for this cycle
if n_lookback > 0:
dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_arch_vrfy', 'condition': 'not'}
deps2 = []
deps2.append(rocoto.add_dependency(dep_dict))
for lookback2 in range(1, lookback):
offset = timedelta_to_HMS(-to_timedelta(f'{6 * lookback2}H'))
dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': offset}
deps2.append(rocoto.add_dependency(dep_dict))

edate_gfs = self._base['EDATE']
edate_metp = edate_gfs.replace(hour=18)
edate_metp_diff = edate_metp - edate_gfs
offset = timedelta_to_HMS(-to_timedelta(f'{edate_metp_diff}H'))
dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy', 'offset': offset}
deps3 = []
for lookback in range(n_lookback):
offset = timedelta_to_HMS(-to_timedelta(f'{assim_freq * (lookback+1)}H'))
dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy', 'offset': offset}
deps3.append(rocoto.add_dependency(dep_dict))

deps2.append(rocoto.create_dependency(dep=deps3, dep_condition='or'))
deps.append(rocoto.create_dependency(dep=deps2, dep_condition='and'))

# Lastly, check that the last arch_vrfy job is done
# This only happens if the metp cycle is not aligned with the last_gfs cycle
sdate_gfs = self._base.get('SDATE_GFS')
edate = self._base.get('EDATE')
edate_metp = self._base.get('EDATE').replace(hour=(24 - assim_freq))
n_intervals = int((edate - sdate_gfs) // interval_gfs)
edate_gfs = sdate_gfs + n_intervals * interval_gfs
metp_gfs_offset = edate_metp - edate_gfs
if metp_gfs_offset > to_timedelta("0H") and metp_gfs_offset < to_timedelta("24H"):
deps2 = []
dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_arch_vrfy', 'condition': 'not'}
deps2.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy', 'offset': timedelta_to_HMS(-metp_gfs_offset)}
deps2.append(rocoto.add_dependency(dep_dict))
for i in range(1, int((metp_gfs_offset.seconds / 3600) // assim_freq)):
dep_dict = {'type': 'cycleexist', 'offset': timedelta_to_HMS(-to_timedelta(f'{assim_freq * i}H')), 'condition': 'not'}
deps2.append(rocoto.add_dependency(dep_dict))
deps.append(rocoto.create_dependency(dep_condition='and', dep=deps2))

deps.append(rocoto.create_dependency(dep=deps2, dep_condition='and'))

dependencies = rocoto.create_dependency(dep_condition='or', dep=deps)

Expand Down
2 changes: 1 addition & 1 deletion dev/workflow/rocoto/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ def get_resource(self, task_name):
service_task = task_name in Tasks.SERVICE_TASKS

if task_name not in Tasks.VALID_TASKS:
raise KeyError(f"ERROR {task_name} is not a valid tasks!")
raise KeyError(f"ERROR {task_name} is not a valid task!")

# Combine the task configuration with the system configuration
if service_task:
Expand Down