diff --git a/workflows/argo/prognostic-run.yaml b/workflows/argo/prognostic-run.yaml index 32849d8539..2fcfe69ead 100644 --- a/workflows/argo/prognostic-run.yaml +++ b/workflows/argo/prognostic-run.yaml @@ -28,6 +28,7 @@ spec: - {name: memory, value: 6Gi} - {name: online-diags-flags, value: " "} - {name: online-diags, value: "true"} + - {name: movie-flags, value: " "} steps: - - name: resolve-output-url templateRef: @@ -70,6 +71,8 @@ spec: value: "{{steps.resolve-output-url.outputs.result}}/fv3gfs_run_diagnostics" - name: flags value: "{{inputs.parameters.online-diags-flags}}" + - name: movie-flags + value: "{{inputs.parameters.movie-flags}}" - - name: online-diags-report when: "{{inputs.parameters.online-diags}} == true" templateRef: diff --git a/workflows/argo/prognostic_run_diags.yaml b/workflows/argo/prognostic_run_diags.yaml index 8a8b8b99fc..c29a8b16ee 100644 --- a/workflows/argo/prognostic_run_diags.yaml +++ b/workflows/argo/prognostic_run_diags.yaml @@ -10,6 +10,8 @@ spec: value: "false" - name: flags value: " " + - name: movie-flags + value: " " entrypoint: all volumes: - name: gcp-key-secret @@ -30,6 +32,8 @@ spec: value: "{{item.url}}_diagnostics" - name: flags value: "{{workflow.parameters.flags}}" + - name: movie-flags + value: "{{workflow.parameters.movie-flags}}" - - name: generate-report template: report arguments: @@ -101,6 +105,8 @@ spec: - name: output - name: flags value: " " + - name: movie-flags + value: " " dag: tasks: - name: movies @@ -109,6 +115,7 @@ spec: parameters: - {name: run, value: "{{inputs.parameters.run}}"} - {name: output, value: "{{inputs.parameters.output}}"} + - {name: movie-flags, value: "{{inputs.parameters.movie-flags}}"} - name: compute-reduced-diagnostics template: compute-reduced-diagnostics arguments: @@ -167,6 +174,7 @@ spec: parameters: - name: run - name: output + - {name: movie-flags, value: " "} tolerations: - key: "dedicated" operator: "Equal" @@ -184,7 +192,7 @@ spec: command: ["/bin/bash", "-x", "-e", "-c"] args: - | - prognostic_run_diags movie --n_timesteps 960 --n_jobs 26 {{inputs.parameters.run}} {{inputs.parameters.output}} + prognostic_run_diags movie --n_timesteps 960 --n_jobs 26 {{inputs.parameters.run}} {{inputs.parameters.output}} {{inputs.parameters.movie-flags}} workingDir: /home/jovyan/fv3net/workflows/diagnostics env: - name: GOOGLE_APPLICATION_CREDENTIALS diff --git a/workflows/diagnostics/fv3net/diagnostics/prognostic_run/compute.py b/workflows/diagnostics/fv3net/diagnostics/prognostic_run/compute.py index 2cfcba6e93..1f7bd3c2ca 100644 --- a/workflows/diagnostics/fv3net/diagnostics/prognostic_run/compute.py +++ b/workflows/diagnostics/fv3net/diagnostics/prognostic_run/compute.py @@ -619,14 +619,30 @@ def register_parser(subparsers): "access data concurrently.", default=-1, ) + parser.add_argument( + "--evaluation-grid", + type=str, + help="Grid upon which to evaluate prognostic run diagnostics", + default="c48", + ) parser.set_defaults(func=main) -def get_verification(args, catalog, join_2d="outer"): +def get_verification(args, catalog, join_2d="outer", evaluation_grid="c48"): if args.verification_url: - return load_diags.SegmentedRun(args.verification_url, catalog, join_2d=join_2d) + return load_diags.SegmentedRun( + args.verification_url, + catalog, + join_2d=join_2d, + evaluation_grid=evaluation_grid, + ) else: - return load_diags.CatalogSimulation(args.verification, catalog, join_2d=join_2d) + return load_diags.CatalogSimulation( + args.verification, + catalog, + join_2d=join_2d, + evaluation_grid=evaluation_grid, + ) def main(args): @@ -638,11 +654,13 @@ def main(args): # begin constructing diags diags = {} catalog = intake.open_catalog(args.catalog) - prognostic = load_diags.SegmentedRun(args.url, catalog) - verification = get_verification(args, catalog) + prognostic = load_diags.SegmentedRun( + args.url, catalog, evaluation_grid=args.evaluation_grid + ) + verification = get_verification(args, catalog, evaluation_grid=args.evaluation_grid) attrs["verification"] = str(verification) - grid = load_diags.load_grid(catalog) + grid = load_diags.load_grid(catalog, args.evaluation_grid) input_data = load_diags.evaluation_pair_to_input_data( prognostic, verification, grid ) diff --git a/workflows/diagnostics/fv3net/diagnostics/prognostic_run/config.py b/workflows/diagnostics/fv3net/diagnostics/prognostic_run/config.py index 8451c30ef9..2fc32c44d6 100644 --- a/workflows/diagnostics/fv3net/diagnostics/prognostic_run/config.py +++ b/workflows/diagnostics/fv3net/diagnostics/prognostic_run/config.py @@ -3,13 +3,14 @@ def get_verification_entries( - name: str, catalog: intake.catalog.Catalog + name: str, catalog: intake.catalog.Catalog, evaluation_grid="c48" ) -> Mapping[str, List[str]]: """Given simulation name, return catalog keys for c48 dycore and physics data. Args: name: Simulation to use for verification. catalog: Catalog to search for verification data. + evaluation_grid: Grid upon which to compute diagnostics Returns: Mapping from category name ('physics', 'dycore', or '3d') to sequence @@ -22,13 +23,14 @@ def get_verification_entries( item_grid = metadata.get("grid", None) item_category = metadata.get("category", None) - if item_simulation == name and item_grid == "c48": + if item_simulation == name and item_grid == evaluation_grid: if item_category is not None: entries[item_category].append(item) if len(entries["2d"]) == 0: raise ValueError( - f"No c48 2d diagnostics found in catalog for simulation {name}." + f"No {evaluation_grid} 2d diagnostics found in catalog for " + f"simulation {name}." ) return entries diff --git a/workflows/diagnostics/fv3net/diagnostics/prognostic_run/load_run_data.py b/workflows/diagnostics/fv3net/diagnostics/prognostic_run/load_run_data.py index 38e1560b98..78b63d9f2c 100644 --- a/workflows/diagnostics/fv3net/diagnostics/prognostic_run/load_run_data.py +++ b/workflows/diagnostics/fv3net/diagnostics/prognostic_run/load_run_data.py @@ -54,7 +54,7 @@ def _load_standardized(path): def _get_area(ds: xr.Dataset, catalog: intake.catalog.Catalog) -> xr.DataArray: - grid_entries = {48: "grid/c48", 96: "grid/c96", 384: "grid/c384"} + grid_entries = {12: "grid/c12", 48: "grid/c48", 96: "grid/c96", 384: "grid/c384"} input_res = ds.sizes["x"] if input_res not in grid_entries: raise KeyError(f"No grid defined in catalog for c{input_res} resolution") @@ -80,7 +80,9 @@ def _coarsen_cell_centered_to_target_resolution( ) -def _load_3d(url: str, catalog: intake.catalog.Catalog) -> xr.Dataset: +def _load_3d( + url: str, catalog: intake.catalog.Catalog, evaluation_grid: str +) -> xr.Dataset: logger.info(f"Processing 3d data from run directory at {url}") files_3d = [ "diags_3d.zarr", @@ -90,7 +92,7 @@ def _load_3d(url: str, catalog: intake.catalog.Catalog) -> xr.Dataset: ] ds = xr.merge( [ - load_coarse_data(os.path.join(url, filename), catalog) + load_coarse_data(os.path.join(url, filename), catalog, evaluation_grid) for filename in files_3d ] ) @@ -106,16 +108,20 @@ def _load_3d(url: str, catalog: intake.catalog.Catalog) -> xr.Dataset: return ds_interp -def load_grid(catalog): +def load_grid(catalog, evaluation_grid="c48"): logger.info("Opening Grid Spec") - grid_c48 = standardize_fv3_diagnostics(catalog["grid/c48"].to_dask()) - ls_mask = standardize_fv3_diagnostics(catalog["landseamask/c48"].to_dask()) + grid_c48 = standardize_fv3_diagnostics(catalog[f"grid/{evaluation_grid}"].to_dask()) + ls_mask = standardize_fv3_diagnostics( + catalog[f"landseamask/{evaluation_grid}"].to_dask() + ) return xr.merge([grid_c48, ls_mask]) -def load_coarse_data(path, catalog) -> xr.Dataset: +def load_coarse_data(path, catalog, evaluation_grid="c48") -> xr.Dataset: logger.info(f"Opening prognostic run data at {path}") + target_resolution = int(evaluation_grid[1:]) + try: ds = _load_standardized(path) except (FileNotFoundError, KeyError): @@ -131,7 +137,7 @@ def load_coarse_data(path, catalog) -> xr.Dataset: errors="ignore", ) ds = _coarsen_cell_centered_to_target_resolution( - ds, target_resolution=48, catalog=catalog + ds, target_resolution=target_resolution, catalog=catalog ) return ds @@ -189,10 +195,13 @@ class CatalogSimulation: tag: str catalog: intake.catalog.base.Catalog join_2d: str = "outer" + evaluation_grid: str = "c48" @property def _verif_entries(self): - return config.get_verification_entries(self.tag, self.catalog) + return config.get_verification_entries( + self.tag, self.catalog, self.evaluation_grid + ) @property def _rename_map(self): @@ -219,29 +228,31 @@ class SegmentedRun: url: str catalog: intake.catalog.base.Catalog join_2d: str = "outer" + evaluation_grid: str = "c48" @property def data_2d(self) -> xr.Dataset: url = self.url catalog = self.catalog + evaluation_grid = self.evaluation_grid path = os.path.join(url, "atmos_dt_atmos.zarr") diags_url = os.path.join(url, "diags.zarr") sfc_dt_atmos_url = os.path.join(url, "sfc_dt_atmos.zarr") return xr.merge( [ - load_coarse_data(path, catalog), + load_coarse_data(path, catalog, evaluation_grid), # TODO fillna required because diags.zarr may be saved with an # incorrect fill_value. not sure if this is fixed or not. - load_coarse_data(diags_url, catalog).fillna(0.0), - load_coarse_data(sfc_dt_atmos_url, catalog), + load_coarse_data(diags_url, catalog, evaluation_grid).fillna(0.0), + load_coarse_data(sfc_dt_atmos_url, catalog, evaluation_grid), ], join=self.join_2d, ) @property def data_3d(self) -> xr.Dataset: - return _load_3d(self.url, self.catalog) + return _load_3d(self.url, self.catalog, self.evaluation_grid) @property def artifacts(self) -> List[str]: diff --git a/workflows/diagnostics/fv3net/diagnostics/prognostic_run/views/movies.py b/workflows/diagnostics/fv3net/diagnostics/prognostic_run/views/movies.py index dca3d2175b..b52ae9137e 100644 --- a/workflows/diagnostics/fv3net/diagnostics/prognostic_run/views/movies.py +++ b/workflows/diagnostics/fv3net/diagnostics/prognostic_run/views/movies.py @@ -211,6 +211,12 @@ def register_parser(subparsers): "If false, generate the movie of n_timesteps at the start of the run. " ), ) + parser.add_argument( + "--evaluation-grid", + type=str, + help="Grid upon which to evaluate prognostic run diagnostics", + default="c48", + ) add_catalog_and_verification_arguments(parser) parser.set_defaults(func=main) @@ -242,12 +248,16 @@ def main(args): os.makedirs(args.output, exist_ok=True) catalog = intake.open_catalog(args.catalog) - grid = load_diags.load_grid(catalog) + grid = load_diags.load_grid(catalog, evaluation_grid=args.evaluation_grid) prognostic = derived_variables.derive_2d_variables( - load_diags.SegmentedRun(args.url, catalog).data_2d + load_diags.SegmentedRun( + args.url, catalog, evaluation_grid=args.evaluation_grid + ).data_2d ) verification = derived_variables.derive_2d_variables( - get_verification(args, catalog, join_2d="inner").data_2d + get_verification( + args, catalog, join_2d="inner", evaluation_grid=args.evaluation_grid, + ).data_2d ) # crashed prognostic runs have bad grid vars, so use grid from catalog instead prognostic = (