Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/reformatters/common/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import contextlib
import functools
import re
import threading
import time
import uuid
from collections.abc import Sequence
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlparse
from urllib.parse import quote, urlparse

import httpx
import numpy as np
Expand Down Expand Up @@ -329,3 +330,11 @@ def httpx_download_to_disk(
raise

return local_path


def s3_list_first_key_with_prefix(base_url: str, key_prefix: str) -> str | None:
    """Return the first S3 object key matching `key_prefix`, or None if none exist.

    Issues an anonymous ListObjectsV2 request (`list-type=2`) against
    `base_url` — a virtual-hosted-style bucket URL — limited to a single
    result (`max-keys=1`), and extracts the first ``<Key>`` element from the
    XML response.

    Args:
        base_url: Bucket base URL, e.g. "https://bucket.s3.amazonaws.com".
        key_prefix: Object key prefix; URL-encoded fully (safe='') because
            S3 keys routinely contain "/".

    Returns:
        The first matching key (XML entities decoded), or None when the
        listing is empty.
    """
    from xml.sax.saxutils import unescape  # stdlib; decodes &amp;/&lt;/&gt;

    list_url = f"{base_url}/?list-type=2&prefix={quote(key_prefix, safe='')}&max-keys=1"
    response = _httpx_get_with_retry(list_url)
    keys = re.findall(r"<Key>([^<]+)</Key>", response.text)
    if not keys:
        return None
    # Keys appear XML-escaped in the listing body (e.g. "&amp;" for "&");
    # unescape so the returned key can be used directly in a request URL.
    return unescape(keys[0])
29 changes: 27 additions & 2 deletions src/reformatters/noaa/mrms/conus_analysis_hourly/region_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@

from reformatters.common.binary_rounding import round_float32_inplace
from reformatters.common.deaccumulation import deaccumulate_to_rates_inplace
from reformatters.common.download import http_download_to_disk
from reformatters.common.download import (
http_download_to_disk,
s3_list_first_key_with_prefix,
)
from reformatters.common.logging import get_logger
from reformatters.common.pydantic import replace
from reformatters.common.region_job import (
Expand All @@ -33,6 +36,8 @@

type DownloadSource = Literal["iowa", "s3", "ncep"]

_NOAA_MRMS_S3_BASE_URL = "https://noaa-mrms-pds.s3.amazonaws.com"


class NoaaMrmsSourceFileCoord(SourceFileCoord):
time: Timestamp
Expand Down Expand Up @@ -115,7 +120,27 @@ def generate_source_file_coords(
def _download_from_source(
    self, coord: NoaaMrmsSourceFileCoord, source: DownloadSource
) -> Path:
    """Download and decompress the file for `coord` from `source`.

    For RadarOnly QPE products on S3, a missing exact top-of-hour file
    triggers a fallback: list the hour's keys and fetch the first one found.
    """
    try:
        gz_path = http_download_to_disk(
            coord.get_url(source=source), self.dataset_id
        )
    except FileNotFoundError:
        # RadarOnly QPE is published at 2-min intervals. When the system
        # starts late the exact HH:00:00 file is missing but a file at
        # HH:MM:00 (MM > 0) exists. List the hour's directory and try the
        # first file found. Only applies to RadarOnly products on S3.
        if source != "s3" or not coord.product.startswith("RadarOnly_QPE_"):
            raise
        key_prefix = (
            f"CONUS/{coord.product}_{coord.level}/"
            f"{coord.time:%Y%m%d}/"
            f"MRMS_{coord.product}_{coord.level}_{coord.time:%Y%m%d-%H}"
        )
        fallback_key = s3_list_first_key_with_prefix(
            _NOAA_MRMS_S3_BASE_URL, key_prefix
        )
        if fallback_key is None:
            raise
        gz_path = http_download_to_disk(
            f"{_NOAA_MRMS_S3_BASE_URL}/{fallback_key}", self.dataset_id
        )
    return _decompress_gzip(gz_path)

def download_file(self, coord: NoaaMrmsSourceFileCoord) -> Path:
Expand Down
56 changes: 56 additions & 0 deletions tests/common/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
http_download_to_disk,
http_store,
httpx_download_to_disk,
s3_list_first_key_with_prefix,
s3_store,
)

Expand Down Expand Up @@ -432,3 +433,58 @@ def test_httpx_get_with_retry_raises_on_4xx() -> None:
pytest.raises(httpx.HTTPStatusError),
):
_httpx_get_with_retry("https://example.com/test")


# --- s3_list_first_key_with_prefix tests ---


def _make_s3_list_xml(keys: list[str]) -> str:
contents = "".join(f"<Contents><Key>{k}</Key></Contents>" for k in keys)
return f"<?xml version='1.0'?><ListBucketResult>{contents}</ListBucketResult>"


def test_s3_list_first_key_with_prefix_returns_first_key() -> None:
    """The first <Key> element in the listing XML is returned."""
    listing = _make_s3_list_xml(["CONUS/RadarOnly/20210916/file-220800.grib2.gz"])
    mocked_response = _make_httpx_response(status_code=200, content=listing.encode())

    with patch.object(
        download_module, "_httpx_get_with_retry", return_value=mocked_response
    ):
        first_key = s3_list_first_key_with_prefix(
            "https://noaa-mrms-pds.s3.amazonaws.com",
            "CONUS/RadarOnly/20210916/file-22",
        )

    assert first_key == "CONUS/RadarOnly/20210916/file-220800.grib2.gz"


def test_s3_list_first_key_with_prefix_returns_none_when_empty() -> None:
    """An empty listing yields None rather than raising."""
    empty_listing = _make_s3_list_xml([])
    mocked_response = _make_httpx_response(
        status_code=200, content=empty_listing.encode()
    )

    with patch.object(
        download_module, "_httpx_get_with_retry", return_value=mocked_response
    ):
        first_key = s3_list_first_key_with_prefix(
            "https://noaa-mrms-pds.s3.amazonaws.com",
            "CONUS/RadarOnly/20210916/file-22",
        )

    assert first_key is None


def test_s3_list_first_key_with_prefix_uses_correct_list_url() -> None:
    """The ListObjectsV2 URL encodes the prefix and limits the result to one key."""
    seen_urls: list[str] = []

    def record_url(url: str, **kwargs: object) -> httpx.Response:
        seen_urls.append(url)
        empty_body = _make_s3_list_xml([]).encode()
        return _make_httpx_response(status_code=200, content=empty_body)

    with patch.object(download_module, "_httpx_get_with_retry", record_url):
        s3_list_first_key_with_prefix(
            "https://bucket.s3.amazonaws.com",
            "some/key/prefix",
        )

    assert len(seen_urls) == 1
    list_url = seen_urls[0]
    assert "list-type=2" in list_url
    # "/" in the prefix must be percent-encoded (quote with safe='').
    assert "prefix=some%2Fkey%2Fprefix" in list_url
    assert "max-keys=1" in list_url
119 changes: 118 additions & 1 deletion tests/noaa/mrms/conus_analysis_hourly/region_job_test.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import gzip
from pathlib import Path
from typing import Self
from unittest.mock import Mock
from unittest.mock import Mock, patch

import numpy as np
import pandas as pd
import pytest

from reformatters.common import download as download_module
from reformatters.common import template_utils
from reformatters.common.pydantic import replace
from reformatters.common.region_job import SourceFileStatus
from reformatters.common.storage import DatasetFormat, StorageConfig, StoreFactory
from reformatters.noaa.mrms.conus_analysis_hourly.region_job import (
_NOAA_MRMS_S3_BASE_URL,
NoaaMrmsRegionJob,
NoaaMrmsSourceFileCoord,
)
Expand Down Expand Up @@ -372,6 +375,120 @@ def test_update_template_with_results(
assert len(result_ds.time) == len(template_ds.time)


# Shared top-of-hour timestamp used by the RadarOnly QPE fallback tests below.
_DEFAULT_RADAR_ONLY_TIME = pd.Timestamp("2021-09-16T22:00")


def _make_region_job() -> NoaaMrmsRegionJob:
    """Construct a minimally-populated region job for download tests.

    Uses model_construct to skip validation; only dataset_id is needed
    by the download path under test.
    """
    template = Mock()
    template.attrs = {"dataset_id": "noaa-mrms-conus-analysis-hourly"}
    return NoaaMrmsRegionJob.model_construct(
        tmp_store=Mock(),
        template_ds=template,
        data_vars=[],
        append_dim="time",
        region=slice(0, 1),
        reformat_job_name="test",
    )


def _radar_only_coord(
    time: pd.Timestamp = _DEFAULT_RADAR_ONLY_TIME,
) -> NoaaMrmsSourceFileCoord:
    """Build a RadarOnly_QPE_01H coord — the product family with the S3 listing fallback."""
    return NoaaMrmsSourceFileCoord(
        time=time,
        product="RadarOnly_QPE_01H",
        level="00.00",
        fallback_products=(),
    )


def test_download_from_source_radaronly_falls_back_to_nearest_file_in_hour(
    tmp_path: Path,
) -> None:
    """When the exact HH:00 file 404s, the job lists the hour and fetches the first match."""
    region_job = _make_region_job()
    coord = _radar_only_coord()

    fallback_key = "CONUS/RadarOnly_QPE_01H_00.00/20210916/MRMS_RadarOnly_QPE_01H_00.00_20210916-220800.grib2.gz"
    gz_path = tmp_path / "fake.grib2.gz"
    with gzip.open(gz_path, "wb") as f:
        f.write(b"data")

    exact_url = coord.get_url(source="s3")
    fallback_url = f"{_NOAA_MRMS_S3_BASE_URL}/{fallback_key}"

    requested_urls: list[str] = []

    def fake_download(url: str, dataset_id: str) -> Path:
        requested_urls.append(url)
        if url == exact_url:
            # Simulate the exact top-of-hour file being absent.
            raise FileNotFoundError(f"Not found: {url}")
        return gz_path

    listing_response = Mock(
        text=f"<ListBucketResult><Contents><Key>{fallback_key}</Key></Contents></ListBucketResult>",
        status_code=200,
    )

    with (
        patch.object(
            download_module, "_httpx_get_with_retry", return_value=listing_response
        ),
        patch(
            "reformatters.noaa.mrms.conus_analysis_hourly.region_job.http_download_to_disk",
            side_effect=fake_download,
        ),
    ):
        result = region_job._download_from_source(coord, source="s3")

    # Exact URL tried first, then the listed fallback key.
    assert requested_urls == [exact_url, fallback_url]
    # _decompress_gzip strips the .gz suffix next to the download.
    assert result == tmp_path / "fake.grib2"


def test_download_from_source_radaronly_reraises_when_no_files_in_hour() -> None:
    """If the hour's S3 listing is empty, the original FileNotFoundError propagates.

    (Dropped the `tmp_path` fixture the original requested but never used.)
    """
    region_job = _make_region_job()
    coord = _radar_only_coord()

    empty_listing = Mock(
        text="<ListBucketResult></ListBucketResult>",
        status_code=200,
    )

    with (
        patch.object(
            download_module, "_httpx_get_with_retry", return_value=empty_listing
        ),
        patch(
            "reformatters.noaa.mrms.conus_analysis_hourly.region_job.http_download_to_disk",
            side_effect=FileNotFoundError("Not found"),
        ),
        pytest.raises(FileNotFoundError),
    ):
        region_job._download_from_source(coord, source="s3")


def test_download_from_source_multisensor_reraises_without_listing() -> None:
    """Non-RadarOnly products must re-raise immediately without listing S3.

    (Dropped the `tmp_path` fixture the original requested but never used;
    reuses the shared _DEFAULT_RADAR_ONLY_TIME constant for consistency.)
    """
    region_job = _make_region_job()
    coord = NoaaMrmsSourceFileCoord(
        time=_DEFAULT_RADAR_ONLY_TIME,
        product="MultiSensor_QPE_01H_Pass2",
        level="00.00",
        fallback_products=(),
    )

    with (
        patch.object(download_module, "_httpx_get_with_retry") as mock_list,
        patch(
            "reformatters.noaa.mrms.conus_analysis_hourly.region_job.http_download_to_disk",
            side_effect=FileNotFoundError("Not found"),
        ),
        pytest.raises(FileNotFoundError),
    ):
        region_job._download_from_source(coord, source="s3")

    # The S3 listing fallback must not fire for non-RadarOnly products.
    mock_list.assert_not_called()


@pytest.mark.slow
@pytest.mark.parametrize(
("time", "expected_product"),
Expand Down