Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,10 @@ def operational_kubernetes_resources(self, image_tag: str) -> Iterable[CronJob]:

def validators(self) -> Sequence[validation.DataValidator]:
max_expected_delay = timedelta(hours=3, minutes=30)
# Pass 1 and Pass 2 multi-sensor products have additional gauge-collection latency;
# radar-only and precipitation_surface (which falls back to radar) are always current.
lagged_vars = [
# Pass 1 and Pass 2 have gauge-collection latency (~60 min); all other vars are always current.
gauge_latency_vars = [
"precipitation_pass_1_surface",
"precipitation_pass_2_surface",
"categorical_precipitation_type_surface",
]
return (
partial(
Expand All @@ -67,13 +65,21 @@ def validators(self) -> Sequence[validation.DataValidator]:
partial(
validation.check_analysis_recent_nans,
max_expected_delay=max_expected_delay,
max_nan_percentage=15,
exclude_vars=lagged_vars,
# precipitation_surface worst-case quarter-sampled NaN is ~30% (most recent
# timestamp falls back to radar-only with ~34% structural coverage gaps).
# radar_only worst-case quarter-sampled NaN is ~34% (structural coverage gaps).
# categorical (PrecipFlag) is radar-derived with no gauge latency, similar coverage to radar_only.
max_nan_percentage=35,
spatial_sampling="quarter",
exclude_vars=gauge_latency_vars,
),
partial(
validation.check_analysis_recent_nans,
max_expected_delay=max_expected_delay,
max_nan_percentage=50,
include_vars=lagged_vars,
# pass_1 and pass_2 worst-case quarter-sampled NaN is ~38% (1 of 3 checked
# timestamps is 100% NaN due to gauge-collection latency, rest are ~6%).
max_nan_percentage=40,
spatial_sampling="quarter",
include_vars=gauge_latency_vars,
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,10 @@ def operational_update_jobs(
]:
existing_ds = xr.open_zarr(primary_store, chunks=None, decode_timedelta=True)
ds_max_time = existing_ds[append_dim].max().item()
append_dim_start = pd.Timestamp(ds_max_time)
# Start 3 hours before the dataset's latest timestamp so precipitation_surface
# (which falls back to radar-only when pass_2 is unavailable) gets reprocessed
# and overwritten with pass_2 data once it becomes available (~60-min latency).
append_dim_start = pd.Timestamp(ds_max_time) - pd.Timedelta(hours=3)

append_dim_end = pd.Timestamp.now()
template_ds = get_template_fn(append_dim_end)
Expand Down