Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions workflows/argo/prognostic-run.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ spec:
- {name: memory, value: 6Gi}
- {name: online-diags-flags, value: " "}
- {name: online-diags, value: "true"}
- {name: movie-flags, value: " "}
steps:
- - name: resolve-output-url
templateRef:
Expand Down Expand Up @@ -70,6 +71,8 @@ spec:
value: "{{steps.resolve-output-url.outputs.result}}/fv3gfs_run_diagnostics"
- name: flags
value: "{{inputs.parameters.online-diags-flags}}"
- name: movie-flags
value: "{{inputs.parameters.movie-flags}}"
- - name: online-diags-report
when: "{{inputs.parameters.online-diags}} == true"
templateRef:
Expand Down
10 changes: 9 additions & 1 deletion workflows/argo/prognostic_run_diags.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ spec:
value: "false"
- name: flags
value: " "
- name: movie-flags
value: " "
entrypoint: all
volumes:
- name: gcp-key-secret
Expand All @@ -30,6 +32,8 @@ spec:
value: "{{item.url}}_diagnostics"
- name: flags
value: "{{workflow.parameters.flags}}"
- name: movie-flags
value: "{{workflow.parameters.movie-flags}}"
- - name: generate-report
template: report
arguments:
Expand Down Expand Up @@ -101,6 +105,8 @@ spec:
- name: output
- name: flags
value: " "
- name: movie-flags
value: " "
dag:
tasks:
- name: movies
Expand All @@ -109,6 +115,7 @@ spec:
parameters:
- {name: run, value: "{{inputs.parameters.run}}"}
- {name: output, value: "{{inputs.parameters.output}}"}
- {name: movie-flags, value: "{{inputs.parameters.movie-flags}}"}
- name: compute-reduced-diagnostics
template: compute-reduced-diagnostics
arguments:
Expand Down Expand Up @@ -167,6 +174,7 @@ spec:
parameters:
- name: run
- name: output
- {name: movie-flags, value: " "}
tolerations:
- key: "dedicated"
operator: "Equal"
Expand All @@ -184,7 +192,7 @@ spec:
command: ["/bin/bash", "-x", "-e", "-c"]
args:
- |
prognostic_run_diags movie --n_timesteps 960 --n_jobs 26 {{inputs.parameters.run}} {{inputs.parameters.output}}
prognostic_run_diags movie --n_timesteps 960 --n_jobs 26 {{inputs.parameters.run}} {{inputs.parameters.output}} {{inputs.parameters.movie-flags}}
workingDir: /home/jovyan/fv3net/workflows/diagnostics
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
Expand Down
30 changes: 24 additions & 6 deletions workflows/diagnostics/fv3net/diagnostics/prognostic_run/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,14 +619,30 @@ def register_parser(subparsers):
"access data concurrently.",
default=-1,
)
parser.add_argument(
"--evaluation-grid",
type=str,
help="Grid upon which to evaluate prognostic run diagnostics",
default="c48",
)
parser.set_defaults(func=main)


def get_verification(args, catalog, join_2d="outer"):
def get_verification(args, catalog, join_2d="outer", evaluation_grid="c48"):
if args.verification_url:
return load_diags.SegmentedRun(args.verification_url, catalog, join_2d=join_2d)
return load_diags.SegmentedRun(
args.verification_url,
catalog,
join_2d=join_2d,
evaluation_grid=evaluation_grid,
)
else:
return load_diags.CatalogSimulation(args.verification, catalog, join_2d=join_2d)
return load_diags.CatalogSimulation(
args.verification,
catalog,
join_2d=join_2d,
evaluation_grid=evaluation_grid,
)


def main(args):
Expand All @@ -638,11 +654,13 @@ def main(args):
# begin constructing diags
diags = {}
catalog = intake.open_catalog(args.catalog)
prognostic = load_diags.SegmentedRun(args.url, catalog)
verification = get_verification(args, catalog)
prognostic = load_diags.SegmentedRun(
args.url, catalog, evaluation_grid=args.evaluation_grid
)
verification = get_verification(args, catalog, evaluation_grid=args.evaluation_grid)
attrs["verification"] = str(verification)

grid = load_diags.load_grid(catalog)
grid = load_diags.load_grid(catalog, args.evaluation_grid)
input_data = load_diags.evaluation_pair_to_input_data(
prognostic, verification, grid
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@


def get_verification_entries(
name: str, catalog: intake.catalog.Catalog
name: str, catalog: intake.catalog.Catalog, evaluation_grid="c48"
) -> Mapping[str, List[str]]:
"""Given simulation name, return catalog keys for c48 dycore and physics data.

Args:
name: Simulation to use for verification.
catalog: Catalog to search for verification data.
evaluation_grid: Grid upon which to compute diagnostics

Returns:
Mapping from category name ('physics', 'dycore', or '3d') to sequence
Expand All @@ -22,13 +23,14 @@ def get_verification_entries(
item_grid = metadata.get("grid", None)
item_category = metadata.get("category", None)

if item_simulation == name and item_grid == "c48":
if item_simulation == name and item_grid == evaluation_grid:
if item_category is not None:
entries[item_category].append(item)

if len(entries["2d"]) == 0:
raise ValueError(
f"No c48 2d diagnostics found in catalog for simulation {name}."
f"No {evaluation_grid} 2d diagnostics found in catalog for "
f"simulation {name}."
)

return entries
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _load_standardized(path):


def _get_area(ds: xr.Dataset, catalog: intake.catalog.Catalog) -> xr.DataArray:
grid_entries = {48: "grid/c48", 96: "grid/c96", 384: "grid/c384"}
grid_entries = {12: "grid/c12", 48: "grid/c48", 96: "grid/c96", 384: "grid/c384"}
input_res = ds.sizes["x"]
if input_res not in grid_entries:
raise KeyError(f"No grid defined in catalog for c{input_res} resolution")
Expand All @@ -80,7 +80,9 @@ def _coarsen_cell_centered_to_target_resolution(
)


def _load_3d(url: str, catalog: intake.catalog.Catalog) -> xr.Dataset:
def _load_3d(
url: str, catalog: intake.catalog.Catalog, evaluation_grid: str
) -> xr.Dataset:
logger.info(f"Processing 3d data from run directory at {url}")
files_3d = [
"diags_3d.zarr",
Expand All @@ -90,7 +92,7 @@ def _load_3d(url: str, catalog: intake.catalog.Catalog) -> xr.Dataset:
]
ds = xr.merge(
[
load_coarse_data(os.path.join(url, filename), catalog)
load_coarse_data(os.path.join(url, filename), catalog, evaluation_grid)
for filename in files_3d
]
)
Expand All @@ -106,16 +108,20 @@ def _load_3d(url: str, catalog: intake.catalog.Catalog) -> xr.Dataset:
return ds_interp


def load_grid(catalog):
def load_grid(catalog, evaluation_grid="c48"):
logger.info("Opening Grid Spec")
grid_c48 = standardize_fv3_diagnostics(catalog["grid/c48"].to_dask())
ls_mask = standardize_fv3_diagnostics(catalog["landseamask/c48"].to_dask())
grid_c48 = standardize_fv3_diagnostics(catalog[f"grid/{evaluation_grid}"].to_dask())
ls_mask = standardize_fv3_diagnostics(
catalog[f"landseamask/{evaluation_grid}"].to_dask()
)
return xr.merge([grid_c48, ls_mask])


def load_coarse_data(path, catalog) -> xr.Dataset:
def load_coarse_data(path, catalog, evaluation_grid="c48") -> xr.Dataset:
logger.info(f"Opening prognostic run data at {path}")

target_resolution = int(evaluation_grid[1:])

try:
ds = _load_standardized(path)
except (FileNotFoundError, KeyError):
Expand All @@ -131,7 +137,7 @@ def load_coarse_data(path, catalog) -> xr.Dataset:
errors="ignore",
)
ds = _coarsen_cell_centered_to_target_resolution(
ds, target_resolution=48, catalog=catalog
ds, target_resolution=target_resolution, catalog=catalog
)

return ds
Expand Down Expand Up @@ -189,10 +195,13 @@ class CatalogSimulation:
tag: str
catalog: intake.catalog.base.Catalog
join_2d: str = "outer"
evaluation_grid: str = "c48"

@property
def _verif_entries(self):
return config.get_verification_entries(self.tag, self.catalog)
return config.get_verification_entries(
self.tag, self.catalog, self.evaluation_grid
)

@property
def _rename_map(self):
Expand All @@ -219,29 +228,31 @@ class SegmentedRun:
url: str
catalog: intake.catalog.base.Catalog
join_2d: str = "outer"
evaluation_grid: str = "c48"

@property
def data_2d(self) -> xr.Dataset:
url = self.url
catalog = self.catalog
evaluation_grid = self.evaluation_grid
path = os.path.join(url, "atmos_dt_atmos.zarr")
diags_url = os.path.join(url, "diags.zarr")
sfc_dt_atmos_url = os.path.join(url, "sfc_dt_atmos.zarr")

return xr.merge(
[
load_coarse_data(path, catalog),
load_coarse_data(path, catalog, evaluation_grid),
# TODO fillna required because diags.zarr may be saved with an
# incorrect fill_value. not sure if this is fixed or not.
load_coarse_data(diags_url, catalog).fillna(0.0),
load_coarse_data(sfc_dt_atmos_url, catalog),
load_coarse_data(diags_url, catalog, evaluation_grid).fillna(0.0),
load_coarse_data(sfc_dt_atmos_url, catalog, evaluation_grid),
],
join=self.join_2d,
)

@property
def data_3d(self) -> xr.Dataset:
return _load_3d(self.url, self.catalog)
return _load_3d(self.url, self.catalog, self.evaluation_grid)

@property
def artifacts(self) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ def register_parser(subparsers):
"If false, generate the movie of n_timesteps at the start of the run. "
),
)
parser.add_argument(
"--evaluation-grid",
type=str,
help="Grid upon which to evaluate prognostic run diagnostics",
default="c48",
)
add_catalog_and_verification_arguments(parser)
parser.set_defaults(func=main)

Expand Down Expand Up @@ -242,12 +248,16 @@ def main(args):
os.makedirs(args.output, exist_ok=True)

catalog = intake.open_catalog(args.catalog)
grid = load_diags.load_grid(catalog)
grid = load_diags.load_grid(catalog, evaluation_grid=args.evaluation_grid)
prognostic = derived_variables.derive_2d_variables(
load_diags.SegmentedRun(args.url, catalog).data_2d
load_diags.SegmentedRun(
args.url, catalog, evaluation_grid=args.evaluation_grid
).data_2d
)
verification = derived_variables.derive_2d_variables(
get_verification(args, catalog, join_2d="inner").data_2d
get_verification(
args, catalog, join_2d="inner", evaluation_grid=args.evaluation_grid,
).data_2d
)
# crashed prognostic runs have bad grid vars, so use grid from catalog instead
prognostic = (
Expand Down