-
Notifications
You must be signed in to change notification settings - Fork 6
feat: Implement ingested_forecast_length utility and integrate with GFS (#412) #421
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
6332ae1
6d5f762
e79f998
6f0270c
75f169c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| from collections.abc import Mapping, Sequence | ||
| from typing import Protocol | ||
|
|
||
| import xarray as xr | ||
|
|
||
| from reformatters.common.logging import get_logger | ||
| from reformatters.common.types import Timedelta, Timestamp | ||
|
|
||
| log = get_logger(__name__) | ||
|
|
||
|
|
||
class DeterministicForecastSourceFileCoord(Protocol):
    """Structural type for a deterministic forecast's source-file coordinate.

    Any object that exposes an ``init_time`` and a ``lead_time`` attribute
    satisfies this protocol; no nominal inheritance is required.
    """

    init_time: Timestamp
    lead_time: Timedelta
|
|
||
# NOTE(review): this protocol is structurally identical to
# DeterministicForecastSourceFileCoord above and is not referenced anywhere
# in this module — consider consolidating the two before merge.
class HasTimeInfo(Protocol):
    # Forecast initialization time.
    init_time: Timestamp
    # Offset from init_time identifying the forecast step.
    lead_time: Timedelta
def update_ingested_forecast_length(
    template_ds: xr.Dataset,
    results_coords: Mapping[str, Sequence[DeterministicForecastSourceFileCoord]]
    | Sequence[DeterministicForecastSourceFileCoord],
) -> xr.Dataset:
    """
    Update the ``ingested_forecast_length`` coordinate in ``template_ds``.

    For every init_time observed in ``results_coords``, sets
    ``ingested_forecast_length`` to the maximum ``lead_time`` seen for that
    init_time. Init times not present in ``results_coords`` are left
    untouched. The dataset is mutated in place and also returned.

    Parameters
    ----------
    template_ds:
        Dataset with an ``init_time`` coordinate and an
        ``ingested_forecast_length`` coordinate along it.
    results_coords:
        Either a mapping of variable name -> sequence of source file coords,
        or a flat sequence of source file coords. Each coord must expose
        ``init_time`` and ``lead_time`` attributes.

    Returns
    -------
    The same ``template_ds`` object, mutated in place.
    """
    assert "ingested_forecast_length" in template_ds.coords

    # Accept both the per-variable mapping produced by region jobs and a
    # plain flat sequence of coords (both forms appear at call sites).
    coord_seqs: Sequence[Sequence[DeterministicForecastSourceFileCoord]]
    if isinstance(results_coords, Mapping):
        coord_seqs = list(results_coords.values())
    else:
        coord_seqs = [results_coords]

    # Maximum lead time seen per init time, across all variables.
    max_lead_per_init: dict[Timestamp, Timedelta] = {}
    for coords_seq in coord_seqs:
        for coord in coords_seq:
            current = max_lead_per_init.get(coord.init_time)
            if current is None or coord.lead_time > current:
                max_lead_per_init[coord.init_time] = coord.lead_time

    for init_time, max_lead in max_lead_per_init.items():
        # Skip init times outside this template's init_time axis.
        if init_time in template_ds.coords["init_time"]:
            template_ds["ingested_forecast_length"].loc[{"init_time": init_time}] = (
                max_lead
            )

    return template_ds
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |
| from reformatters.common.download import ( | ||
| http_download_to_disk, | ||
| ) | ||
| from reformatters.common.ingest_stats import update_ingested_forecast_length | ||
| from reformatters.common.iterating import digest, group_by | ||
| from reformatters.common.logging import get_logger | ||
| from reformatters.common.region_job import ( | ||
|
|
@@ -150,6 +151,20 @@ def apply_data_transformations( | |
| if isinstance(keep_mantissa_bits, int): | ||
| round_float32_inplace(data_array.values, keep_mantissa_bits) | ||
|
|
||
| def update_template_with_results( | ||
| self, | ||
| process_results: Mapping[str, Sequence[NoaaGfsSourceFileCoord]], | ||
| ) -> xr.Dataset: | ||
| ds = super().update_template_with_results(process_results) | ||
|
|
||
| all_coords = [] | ||
| for coord_list in process_results.values(): | ||
| all_coords.extend(coord_list) | ||
|
|
||
| update_ingested_forecast_length(ds, all_coords) | ||
|
|
||
| return ds | ||
|
||
|
|
||
| @classmethod | ||
| def operational_update_jobs( | ||
| cls, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| from collections.abc import Mapping | ||
|
|
||
| import numpy as np | ||
| import pandas as pd | ||
| import xarray as xr | ||
|
|
||
| from reformatters.common.ingest_stats import update_ingested_forecast_length | ||
| from reformatters.common.region_job import CoordinateValueOrRange, SourceFileCoord | ||
| from reformatters.common.types import Dim, Timedelta, Timestamp | ||
|
|
||
|
|
||
class MockSourceFileCoord(SourceFileCoord):
    """Minimal SourceFileCoord stand-in carrying only the time coordinates."""

    init_time: Timestamp
    lead_time: Timedelta

    def out_loc(self) -> Mapping[Dim, CoordinateValueOrRange]:
        """Return an empty output location; irrelevant for these tests."""
        empty_loc: dict[Dim, CoordinateValueOrRange] = {}
        return empty_loc
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We are missing a test that checks that the existing values in the array are not modified. |
||
def test_update_ingested_forecast_length_simple() -> None:
    """ingested_forecast_length is filled with the max lead time per init time."""
    init_times = [
        pd.Timestamp("2025-01-01 12:00"),
        pd.Timestamp("2025-01-01 18:00"),
    ]

    # Start with NaT so any written value is unambiguously from the update.
    empty_deltas = np.array([pd.NaT, pd.NaT], dtype="timedelta64[ns]")

    ds = xr.Dataset(
        coords={
            "init_time": init_times,
            "ingested_forecast_length": (("init_time",), empty_deltas),
        }
    )

    coord1 = MockSourceFileCoord(
        init_time=pd.Timestamp("2025-01-01 12:00"),
        lead_time=pd.Timedelta(hours=6),
    )
    coord2 = MockSourceFileCoord(
        init_time=pd.Timestamp("2025-01-01 18:00"),
        lead_time=pd.Timedelta(hours=48),
    )

    # update_ingested_forecast_length takes a mapping of variable name ->
    # sequence of coords; a bare list would fail on .values().
    results = {"some_var": [coord1, coord2]}

    update_ingested_forecast_length(ds, results)

    assert ds["ingested_forecast_length"].sel(
        init_time="2025-01-01 12:00"
    ).values == pd.Timedelta(hours=6)
    assert ds["ingested_forecast_length"].sel(
        init_time="2025-01-01 18:00"
    ).values == pd.Timedelta(hours=48)
def test_update_ingested_forecast_length_update_existing() -> None:
    """An existing length is overwritten for updated init times and preserved otherwise."""
    init_time = pd.Timestamp("2025-01-01 12:00")
    other_init_time = pd.Timestamp("2025-01-01 18:00")

    ds = xr.Dataset(
        coords={
            "init_time": [init_time, other_init_time],
            "ingested_forecast_length": (
                ("init_time",),
                [pd.Timedelta(hours=6), pd.Timedelta(hours=24)],
            ),
        }
    )

    new_coord = MockSourceFileCoord(
        init_time=init_time,
        lead_time=pd.Timedelta(hours=12),
    )

    # update_ingested_forecast_length takes a mapping of variable name ->
    # sequence of coords; a bare list would fail on .values().
    update_ingested_forecast_length(ds, {"some_var": [new_coord]})

    assert ds["ingested_forecast_length"].sel(
        init_time=init_time
    ).values == pd.Timedelta(hours=12)
    # Init times absent from the results must keep their existing value.
    assert ds["ingested_forecast_length"].sel(
        init_time=other_init_time
    ).values == pd.Timedelta(hours=24)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /c/Users/Lenovo/OneDrive/Desktop/reformatters | ||
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /c/Users/Lenovo/OneDrive/Desktop/reformatters |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /c/Users/Lenovo/OneDrive/Desktop/reformatters |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /c/Users/Lenovo/OneDrive/Desktop/reformatters |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /c/Users/Lenovo/OneDrive/Desktop/reformatters |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /c/Users/Lenovo/OneDrive/Desktop/reformatters |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /c/Users/Lenovo/OneDrive/Desktop/reformatters |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /c/Users/Lenovo/OneDrive/Desktop/reformatters |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /c/Users/Lenovo/OneDrive/Desktop/reformatters |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove