30 changes: 1 addition & 29 deletions src/reformatters/common/dynamical_dataset.py
@@ -32,7 +32,6 @@
 from reformatters.common.storage import StorageConfig, StoreFactory, get_local_tmp_store
 from reformatters.common.template_config import TemplateConfig
 from reformatters.common.types import DatetimeLike
-from reformatters.common.update_progress_tracker import UpdateProgressTracker
 from reformatters.common.zarr import copy_zarr_metadata

 DATA_VAR = TypeVar("DATA_VAR", bound=DataVar[Any])
@@ -50,8 +49,6 @@ class DynamicalDataset(FrozenBaseModel, Generic[DATA_VAR, SOURCE_FILE_COORD]):
     primary_storage_config: StorageConfig
     replica_storage_configs: Sequence[StorageConfig] = Field(default_factory=tuple)

-    use_progress_tracker: bool = False
-
     @computed_field
     @property
     def store_factory(self) -> StoreFactory:
@@ -169,18 +166,9 @@ def update(
             icechunk_only=True,
         )

-        progress_tracker = None
-        if self.use_progress_tracker:
-            progress_tracker = UpdateProgressTracker(
-                reformat_job_name,
-                job.region.start,
-                self.store_factory,
-            )
-
         process_results = job.process(
             primary_store=primary_store,
             replica_stores=replica_stores,
-            progress_tracker=progress_tracker,
         )
         updated_template = job.update_template_with_results(process_results)
         # overwrite the tmp store metadata with updated template
@@ -198,9 +186,6 @@ def update(
             replica_stores,
         )

-        if progress_tracker is not None:
-            progress_tracker.close()
-
         log.info(
             f"Operational update complete. Wrote to primary store: {self.store_factory.primary_store()} and replicas {self.store_factory.replica_stores()} replicas"
         )
@@ -388,27 +373,14 @@ def process_backfill_region_jobs(

         template_utils.write_metadata(region_job.template_ds, region_job.tmp_store)

-        progress_tracker = None
-        if self.use_progress_tracker:
-            progress_tracker = UpdateProgressTracker(
-                reformat_job_name,
-                region_job.region.start,
-                self.store_factory,
-            )
-
-        region_job.process(
-            primary_store, replica_stores, progress_tracker=progress_tracker
-        )
+        region_job.process(primary_store, replica_stores)

         storage.commit_if_icechunk(
             f"Backfill completed at {pd.Timestamp.now(tz='UTC').isoformat()}",
             primary_store,
             replica_stores,
         )

-        if progress_tracker is not None:
-            progress_tracker.close()
-
     def validate_dataset(
         self,
         reformat_job_name: Annotated[str, typer.Argument(envvar="JOB_NAME")],
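The hunks above delete the optional-resource dance in both `update` and `process_backfill_region_jobs`: initialize to None, construct when enabled, thread through, and close at the end if constructed. In the visible lines, `close()` ran only after `process()` returned, so an exception would have skipped it (unless a try/finally hides in the collapsed regions). For reference, a self-contained sketch of expressing that same optional-resource pattern with `ExitStack`, which closes on every path; this is illustrative, not code from the PR:

```python
import contextlib


class Tracker:
    # Stand-in for an optional resource like the removed progress tracker.
    def close(self) -> None:
        print("tracker closed")


def run(use_tracker: bool) -> None:
    with contextlib.ExitStack() as stack:
        # Construct the resource only when enabled; ExitStack guarantees
        # close() runs even if the work below raises.
        tracker = stack.enter_context(contextlib.closing(Tracker())) if use_tracker else None
        print(f"working (tracker={'on' if tracker else 'off'})")


run(True)   # working (tracker=on), then: tracker closed
run(False)  # working (tracker=off)
```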
17 changes: 1 addition & 16 deletions src/reformatters/common/region_job.py
@@ -36,7 +36,6 @@
     Dim,
     Timestamp,
 )
-from reformatters.common.update_progress_tracker import UpdateProgressTracker
 from reformatters.common.zarr import copy_data_var

 log = get_logger(__name__)
@@ -493,8 +492,6 @@ def process(
         self,
         primary_store: Store,
         replica_stores: list[Store],
-        *,
-        progress_tracker: UpdateProgressTracker | None = None,
     ) -> Mapping[str, Sequence[SOURCE_FILE_COORD]]:
         """
         Orchestrate the full region job processing pipeline.
@@ -516,13 +513,7 @@
         """
         processing_region_ds, output_region_ds = self._get_region_datasets()

-        if progress_tracker is not None:
-            data_vars_to_process: Sequence[DATA_VAR] = progress_tracker.get_unprocessed(
-                self.data_vars
-            )  # type: ignore[assignment]
-            data_var_groups = self.source_groups(data_vars_to_process)
-        else:
-            data_var_groups = self.source_groups(self.data_vars)
+        data_var_groups = self.source_groups(self.data_vars)
         if self.max_vars_per_download_group is not None:
             data_var_groups = self._maybe_split_groups(
                 data_var_groups, self.max_vars_per_download_group
@@ -585,11 +576,6 @@ def process(
                 write_executor,
             )

-            def track_progress_callback(data_var: DATA_VAR = data_var) -> None:
-                if progress_tracker is None:
-                    return
-                progress_tracker.record_completion(data_var.name)
-
             upload_futures.append(
                 upload_executor.submit(
                     copy_data_var,
@@ -600,7 +586,6 @@ def track_progress_callback(data_var: DATA_VAR = data_var) -> None:
                     self.tmp_store,
                     primary_store,
                     replica_stores=replica_stores,
-                    track_progress_callback=track_progress_callback,
                 )
             )

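One detail of the deleted code worth noting: `track_progress_callback` bound the loop variable through a default argument (`data_var: DATA_VAR = data_var`). Python closures capture variables, not values, so without that binding every callback created in the loop would have reported the last variable. A runnable illustration of the idiom (variable names are illustrative):

```python
# Closures made in a loop see the loop variable's *final* value; a default
# argument snapshots the value at definition time instead.
late, early = [], []
for name in ["temperature", "wind_u", "wind_v"]:
    late.append(lambda: name)       # captures the variable itself
    early.append(lambda n=name: n)  # snapshots this iteration's value

print([f() for f in late])   # ['wind_v', 'wind_v', 'wind_v']
print([f() for f in early])  # ['temperature', 'wind_u', 'wind_v']
```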
102 changes: 0 additions & 102 deletions src/reformatters/common/update_progress_tracker.py

This file was deleted.
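The 102 deleted lines are not shown, but the call sites removed above pin down the class's interface: constructed with the job name, the region start, and the store factory; `get_unprocessed(data_vars)` filtered out variables already written; `record_completion(name)` marked one done; `close()` cleaned up. A minimal in-memory sketch of that interface follows. How the real class persisted completions (presumably via the store factory, so an interrupted job could resume) is an assumption, not something this diff shows:

```python
from collections.abc import Sequence
from typing import Any


class UpdateProgressTracker:
    """In-memory sketch of the deleted class's call-site interface (assumed)."""

    def __init__(self, job_name: str, region_start: int, store_factory: Any) -> None:
        self.job_name = job_name
        self.region_start = region_start
        self.store_factory = store_factory  # real class presumably persisted via this
        self._completed: set[str] = set()

    def get_unprocessed(self, data_vars: Sequence[Any]) -> list[Any]:
        # Skip variables already recorded as written, so a restarted job
        # can avoid redoing finished work.
        return [v for v in data_vars if v.name not in self._completed]

    def record_completion(self, data_var_name: str) -> None:
        self._completed.add(data_var_name)

    def close(self) -> None:
        self._completed.clear()
```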

6 changes: 1 addition & 5 deletions src/reformatters/common/zarr.py
@@ -1,4 +1,4 @@
-from collections.abc import Callable, Iterable
+from collections.abc import Iterable
 from pathlib import Path

 import xarray as xr
@@ -47,7 +47,6 @@ def copy_data_var(
     tmp_store: Path,
     primary_store: Store,
     replica_stores: Iterable[Store] = (),
-    track_progress_callback: Callable[[], None] | None = None,
 ) -> None:
     dim_index = template_ds[data_var_name].dims.index(append_dim)
     append_dim_shard_size = template_ds[data_var_name].encoding["shards"][dim_index]
@@ -72,9 +71,6 @@ def copy_data_var(
         f"Done copying data var chunks to primary store ({primary_store}) for {relative_dir}."
     )

-    if track_progress_callback is not None:
-        track_progress_callback()
-
    try:
        # Delete data to free disk space.
        for file in tmp_store.glob(f"{relative_dir}**/*"):
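With the callback parameter gone, `copy_data_var` no longer needs the caller's cooperation to report completion. If a caller ever wants that hook back, it need not be reintroduced into the callee: `region_job.py` submits the copy to an executor, and `Future.add_done_callback` provides the same notification point. A self-contained sketch, not code from this repo:

```python
from concurrent.futures import Future, ThreadPoolExecutor


def copy_one(name: str) -> str:
    return name  # stand-in for copy_data_var(...)


def record_completion(future: Future[str]) -> None:
    # Runs when the submitted copy finishes; check for failure first.
    if future.exception() is None:
        print(f"done: {future.result()}")


with ThreadPoolExecutor(max_workers=2) as upload_executor:
    for name in ["temperature", "wind_u"]:
        future = upload_executor.submit(copy_one, name)
        future.add_done_callback(record_completion)
```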
2 changes: 0 additions & 2 deletions src/reformatters/noaa/gefs/analysis/dynamical_dataset.py
@@ -16,8 +16,6 @@ class GefsAnalysisDataset(DynamicalDataset[GEFSDataVar, GefsAnalysisSourceFileCoord]):
     template_config: GefsAnalysisTemplateConfig = GefsAnalysisTemplateConfig()
     region_job_class: type[GefsAnalysisRegionJob] = GefsAnalysisRegionJob

-    use_progress_tracker: bool = True
-
     def operational_kubernetes_resources(self, image_tag: str) -> Sequence[CronJob]:
         """Return the kubernetes cron job definitions to operationally update and validate this dataset."""
         operational_update_cron_job = ReformatCronJob(
@@ -18,8 +18,6 @@ class GefsForecast35DayDataset(
     template_config: GefsForecast35DayTemplateConfig = GefsForecast35DayTemplateConfig()
     region_job_class: type[GefsForecast35DayRegionJob] = GefsForecast35DayRegionJob

-    use_progress_tracker: bool = True
-
     def operational_kubernetes_resources(self, image_tag: str) -> Sequence[CronJob]:
         """Return the kubernetes cron job definitions to operationally update and validate this dataset."""
         operational_update_cron_job = ReformatCronJob(