Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 47 additions & 17 deletions src/reformatters/noaa/mrms/conus_analysis_hourly/region_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from reformatters.common.deaccumulation import deaccumulate_to_rates_inplace
from reformatters.common.download import http_download_to_disk
from reformatters.common.logging import get_logger
from reformatters.common.pydantic import replace
from reformatters.common.region_job import (
CoordinateValueOrRange,
RegionJob,
Expand All @@ -36,7 +37,8 @@
class NoaaMrmsSourceFileCoord(SourceFileCoord):
time: Timestamp
product: str
level: str = "00.00"
level: str
fallback_products: tuple[str, ...]

def get_url(self, source: DownloadSource = "s3") -> str:
date_str = self.time.strftime("%Y%m%d")
Expand Down Expand Up @@ -81,6 +83,7 @@ def generate_source_file_coords(
processing_region_ds: xr.Dataset,
data_var_group: Sequence[NoaaMrmsDataVar],
) -> Sequence[NoaaMrmsSourceFileCoord]:
assert len(data_var_group) == 1
times = pd.to_datetime(processing_region_ds["time"].values)
data_var = data_var_group[0]
internal = data_var.internal_attrs
Expand All @@ -92,16 +95,19 @@ def generate_source_file_coords(
continue

# Use pre-v12 product name for times before v12 launch
if internal.mrms_product_pre_v12 is not None and time < MRMS_V12_START:
if time < MRMS_V12_START:
product = internal.mrms_product_pre_v12
fallback_products = internal.mrms_fallback_products_pre_v12
else:
product = internal.mrms_product
fallback_products = internal.mrms_fallback_products

coords.append(
NoaaMrmsSourceFileCoord(
time=time,
product=product,
level=internal.mrms_level,
fallback_products=fallback_products,
)
)
return coords
Expand All @@ -113,14 +119,31 @@ def _download_from_source(
return _decompress_gzip(gz_path)

def download_file(self, coord: NoaaMrmsSourceFileCoord) -> Path:
if coord.time < MRMS_V12_START:
return self._download_from_source(coord, source="iowa")
try:
return self._download_from_source(coord, source="s3")
except FileNotFoundError:
if coord.time > (pd.Timestamp.now() - pd.Timedelta(hours=12)):
return self._download_from_source(coord, source="ncep")
raise
is_pre_v12 = coord.time < MRMS_V12_START
is_recent = coord.time > (pd.Timestamp.now() - pd.Timedelta(hours=12))

sources: tuple[DownloadSource, ...]
if is_pre_v12:
sources = ("iowa",)
elif is_recent:
sources = ("s3", "ncep")
else:
sources = ("s3",)

products = (coord.product, *coord.fallback_products)

last_exception: FileNotFoundError | None = None
for product in products:
for source in sources:
try:
return self._download_from_source(
replace(coord, product=product), source=source
)
except FileNotFoundError as exc:
last_exception = exc

assert last_exception is not None
raise last_exception

def read_data(
self,
Expand All @@ -130,18 +153,25 @@ def read_data(
assert coord.downloaded_path is not None
with rasterio.open(coord.downloaded_path) as reader:
if reader.count == 2 and coord.time < MRMS_V12_START:
# Some pre-v12 Iowa Mesonet files have a duplicate GRIB message with
# standard meteorological discipline (0) alongside the MRMS-specific one (209).
# Band 1 (discipline 209) is always the authoritative MRMS data.
band2_discipline = reader.tags(2).get("GRIB_DISCIPLINE", "")
assert band2_discipline == "0(Meteorological)", (
f"Expected band 2 GRIB_DISCIPLINE '0(Meteorological)', found '{band2_discipline}' in {coord.downloaded_path}"
rasterio_band = next(
(
band
for band in (1, 2)
if reader.tags(band)
.get("GRIB_DISCIPLINE", "")
.startswith("209")
),
None,
)
assert rasterio_band is not None, (
f"Expected one band with GRIB_DISCIPLINE 209 in {coord.downloaded_path}"
)
else:
assert reader.count == 1, (
f"Expected exactly 1 band, found {reader.count} in {coord.downloaded_path}"
)
result: ArrayFloat32 = reader.read(1, out_dtype=np.float32)
rasterio_band = 1
result: ArrayFloat32 = reader.read(rasterio_band, out_dtype=np.float32)
return result

def apply_data_transformations(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@

class NoaaMrmsInternalAttrs(BaseInternalAttrs):
mrms_product: str
# Pre-v12 product name on Iowa Mesonet (e.g. GaugeCorr_QPE_01H for precipitation_surface)
mrms_product_pre_v12: str | None = None
mrms_level: str = "00.00"
mrms_product_pre_v12: str
mrms_fallback_products_pre_v12: tuple[str, ...]
mrms_fallback_products: tuple[str, ...]
mrms_level: str
# Product only available from this time onwards; earlier times emit NaN
available_from: Timestamp | None = None
# For deaccumulation: MRMS hourly QPE is a 1-hour fixed window accumulation
Expand Down Expand Up @@ -217,11 +218,17 @@ def data_vars(self) -> Sequence[NoaaMrmsDataVar]:
long_name="Precipitation rate",
units="kg m-2 s-1",
step_type="avg",
comment="Average precipitation rate over the previous hour. Derived from MultiSensor_QPE_01H_Pass2 from October 2020, GaugeCorr_QPE_01H before. Units equivalent to mm/s.",
comment="Average precipitation rate over the previous hour. Derived from MultiSensor_QPE_01H_Pass2 from October 2020, GaugeCorr_QPE_01H before. If primary product is unavailable, falls back to MultiSensor_QPE_01H_Pass1 and then RadarOnly_QPE_01H. Units equivalent to mm/s.",
),
internal_attrs=NoaaMrmsInternalAttrs(
mrms_product="MultiSensor_QPE_01H_Pass2",
mrms_product_pre_v12="GaugeCorr_QPE_01H",
mrms_fallback_products_pre_v12=("RadarOnly_QPE_01H",),
mrms_fallback_products=(
"MultiSensor_QPE_01H_Pass1",
"RadarOnly_QPE_01H",
),
mrms_level="00.00",
deaccumulate_to_rate=True,
window_reset_frequency=qpe_window_reset_frequency,
keep_mantissa_bits=default_keep_mantissa_bits,
Expand All @@ -240,6 +247,10 @@ def data_vars(self) -> Sequence[NoaaMrmsDataVar]:
),
internal_attrs=NoaaMrmsInternalAttrs(
mrms_product="MultiSensor_QPE_01H_Pass1",
mrms_product_pre_v12="MultiSensor_QPE_01H_Pass1",
mrms_fallback_products_pre_v12=(),
mrms_fallback_products=(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default these source file coord attrs to () and remove setting it explicitly to ()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 19b2f7d: mrms_fallback_products_pre_v12 and mrms_fallback_products now default to (), and I removed explicit =() assignments from the vars that were setting empties.

mrms_level="00.00",
available_from=MRMS_V12_START,
deaccumulate_to_rate=True,
window_reset_frequency=qpe_window_reset_frequency,
Expand All @@ -259,6 +270,10 @@ def data_vars(self) -> Sequence[NoaaMrmsDataVar]:
),
internal_attrs=NoaaMrmsInternalAttrs(
mrms_product="RadarOnly_QPE_01H",
mrms_product_pre_v12="RadarOnly_QPE_01H",
mrms_fallback_products_pre_v12=(),
mrms_fallback_products=(),
mrms_level="00.00",
deaccumulate_to_rate=True,
window_reset_frequency=qpe_window_reset_frequency,
keep_mantissa_bits=default_keep_mantissa_bits,
Expand All @@ -276,6 +291,10 @@ def data_vars(self) -> Sequence[NoaaMrmsDataVar]:
),
internal_attrs=NoaaMrmsInternalAttrs(
mrms_product="PrecipFlag",
mrms_product_pre_v12="PrecipFlag",
mrms_fallback_products_pre_v12=(),
mrms_fallback_products=(),
mrms_level="00.00",
keep_mantissa_bits="no-rounding",
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
"short_name": "prate",
"standard_name": "precipitation_flux",
"units": "kg m-2 s-1",
"comment": "Average precipitation rate over the previous hour. Derived from MultiSensor_QPE_01H_Pass2 from October 2020, GaugeCorr_QPE_01H before. Units equivalent to mm/s.",
"comment": "Average precipitation rate over the previous hour. Derived from MultiSensor_QPE_01H_Pass2 from October 2020, GaugeCorr_QPE_01H before. If primary product is unavailable, falls back to MultiSensor_QPE_01H_Pass1 and then RadarOnly_QPE_01H. Units equivalent to mm/s.",
"step_type": "avg",
"coordinates": "spatial_ref",
"_FillValue": "AAAAAAAA+H8="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@
"short_name": "prate",
"standard_name": "precipitation_flux",
"units": "kg m-2 s-1",
"comment": "Average precipitation rate over the previous hour. Derived from MultiSensor_QPE_01H_Pass2 from October 2020, GaugeCorr_QPE_01H before. Units equivalent to mm/s.",
"comment": "Average precipitation rate over the previous hour. Derived from MultiSensor_QPE_01H_Pass2 from October 2020, GaugeCorr_QPE_01H before. If primary product is unavailable, falls back to MultiSensor_QPE_01H_Pass1 and then RadarOnly_QPE_01H. Units equivalent to mm/s.",
"step_type": "avg",
"coordinates": "spatial_ref",
"_FillValue": "AAAAAAAA+H8="
Expand Down
108 changes: 108 additions & 0 deletions tests/noaa/mrms/conus_analysis_hourly/region_job_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
from typing import Self
from unittest.mock import Mock

import numpy as np
Expand Down Expand Up @@ -27,6 +28,8 @@ def test_source_file_coord_out_loc() -> None:
coord = NoaaMrmsSourceFileCoord(
time=pd.Timestamp("2024-01-15T12:00"),
product="MultiSensor_QPE_01H_Pass2",
level="00.00",
fallback_products=(),
)
assert coord.out_loc() == {"time": pd.Timestamp("2024-01-15T12:00")}

Expand All @@ -35,6 +38,8 @@ def test_source_file_coord_get_url_s3() -> None:
coord = NoaaMrmsSourceFileCoord(
time=pd.Timestamp("2024-01-15T12:00"),
product="MultiSensor_QPE_01H_Pass2",
level="00.00",
fallback_products=(),
)
url = coord.get_url(source="s3")
assert url == (
Expand All @@ -48,6 +53,8 @@ def test_source_file_coord_get_url_iowa() -> None:
coord = NoaaMrmsSourceFileCoord(
time=pd.Timestamp("2019-06-15T12:00"),
product="GaugeCorr_QPE_01H",
level="00.00",
fallback_products=(),
)
url = coord.get_url(source="iowa")
assert url == (
Expand All @@ -61,6 +68,8 @@ def test_source_file_coord_get_url_ncep() -> None:
coord = NoaaMrmsSourceFileCoord(
time=pd.Timestamp("2024-01-15T23:00"),
product="MultiSensor_QPE_01H_Pass2",
level="00.00",
fallback_products=(),
)
url = coord.get_url(source="ncep")
assert url == (
Expand Down Expand Up @@ -105,6 +114,10 @@ def test_generate_source_file_coords_post_v12(

assert len(coords) == 3
assert all(c.product == "MultiSensor_QPE_01H_Pass2" for c in coords)
assert all(
c.fallback_products == ("MultiSensor_QPE_01H_Pass1", "RadarOnly_QPE_01H")
for c in coords
)


def test_generate_source_file_coords_pre_v12(
Expand Down Expand Up @@ -133,6 +146,7 @@ def test_generate_source_file_coords_pre_v12(
assert len(coords) == 3
# Pre-v12 should use GaugeCorr_QPE_01H
assert all(c.product == "GaugeCorr_QPE_01H" for c in coords)
assert all(c.fallback_products == ("RadarOnly_QPE_01H",) for c in coords)


def test_generate_source_file_coords_pass_1_pre_v12_skipped(
Expand Down Expand Up @@ -162,6 +176,94 @@ def test_generate_source_file_coords_pass_1_pre_v12_skipped(
assert len(coords) == 0


def test_download_file_uses_fallback_products(monkeypatch: pytest.MonkeyPatch) -> None:
region_job = NoaaMrmsRegionJob.model_construct(
template_ds=Mock(),
data_vars=[],
append_dim="time",
region=slice(0, 1),
reformat_job_name="test",
)

attempts: list[tuple[str, str]] = []

def fake_download(coord: NoaaMrmsSourceFileCoord, source: str) -> Path:
attempts.append((coord.product, source))
if coord.product == "RadarOnly_QPE_01H" and source == "s3":
return Path("fake.grib2")
raise FileNotFoundError(coord.product)

monkeypatch.setattr(
pd.Timestamp, "now", classmethod(lambda *args: pd.Timestamp("2024-01-16T12:00"))
)
monkeypatch.setattr(region_job, "_download_from_source", fake_download)

path = region_job.download_file(
NoaaMrmsSourceFileCoord(
time=pd.Timestamp("2024-01-15T12:00"),
product="MultiSensor_QPE_01H_Pass2",
level="00.00",
fallback_products=("MultiSensor_QPE_01H_Pass1", "RadarOnly_QPE_01H"),
)
)

assert path == Path("fake.grib2")
assert attempts == [
("MultiSensor_QPE_01H_Pass2", "s3"),
("MultiSensor_QPE_01H_Pass1", "s3"),
("RadarOnly_QPE_01H", "s3"),
]


def test_read_data_pre_v12_two_band_selects_discipline_209(
monkeypatch: pytest.MonkeyPatch,
) -> None:
region_job = NoaaMrmsRegionJob.model_construct(
template_ds=Mock(),
data_vars=[],
append_dim="time",
region=slice(0, 1),
reformat_job_name="test",
)

data = np.ones((2, 2), dtype=np.float32)

class FakeReader:
count = 2

def __enter__(self) -> Self:
return self

def __exit__(self, *args: object) -> None:
return None

def tags(self, band: int) -> dict[str, str]:
return {
1: {"GRIB_DISCIPLINE": "0(Meteorological)"},
2: {"GRIB_DISCIPLINE": "209(Local)"},
}[band]

def read(self, band: int, out_dtype: np.dtype[np.float32]) -> np.ndarray:
assert out_dtype == np.float32
assert band == 2
return data

monkeypatch.setattr("rasterio.open", lambda *_args, **_kwargs: FakeReader())

result = region_job.read_data(
NoaaMrmsSourceFileCoord(
time=pd.Timestamp("2019-06-15T12:00"),
product="GaugeCorr_QPE_01H",
level="00.00",
fallback_products=(),
downloaded_path=Path("fake.grib2"),
),
Mock(),
)

np.testing.assert_array_equal(result, data)


@pytest.mark.parametrize(
("region", "expected_processing_region"),
[
Expand Down Expand Up @@ -301,6 +403,8 @@ def test_download_and_read_precipitation(
coord = NoaaMrmsSourceFileCoord(
time=time,
product=expected_product,
level=precip_var.internal_attrs.mrms_level,
fallback_products=precip_var.internal_attrs.mrms_fallback_products,
)

downloaded_path = region_job.download_file(coord)
Expand Down Expand Up @@ -332,6 +436,8 @@ def test_download_and_read_radar_only(tmp_path: Path) -> None:
coord = NoaaMrmsSourceFileCoord(
time=pd.Timestamp("2024-01-15T12:00"),
product="RadarOnly_QPE_01H",
level=radar_var.internal_attrs.mrms_level,
fallback_products=radar_var.internal_attrs.mrms_fallback_products,
)

downloaded_path = region_job.download_file(coord)
Expand Down Expand Up @@ -365,6 +471,8 @@ def test_download_and_read_precip_flag(tmp_path: Path) -> None:
coord = NoaaMrmsSourceFileCoord(
time=pd.Timestamp("2024-01-15T12:00"),
product="PrecipFlag",
level=ptype_var.internal_attrs.mrms_level,
fallback_products=ptype_var.internal_attrs.mrms_fallback_products,
)

downloaded_path = region_job.download_file(coord)
Expand Down
Loading