Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 121 additions & 35 deletions ush/python/pygfs/task/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import shutil
import tarfile
from logging import getLogger
from typing import List
from typing import List, Tuple, Union
from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, to_timedelta,
chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, rmdir,
strftime, to_YMDH, which, chdir, ProcessError, save_as_yaml,
Expand Down Expand Up @@ -749,65 +749,151 @@ def _pop_git_info(self, arch_dict: AttrDict) -> None:

return

def _normalize_arch_cyc(self, arch_cyc: Union[int, List[int], Tuple[int, ...]]) -> List[int]:
"""
Normalizes ARCH_CYC configuration to a list of integers.

Parameters
----------
arch_cyc : int, list of int, or tuple of int
Cycle hour(s) for archiving

Returns
-------
List[int]
List of cycle hours as integers

Raises
------
ValueError
If arch_cyc is not an int, list, or tuple, or contains non-integer values
"""
if isinstance(arch_cyc, int):
return [arch_cyc]
elif isinstance(arch_cyc, (list, tuple)):
try:
return [int(cyc) for cyc in arch_cyc]
except (ValueError, TypeError) as e:
raise ValueError(f"ARCH_CYC list must contain only integers: {e}")
else:
raise ValueError("ARCH_CYC must be an int or list/tuple of ints.")

def _arch_warm_start_increments(self, arch_dict: AttrDict) -> bool:
"""
This method determines if warm restart increments are to be archived based on the
configuration settings ARCH_CYC (integer cycle number to archive on) and
ARCH_FCSTICFREQ (integer frequency in days) and the current cycle.
Determines whether warm restart increments should be archived for the current cycle.

Parameters
----------
arch_dict : AttrDict
Dictionary containing configuration options, including:
- ARCH_CYC (int or list of int): Valid cycle hours for archiving
- ARCH_FCSTICFREQ (int): Frequency in days for archiving forecast ICs
- current_cycle (datetime): The current cycle datetime
- SDATE (datetime): Reference start date
- assim_freq (int or str): Assimilation frequency in hours

Returns
-------
bool
True if warm restart increments should be archived, False otherwise.
"""

# Get the variables need to determine if warm restart increments should be archived
# Normalize ARCH_CYC to a list of integers
cycle_hours = self._normalize_arch_cyc(arch_dict.ARCH_CYC)

# Get the current cycle and the ARCH_CYC
# Check if current cycle hour matches any configured cycle hour
cycle_HH = int(strftime(arch_dict.current_cycle, "%H"))
arch_cyc = arch_dict.ARCH_CYC
SDATE = arch_dict.SDATE
assim_freq = arch_dict.assim_freq
if cycle_HH not in cycle_hours:
return False

# Validate archiving frequency
try:
fcsticfreq = int(arch_dict.ARCH_FCSTICFREQ)
except (ValueError, TypeError) as e:
raise ValueError(f"ARCH_FCSTICFREQ must be an integer: {e}")

if cycle_HH != arch_cyc:
# Not the right cycle hour
if fcsticfreq <= 0:
return False

ics_offset_cycle = add_to_datetime(arch_dict.current_cycle, to_timedelta(f"+{assim_freq}H"))
# Calculate offset cycle and check day frequency
try:
SDATE = arch_dict.SDATE
assim_freq = int(arch_dict.assim_freq)
ics_offset_cycle = add_to_datetime(
arch_dict.current_cycle,
to_timedelta(f"+{assim_freq}H")
)
except (AttributeError, KeyError, ValueError, TypeError) as e:
raise ValueError(f"Invalid configuration for date calculations: {e}")

days_since_sdate = (ics_offset_cycle - SDATE).days
if arch_dict.ARCH_FCSTICFREQ > 0 and days_since_sdate % arch_dict.ARCH_FCSTICFREQ == 0:
# We are on the right cycle hour and the right day
if days_since_sdate % fcsticfreq == 0:
return True

# Otherwise, do not archive warm restarts
return False

def _arch_warm_restart_ics(self, arch_dict: AttrDict) -> bool:
"""
This method determines if warm ICs are to be archived based on the
configuration settings ARCH_CYC (integer cycle number to archive on) and
ARCH_WARMICFREQ (integer frequency in days) and the current cycle.
Determines whether warm initial conditions (ICs) should be archived for the current cycle.

For GDAS and EnKFGDAS runs, the archive cycle hours are adjusted by subtracting the
assimilation frequency, as ICs lag forecast increments by that amount.

Parameters
----------
arch_dict : AttrDict
Dictionary containing configuration options, including:
- ARCH_CYC (int or list of int): Target cycle hour(s) for archiving
- ARCH_WARMICFREQ (int): Frequency in days for archiving warm ICs
- current_cycle (datetime): The current cycle datetime
- SDATE (datetime): Reference start date
- assim_freq (int or str): Assimilation frequency in hours
- RUN (str): Run type identifier (e.g., "gdas", "gfs")

Returns
-------
bool
True if warm ICs should be archived, False otherwise.
"""
HOURS_PER_DAY = 24

# Get the variables need to determine if warm restart ICs should be archived
# Extract and validate basic configuration
try:
cycle_HH = int(strftime(arch_dict.current_cycle, "%H"))
SDATE = arch_dict.SDATE
RUN = arch_dict.RUN.lower()
assim_freq = int(arch_dict.assim_freq)
warmicfreq = int(arch_dict.ARCH_WARMICFREQ)
except (AttributeError, KeyError, ValueError, TypeError) as e:
raise ValueError(f"Invalid or missing configuration in arch_dict: {e}")

# Validate frequency
if warmicfreq <= 0:
return False

cycle_HH = int(strftime(arch_dict.current_cycle, "%H"))
SDATE = arch_dict.SDATE
RUN = arch_dict.RUN
assim_freq = int(arch_dict.assim_freq)
arch_cyc_val = int(arch_dict.ARCH_CYC)

# The GDAS and EnKFGDAS ICs always lag the forecast increments by assim_freq hours
if "gdas" in RUN:
arch_cyc = (arch_cyc_val - assim_freq) % 24
else:
arch_cyc = arch_cyc_val
# Normalize ARCH_CYC to a list of integers
cycle_hours = self._normalize_arch_cyc(arch_dict.ARCH_CYC)

# Adjust cycle hours for GDAS runs
# GDAS and EnKFGDAS ICs lag forecast increments by assim_freq hours
is_gdas_run = RUN.startswith("gdas") or RUN.startswith("enkfgdas")
adjusted_cycle_hours = []
for cyc_hour in cycle_hours:
if is_gdas_run:
adjusted_hour = (cyc_hour - assim_freq) % HOURS_PER_DAY
else:
adjusted_hour = cyc_hour
adjusted_cycle_hours.append(adjusted_hour)

if cycle_HH != arch_cyc:
# Not the right cycle hour
# Check if current cycle hour matches any adjusted cycle hour
if cycle_HH not in adjusted_cycle_hours:
return False

# Check if the day frequency criterion is met
days_since_sdate = (arch_dict.current_cycle - SDATE).days
if arch_dict.ARCH_WARMICFREQ > 0 and days_since_sdate % arch_dict.ARCH_WARMICFREQ == 0:
# We are on the right cycle hour and the right day
if days_since_sdate % warmicfreq == 0:
return True

# Otherwise, do not archive warm restarts
return False

def _arch_restart(self, arch_dict: AttrDict) -> bool:
Expand Down
15 changes: 12 additions & 3 deletions ush/python/pygfs/utils/archive_tar_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,17 +415,26 @@ def _get_enkf_specific_cyc_vars(config_dict: AttrDict, current_cycle) -> AttrDic
# Both archive groups require: is_gdas AND SDATE AND specific day/cycle conditions
sdate = config_dict.get('SDATE')
arch_warmicfreq = config_dict.get('ARCH_WARMICFREQ', 1)
arch_cyc = config_dict.get('ARCH_CYC', 0)
arch_cyc_raw = config_dict.get('ARCH_CYC', 0)
if isinstance(arch_cyc_raw, int):
arch_cyc_list = [arch_cyc_raw]
elif isinstance(arch_cyc_raw, str):
arch_cyc_list = [int(val) for val in arch_cyc_raw.strip().split()]
elif isinstance(arch_cyc_raw, (list, tuple)):
arch_cyc_list = [int(val) for val in arch_cyc_raw]
else:
arch_cyc_list = [0]
assim_freq = config_dict.get('assim_freq', 6)

# Archive timing booleans - increments (group a)
# Logic: (current_cycle - SDATE).days % ARCH_WARMICFREQ == 0 AND is_gdas AND ARCH_CYC == cycle_HH
enkf_vars['archive_increments'] = False
current_cycle_days = (current_cycle - sdate).days
cycle_hour = int(current_cycle.strftime("%H"))
enkf_vars['archive_increments'] = (
(current_cycle_days % arch_warmicfreq == 0) and
enkf_vars.get('is_gdas', False) and
(arch_cyc == int(current_cycle.strftime("%H")))
(cycle_hour in arch_cyc_list)
)

# Archive timing booleans - ICs (group b)
Expand All @@ -436,7 +445,7 @@ def _get_enkf_specific_cyc_vars(config_dict: AttrDict, current_cycle) -> AttrDic
enkf_vars['archive_ics'] = (
(ics_offset_days % arch_warmicfreq == 0) and
enkf_vars.get('is_gdas', False) and
((arch_cyc - assim_freq) % 24 == int(current_cycle.strftime("%H")))
any((val - assim_freq) % 24 == cycle_hour for val in arch_cyc_list)
)

# Warm start flags (placeholders for future use)
Expand Down
Loading