Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/reformatters/common/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def download_to_disk(
with open(temp_path, "wb") as file:
file.writelines(response_buffers)

temp_path.rename(local_path)
temp_path.replace(local_path)

except Exception:
with contextlib.suppress(FileNotFoundError):
Expand Down
42 changes: 42 additions & 0 deletions src/reformatters/common/ingest_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from collections.abc import Mapping, Sequence
from typing import Protocol

import xarray as xr

from reformatters.common.logging import get_logger
from reformatters.common.types import Timedelta, Timestamp

log = get_logger(__name__)


class DeterministicForecastSourceFileCoord(Protocol):
    """Structural (duck-typed) interface for source-file coordinates of a
    deterministic forecast: anything carrying an init time and a lead time.
    """

    # Forecast initialization (reference) time of the source file.
    init_time: Timestamp
    # Offset from init_time covered by this source file.
    lead_time: Timedelta


def update_ingested_forecast_length(
    template_ds: xr.Dataset,
    results_coords: Mapping[str, Sequence[DeterministicForecastSourceFileCoord]],
) -> xr.Dataset:
    """
    Updates the 'ingested_forecast_length' coordinate in the template dataset.

    The maximum processed lead time across all variables is set as the
    ingested_forecast_length. This can hide the nuance of a specific variable
    having fewer lead times processed than others.
    """
    assert "ingested_forecast_length" in template_ds.coords

    # Largest lead time observed per init time, aggregated over all variables.
    longest_lead_by_init: dict[Timestamp, Timedelta] = {}
    for coords_for_var in results_coords.values():
        for file_coord in coords_for_var:
            previous = longest_lead_by_init.get(file_coord.init_time)
            if previous is None or file_coord.lead_time > previous:
                longest_lead_by_init[file_coord.init_time] = file_coord.lead_time

    # Write each aggregated maximum back into the coordinate, in place.
    ingested = template_ds["ingested_forecast_length"]
    for init_time, longest_lead in longest_lead_by_init.items():
        ingested.loc[{"init_time": init_time}] = longest_lead
    return template_ds
4 changes: 2 additions & 2 deletions src/reformatters/common/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def _copy_data_var_chunks(
for file in tmp_store.glob(f"{relative_dir}**/*"):
if not file.is_file():
continue
key = str(file.relative_to(tmp_store))
key = file.relative_to(tmp_store).as_posix()
sync_to_store(store, key, file.read_bytes())


Expand Down Expand Up @@ -155,7 +155,7 @@ def _copy_metadata_files(
store: Store,
) -> None:
for file in metadata_files:
relative_path = str(file.relative_to(tmp_store))
relative_path = file.relative_to(tmp_store).as_posix()
sync_to_store(store, relative_path, file.read_bytes())


Expand Down
8 changes: 8 additions & 0 deletions src/reformatters/noaa/gfs/region_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from reformatters.common.download import (
http_download_to_disk,
)
from reformatters.common.ingest_stats import update_ingested_forecast_length
from reformatters.common.iterating import digest, group_by
from reformatters.common.logging import get_logger
from reformatters.common.region_job import (
Expand Down Expand Up @@ -150,6 +151,13 @@ def apply_data_transformations(
if isinstance(keep_mantissa_bits, int):
round_float32_inplace(data_array.values, keep_mantissa_bits)

def update_template_with_results(
self,
process_results: Mapping[str, Sequence[NoaaGfsSourceFileCoord]],
) -> xr.Dataset:
ds = super().update_template_with_results(process_results)
return update_ingested_forecast_length(ds, process_results)

@classmethod
def operational_update_jobs(
cls,
Expand Down
73 changes: 73 additions & 0 deletions tests/common/test_ingest_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from collections.abc import Mapping

import pandas as pd
import xarray as xr

from reformatters.common.ingest_stats import update_ingested_forecast_length
from reformatters.common.region_job import CoordinateValueOrRange, SourceFileCoord
from reformatters.common.types import Dim, Timedelta, Timestamp


class MockSourceFileCoord(SourceFileCoord):
    """Minimal SourceFileCoord carrying only the fields that
    update_ingested_forecast_length reads.
    """

    # Forecast initialization time of the fake source file.
    init_time: Timestamp
    # Lead time (offset from init_time) of the fake source file.
    lead_time: Timedelta

    def out_loc(self) -> Mapping[Dim, CoordinateValueOrRange]:
        # Output location is irrelevant to these tests; select nothing.
        return {}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are missing a test that checks that the existing values in the array are not modified.

def test_update_ingested_forecast_length_simple() -> None:
    """Maximum processed lead time is recorded per init time."""
    init_times = [
        pd.Timestamp("2025-01-01 12:00"),
        pd.Timestamp("2025-01-01 18:00"),
    ]

    # Start with NaT values: nothing has been ingested yet.
    empty_deltas = pd.to_timedelta([pd.NaT, pd.NaT]).to_numpy()  # type: ignore[call-overload]

    ds = xr.Dataset(
        coords={
            "init_time": init_times,
            "ingested_forecast_length": (("init_time",), empty_deltas),
        }
    )

    coord1 = MockSourceFileCoord(
        init_time=pd.Timestamp("2025-01-01 12:00"),
        lead_time=pd.Timedelta(hours=6),
    )
    coord2 = MockSourceFileCoord(
        init_time=pd.Timestamp("2025-01-01 18:00"),
        lead_time=pd.Timedelta(hours=48),
    )

    results = {"var1": [coord1, coord2]}
    ds = update_ingested_forecast_length(ds, results)

    # Each init time gets the lead time of its (single) processed file.
    assert ds["ingested_forecast_length"].sel(
        init_time="2025-01-01 12:00"
    ).values == pd.Timedelta(hours=6)
    assert ds["ingested_forecast_length"].sel(
        init_time="2025-01-01 18:00"
    ).values == pd.Timedelta(hours=48)


def test_update_ingested_forecast_length_update_existing() -> None:
    """A newly processed, longer lead time overwrites the stored value."""
    init_time = pd.Timestamp("2025-01-01 12:00")

    # Dataset already records 6 hours ingested for this init time.
    ds = xr.Dataset(
        coords={
            "init_time": [init_time],
            "ingested_forecast_length": (("init_time",), [pd.Timedelta(hours=6)]),
        }
    )

    longer_coord = MockSourceFileCoord(
        init_time=init_time,
        lead_time=pd.Timedelta(hours=12),
    )

    ds = update_ingested_forecast_length(ds, {"var1": [longer_coord]})

    updated = ds["ingested_forecast_length"].sel(init_time=init_time).values
    assert updated == pd.Timedelta(hours=12)
5 changes: 5 additions & 0 deletions tests/noaa/gfs/forecast/dynamical_dataset_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def _check_updated_store(store: Store) -> None:
dtype="datetime64[ns]",
),
)

assert updated_ds["ingested_forecast_length"].sel(
init_time="2021-05-01T12:00:00"
).values == pd.Timedelta(hours=3)

point_ds2 = updated_ds.sel(
latitude=0,
longitude=0,
Expand Down
Loading