34 changes: 26 additions & 8 deletions src/CSET/_common.py
@@ -31,7 +31,9 @@ class ArgumentError(ValueError):
"""Provided arguments are not understood."""


def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict:
def parse_recipe(
recipe_yaml: Path | str, variables: dict | None = None, allow_missing: bool = False
) -> dict:
"""Parse a recipe into a python dictionary.

Parameters
@@ -42,6 +44,9 @@ def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict
read.
variables: dict
Dictionary of recipe variables. If None templating is not attempted.
allow_missing: bool
If True, do not raise an error when a variable in the recipe is missing
from the variables dictionary; instead, leave the placeholder in place.

Returns
-------
@@ -80,7 +85,7 @@ def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict

if variables is not None:
logging.debug("Recipe variables: %s", variables)
recipe = template_variables(recipe, variables)
recipe = template_variables(recipe, variables, allow_missing=allow_missing)

logging.debug("Recipe after templating:\n%s", recipe)
return recipe
@@ -195,7 +200,9 @@ def parse_variable_options(
return recipe_variables


def template_variables(recipe: dict | list, variables: dict) -> dict:
def template_variables(
recipe: dict | list, variables: dict, allow_missing: bool = False
) -> dict:
"""Insert variables into recipe.

Parameters
@@ -204,6 +211,9 @@ def template_variables(recipe: dict | list, variables: dict) -> dict:
The recipe as a python dictionary. It is updated in-place.
variables: dict
Dictionary of variables for the recipe.
allow_missing: bool
If True, do not raise an error when a variable in the recipe is missing
from the variables dictionary; instead, leave the placeholder in place.

Returns
-------
@@ -225,13 +235,19 @@ def template_variables(recipe: dict | list, variables: dict) -> dict:

for i in index:
if isinstance(recipe[i], (dict, list)):
recipe[i] = template_variables(recipe[i], variables)
recipe[i] = template_variables(
recipe[i], variables, allow_missing=allow_missing
)
elif isinstance(recipe[i], str):
recipe[i] = replace_template_variable(recipe[i], variables)
recipe[i] = replace_template_variable(
recipe[i], variables, allow_missing=allow_missing
)
return recipe


def replace_template_variable(s: str, variables: dict[str, Any]):
def replace_template_variable(
s: str, variables: dict[str, Any], allow_missing: bool = False
):
"""Fill all variable placeholders in the string."""
for var_name, var_value in variables.items():
placeholder = f"${var_name}"
@@ -253,8 +269,10 @@ def replace_template_variable(s: str, variables: dict[str, Any]):
break
else:
s = s.replace(placeholder, str(var_value))
if isinstance(s, str) and re.match(r"^.*\$[A-Z_].*", s):
raise KeyError("Variable without a value.", s)

if not allow_missing:
if isinstance(s, str) and re.match(r"^.*\$[A-Z_].*", s):
raise KeyError("Variable without a value.", s)
return s


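Reviewer note: a minimal sketch of the new `allow_missing` behaviour (the recipe contents and variable names here are illustrative, not from a shipped recipe):

```python
from CSET._common import template_variables

recipe = {"title": "$MODEL plots", "steps": [{"path": "$DATA_PATH"}]}
variables = {"MODEL": "UKV"}

# With allow_missing=True, the unknown $DATA_PATH placeholder survives templating.
templated = template_variables(recipe, variables, allow_missing=True)
assert templated["title"] == "UKV plots"
assert templated["steps"][0]["path"] == "$DATA_PATH"

# The default behaviour still raises for any leftover placeholder.
try:
    template_variables(recipe, variables)
except KeyError as err:
    print(err)  # ('Variable without a value.', '$DATA_PATH')
```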
73 changes: 66 additions & 7 deletions src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py
@@ -16,35 +16,94 @@
"""Run a recipe with the CSET CLI."""

import json
import logging
import os
from base64 import b64decode
import subprocess
from argparse import ArgumentParser
from base64 import b64decode, b64encode
from collections import defaultdict
from pathlib import Path

from CSET import logger as _base_logger
from CSET.recipes import load_recipes

logger = _base_logger.getChild(__name__)
logger.setLevel(logging.DEBUG)

def parbake_all(variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool):

def get_args():
"""Get command line arguments."""
parser = ArgumentParser(
description="Generate and parbake recipes from configuration."
)
parser.add_argument(
"--premix",
action="store_true",
help="Output a base64 encoded JSON list of all recipes that would be parbaked.",
)
return parser.parse_args()


def parbake_all(
variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool
) -> int:
"""Generate and parbake recipes from configuration."""
# Gather all recipes into a big list.
recipes = list(load_recipes(variables))
# Check we have some recipes enabled.
if not recipes:
raise ValueError("At least one recipe should be enabled.")
# Parbake only the recipes whose aggregation flag matches this invocation.
recipe_count = 0
for recipe in filter(lambda r: r.aggregation == aggregation, recipes):
print(f"Parbaking {recipe}", flush=True)
recipe.parbake(rose_datac, share_dir)
recipe_count += 1
return recipe_count


def main():
"""Program entry point."""
# Gather configuration from environment.
args = get_args()
variables = json.loads(b64decode(os.environ["ENCODED_ROSE_SUITE_VARIABLES"]))
rose_datac = Path(os.environ["ROSE_DATAC"])
share_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"])
aggregation = bool(os.getenv("DO_CASE_AGGREGATION"))
# Parbake recipes for cycle.
parbake_all(variables, rose_datac, share_dir, aggregation)

if args.premix:
recipes = list(load_recipes(variables))

batter = defaultdict(list)
for recipe in recipes:
batter[str(recipe.recipe_subdir)].append(recipe.premixed_names())

if any(v for v in batter.values()):
jsonified = json.dumps(batter)
logger.debug("Premixed recipes: %s", jsonified)
encoded = b64encode(jsonified.encode()).decode()
print(encoded)
else:
raise ValueError("At least one recipe should be enabled.")
else:
rose_datac = Path(os.environ["ROSE_DATAC"])
share_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"])
aggregation = bool(os.getenv("DO_CASE_AGGREGATION"))
# Parbake recipes for cycle.
recipe_count = parbake_all(variables, rose_datac, share_dir, aggregation)

# If running under cylc, notify cylc of task completion.
cylc_workflow_id = os.environ.get("CYLC_WORKFLOW_ID", None)
cylc_task_job = os.environ.get("CYLC_TASK_JOB", None)
if cylc_workflow_id and cylc_task_job:
message_command = [
"cylc",
"message",
"--",
cylc_workflow_id,
cylc_task_job,
]
if recipe_count > 0:
subprocess.run(message_command + ["start baking"])
else:
subprocess.run(message_command + ["skip baking"])


if __name__ == "__main__": # pragma: no cover
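Reviewer note: `--premix` prints a base64-encoded JSON payload on stdout rather than parbaking anything. A sketch of capturing and decoding it; the script path is assumed relative to the workflow run directory, and the environment variables `parbake.py` reads (notably `ENCODED_ROSE_SUITE_VARIABLES`) are assumed to be set already:

```python
import json
import subprocess
from base64 import b64decode

# Run the premix step and capture the encoded recipe listing it prints.
result = subprocess.run(
    ["python", "app/parbake_recipes/bin/parbake.py", "--premix"],
    capture_output=True,
    check=True,
)
# One base64 blob on stdout; decodes to {recipe_subdir: [premixed names], ...}.
batter = json.loads(b64decode(result.stdout))
print(batter)
```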
66 changes: 45 additions & 21 deletions src/CSET/cset_workflow/flow.cylc
@@ -5,10 +5,11 @@ description = Workflow for running CSET.
URL = https://metoffice.github.io/CSET

# Import all of our Jinja utilities for use in the workflow.
{% from "jinja_utils" import b64json, get_models %}
{% from "jinja_utils" import b64json, get_models, mix_recipes %}
# Load a list of model detail dictionaries.
{% set models = get_models(ROSE_SUITE_VARIABLES) %}

{% set premixed_recipes = mix_recipes(ROSE_SUITE_VARIABLES) %}

[scheduling]
# Allow many concurrent cycles to maximise workflow parallelism.
@@ -39,21 +40,31 @@ final cycle point = {{CSET_TRIAL_END_DATE}}
{% for date in CSET_CASE_DATES %}
R1/{{date}} = """
setup_complete[^] => FETCH_DATA:succeed-all => fetch_complete
fetch_complete & parbake_recipes => bake_recipes => cycle_complete
fetch_complete & parbake_recipes:start_baking?

parbake_recipes:skip_baking? | bake_recipes? => cycle_complete
"""
{% endfor %}
{% elif CSET_CYCLING_MODE == "trial" %}
# Analysis from each forecast.
{{CSET_TRIAL_CYCLE_PERIOD}} = """
setup_complete[^] => FETCH_DATA:succeed-all => fetch_complete
fetch_complete & parbake_recipes => bake_recipes => cycle_complete
fetch_complete & parbake_recipes:start_baking? => bake_recipes?

parbake_recipes:skip_baking? | bake_recipes? => cycle_complete
"""
{% endif %}

# Only runs on the final cycle.
R1/$ = """

{% if premixed_recipes.get('aggregation_recipes', False) %}
# Run aggregation recipes.
fetch_complete & parbake_aggregation_recipes => bake_aggregation_recipes => cycle_complete
fetch_complete & parbake_aggregation_recipes:start_baking? => BAKE_AGGREGATION_RECIPES:succeed-all?

parbake_aggregation_recipes:skip_baking? | BAKE_AGGREGATION_RECIPES:succeed-all? => cycle_complete
{% endif %}

# Finalise website and cleanup.
cycle_complete => finish_website => send_email
cycle_complete => housekeeping
@@ -97,6 +108,18 @@ final cycle point = {{CSET_TRIAL_END_DATE}}
[[[environment]]]
ANALYSIS_LENGTH = {{ANALYSIS_LENGTH}}

[[PARBAKE]]
script = rose task-run -v --app-key=parbake_recipes
execution time limit = PT5M
[[[directives]]]
--ntasks=1
--mem=500
[[[environment]]]
ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}}
[[[outputs]]]
start_baking='start baking'
skip_baking='skip baking'

[[METPLUS]]
[[[environment]]]
{% if METPLUS_GRID_STAT|default(False) %}
@@ -153,23 +176,12 @@ final cycle point = {{CSET_TRIAL_END_DATE}}

[[parbake_recipes]]
# Parbake all the recipes for this cycle.
script = rose task-run -v --app-key=parbake_recipes
execution time limit = PT5M
[[[directives]]]
--ntasks=1
--mem=500
[[[environment]]]
ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}}
inherit=PARBAKE

[[parbake_aggregation_recipes]]
# Parbake all the aggregation recipes.
script = rose task-run -v --app-key=parbake_recipes
execution time limit = PT5M
[[[directives]]]
--ntasks=1
--mem=500
inherit=PARBAKE
[[[environment]]]
ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}}
DO_CASE_AGGREGATION = True

[[bake_recipes]]
@@ -181,16 +193,28 @@ final cycle point = {{CSET_TRIAL_END_DATE}}
--ntasks=32
--mem=64000

[[bake_aggregation_recipes]]
# Bake the parbaked aggregation recipes.
script = "$CYLC_WORKFLOW_RUN_DIR/app/bake_recipes/bin/baker.sh"
{% if premixed_recipes.get('aggregation_recipes', False) %}
[[BAKE_AGGREGATION_RECIPES]]
execution time limit = PT3H
execution retry delays = PT1M
[[[directives]]]
--ntasks=8
--ntasks=2
--mem=64000
[[[environment]]]
DO_CASE_AGGREGATION = True
ROSE_APP_OPT_CONF_KEYS = aggregation
ROSE_TASK_APP=bake_recipes
RECIPE_DIR=$CYLC_WORKFLOW_SHARE_DIR/cycle/$CYLC_TASK_CYCLE_POINT

{% for recipe_file, recipe_tasks in premixed_recipes['aggregation_recipes'] %}
{% for task in recipe_tasks %}
[[{{task}}]]
inherit=BAKE_AGGREGATION_RECIPES
[[[environment]]]
RECIPE_FILE={{recipe_file}}
{% endfor %}
{% endfor %}
{% endif %}

[[housekeeping]]
# Housekeep input data files.
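Reviewer note: the `BAKE_AGGREGATION_RECIPES` block relies on `premixed_recipes['aggregation_recipes']` unpacking into `(recipe_file, recipe_tasks)` pairs. A hedged Python mirror of the Jinja loop, with an invented payload (the real shape comes from `Recipe.premixed_names()`, which this diff does not show):

```python
# Hypothetical payload, standing in for the decoded output of mix_recipes().
premixed_recipes = {
    "aggregation_recipes": [
        ("histogram_series.yaml", ["bake_agg_histogram_m1", "bake_agg_histogram_m2"]),
    ],
}

# Mirror of the Jinja loop: one [[task]] section per premixed task name.
for recipe_file, recipe_tasks in premixed_recipes["aggregation_recipes"]:
    for task in recipe_tasks:
        print(f"[[{task}]]")
        print("    inherit=BAKE_AGGREGATION_RECIPES")
        print("    [[[environment]]]")
        print(f"        RECIPE_FILE={recipe_file}")
```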
71 changes: 71 additions & 0 deletions src/CSET/cset_workflow/lib/python/jinja_utils.py
@@ -16,6 +16,14 @@

import base64
import json
import logging
import os
import subprocess
from pathlib import Path
from typing import Any

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


def get_models(rose_variables: dict) -> list[dict]:
@@ -46,3 +54,66 @@ def b64json(d: dict | list) -> str:
new_d.pop("ROSE_SUITE_VARIABLES", None)
output = base64.b64encode(json.dumps(new_d).encode()).decode()
return output


def mix_recipes(rose_variables: dict):
"""Load recipe and task names.

Loads recipe files and generates unique task names for them.
"""

def _is_True(val: Any) -> bool:
if isinstance(val, str):
return val.lower() in ["true"]
# use explicit comparison to handle jinja2
# true/false types
return val == True # noqa: E712

shell_commands = []

if _is_True(rose_variables.get("CSET_ENV_USE_MODULES", False)):
shell_commands.append("IFS_SAVE=$IFS; IFS=' '")
if _is_True(rose_variables.get("MODULES_PURGE", False)):
shell_commands.append("module purge")
for module in rose_variables.get("MODULES_LIST", []):
shell_commands.append(f"module load {module}")
shell_commands.append("IFS=$IFS_SAVE")

conda_path = Path(rose_variables.get("CONDA_PATH", ""))

workflow_dir = Path(os.getcwd())
linked_conda_environment = workflow_dir / "conda-environment"
mix_command = (
f"{workflow_dir / 'app' / 'parbake_recipes' / 'bin' / 'parbake.py'} --premix"
)
if linked_conda_environment.exists():
LOGGER.debug(f"Activating conda environment from {conda_path}")
shell_commands.append(
f"{conda_path / 'conda'} run --no-capture-output --prefix {linked_conda_environment} {mix_command}"
)
else:
try:
subprocess.run(["conda info --envs | grep -q '^cset-dev '"], check=True)
LOGGER.debug("Activating conda environment from cset-dev")
shell_commands.append(
f"{conda_path / 'conda'} run --no-capture-output --name cset-dev {mix_command}"
)
except subprocess.CalledProcessError:
LOGGER.debug(
"No conda environment to use. Attempting last-ditch attempt to run directly."
)
shell_commands.append(mix_command)

premixing_env = os.environ.copy()
premixing_env["ENCODED_ROSE_SUITE_VARIABLES"] = b64json(rose_variables)

p = subprocess.run(
" && ".join(shell_commands),
shell=True,
check=True,
stdout=subprocess.PIPE,
env=premixing_env,
)
results = json.loads(base64.b64decode(p.stdout))

return results
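Reviewer note: `mix_recipes` hands the suite variables to the premix subprocess through the same base64/JSON channel that `parbake.py` reads back. A sketch of the round trip, with a minimal stand-in dictionary:

```python
import json
import os
from base64 import b64decode, b64encode

# Encode, as b64json() does before mix_recipes() spawns the premix subprocess...
rose_variables = {"CSET_ENV_USE_MODULES": False, "CONDA_PATH": ""}
os.environ["ENCODED_ROSE_SUITE_VARIABLES"] = b64encode(
    json.dumps(rose_variables).encode()
).decode()

# ...and decode, as main() in parbake.py does on the far side.
decoded = json.loads(b64decode(os.environ["ENCODED_ROSE_SUITE_VARIABLES"]))
assert decoded == rose_variables
```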