diff --git a/src/CSET/_common.py b/src/CSET/_common.py
index 12b01a387..3df98202d 100644
--- a/src/CSET/_common.py
+++ b/src/CSET/_common.py
@@ -31,7 +31,9 @@ class ArgumentError(ValueError):
     """Provided arguments are not understood."""
 
 
-def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict:
+def parse_recipe(
+    recipe_yaml: Path | str, variables: dict | None = None, allow_missing: bool = False
+) -> dict:
     """Parse a recipe into a python dictionary.
 
     Parameters
@@ -42,6 +44,9 @@ def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict
         read.
     variables: dict
         Dictionary of recipe variables. If None templating is not attempted.
+    allow_missing: bool
+        If True, do not raise an error if a variable in the recipe is missing
+        from the variables dictionary. Instead leave the placeholder in place.
 
     Returns
     -------
@@ -80,7 +85,7 @@ def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict
 
     if variables is not None:
         logging.debug("Recipe variables: %s", variables)
-        recipe = template_variables(recipe, variables)
+        recipe = template_variables(recipe, variables, allow_missing=allow_missing)
         logging.debug("Recipe after templating:\n%s", recipe)
     return recipe
 
@@ -195,7 +200,9 @@ def parse_variable_options(
     return recipe_variables
 
 
-def template_variables(recipe: dict | list, variables: dict) -> dict:
+def template_variables(
+    recipe: dict | list, variables: dict, allow_missing: bool = False
+) -> dict:
     """Insert variables into recipe.
 
     Parameters
@@ -204,6 +211,9 @@ def template_variables(recipe: dict | list, variables: dict) -> dict:
         The recipe as a python dictionary. It is updated in-place.
     variables: dict
         Dictionary of variables for the recipe.
+    allow_missing: bool
+        If True, do not raise an error if a variable in the recipe is missing
+        from the variables dictionary. Instead leave the placeholder in place.
 
     Returns
     -------
@@ -225,13 +235,19 @@ def template_variables(recipe: dict | list, variables: dict) -> dict:
 
     for i in index:
         if isinstance(recipe[i], (dict, list)):
-            recipe[i] = template_variables(recipe[i], variables)
+            recipe[i] = template_variables(
+                recipe[i], variables, allow_missing=allow_missing
+            )
         elif isinstance(recipe[i], str):
-            recipe[i] = replace_template_variable(recipe[i], variables)
+            recipe[i] = replace_template_variable(
+                recipe[i], variables, allow_missing=allow_missing
+            )
     return recipe
 
 
-def replace_template_variable(s: str, variables: dict[str, Any]):
+def replace_template_variable(
+    s: str, variables: dict[str, Any], allow_missing: bool = False
+):
     """Fill all variable placeholders in the string."""
     for var_name, var_value in variables.items():
         placeholder = f"${var_name}"
@@ -253,8 +269,10 @@ def replace_template_variable(s: str, variables: dict[str, Any]):
                 break
         else:
             s = s.replace(placeholder, str(var_value))
-    if isinstance(s, str) and re.match(r"^.*\$[A-Z_].*", s):
-        raise KeyError("Variable without a value.", s)
+
+    if not allow_missing:
+        if isinstance(s, str) and re.match(r"^.*\$[A-Z_].*", s):
+            raise KeyError("Variable without a value.", s)
     return s
diff --git a/src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py b/src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py
index 565901a27..ee7698e4b 100755
--- a/src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py
+++ b/src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py
@@ -16,14 +16,37 @@
 """Run a recipe with the CSET CLI."""
 
 import json
+import logging
 import os
-from base64 import b64decode
+import subprocess
+from argparse import ArgumentParser
+from base64 import b64decode, b64encode
+from collections import defaultdict
 from pathlib import Path
 
+from CSET import logger as _base_logger
 from CSET.recipes import load_recipes
 
+logger = _base_logger.getChild(__name__)
+logger.setLevel(logging.DEBUG)
 
-def parbake_all(variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool):
+
+def get_args():
+    """Get command line arguments."""
+    parser = ArgumentParser(
+        description="Generate and parbake recipes from configuration."
+    )
+    parser.add_argument(
+        "--premix",
+        action="store_true",
+        help="Output a base64 encoded JSON mapping of all recipes that would be parbaked.",
+    )
+    return parser.parse_args()
+
+
+def parbake_all(
+    variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool
+) -> int:
     """Generate and parbake recipes from configuration."""
     # Gather all recipes into a big list.
     recipes = list(load_recipes(variables))
@@ -31,20 +54,56 @@ def parbake_all(variables: dict, rose_datac: Path, share_dir: Path, aggregation:
     if not recipes:
         raise ValueError("At least one recipe should be enabled.")
     # Parbake all recipes remaining after filtering aggregation recipes.
+    recipe_count = 0
     for recipe in filter(lambda r: r.aggregation == aggregation, recipes):
         print(f"Parbaking {recipe}", flush=True)
         recipe.parbake(rose_datac, share_dir)
+        recipe_count += 1
+    return recipe_count
 
 
 def main():
     """Program entry point."""
     # Gather configuration from environment.
+    args = get_args()
     variables = json.loads(b64decode(os.environ["ENCODED_ROSE_SUITE_VARIABLES"]))
-    rose_datac = Path(os.environ["ROSE_DATAC"])
-    share_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"])
-    aggregation = bool(os.getenv("DO_CASE_AGGREGATION"))
-    # Parbake recipes for cycle.
-    parbake_all(variables, rose_datac, share_dir, aggregation)
+
+    if args.premix:
+        recipes = list(load_recipes(variables))
+
+        batter = defaultdict(list)
+        for recipe in recipes:
+            batter[str(recipe.recipe_subdir)].append(recipe.premixed_names())
+
+        if any(v for v in batter.values()):
+            jsonified = json.dumps(batter)
+            logger.debug("Premixed recipes: %s", jsonified)
+            encoded = b64encode(jsonified.encode()).decode()
+            print(encoded)
+        else:
+            raise ValueError("At least one recipe should be enabled.")
+    else:
+        rose_datac = Path(os.environ["ROSE_DATAC"])
+        share_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"])
+        aggregation = bool(os.getenv("DO_CASE_AGGREGATION"))
+        # Parbake recipes for cycle.
+        recipe_count = parbake_all(variables, rose_datac, share_dir, aggregation)
+
+        # If running under cylc, report whether there is anything to bake.
+        cylc_workflow_id = os.environ.get("CYLC_WORKFLOW_ID", None)
+        cylc_task_job = os.environ.get("CYLC_TASK_JOB", None)
+        if cylc_workflow_id and cylc_task_job:
+            message_command = [
+                "cylc",
+                "message",
+                "--",
+                cylc_workflow_id,
+                cylc_task_job,
+            ]
+            if recipe_count > 0:
+                subprocess.run(message_command + ["start baking"], check=True)
+            else:
+                subprocess.run(message_command + ["skip baking"], check=True)
 
 
 if __name__ == "__main__":  # pragma: no cover
diff --git a/src/CSET/cset_workflow/flow.cylc b/src/CSET/cset_workflow/flow.cylc
index ecfd22104..e3c9edaa0 100644
--- a/src/CSET/cset_workflow/flow.cylc
+++ b/src/CSET/cset_workflow/flow.cylc
@@ -5,10 +5,11 @@ description = Workflow for running CSET.
 URL = https://metoffice.github.io/CSET
 
 # Import all of our Jinja utilities for use in the workflow.
-{% from "jinja_utils" import b64json, get_models %}
+{% from "jinja_utils" import b64json, get_models, mix_recipes %}
 
 # Load a list a model detail dictionaries.
 {% set models = get_models(ROSE_SUITE_VARIABLES) %}
+{% set premixed_recipes = mix_recipes(ROSE_SUITE_VARIABLES) %}
 
 [scheduling]
     # Allow many concurrent cycles to maximise workflow parallelism.
@@ -39,21 +40,31 @@ final cycle point = {{CSET_TRIAL_END_DATE}}
 {% for date in CSET_CASE_DATES %}
     R1/{{date}} = """
         setup_complete[^] => FETCH_DATA:succeed-all => fetch_complete
-        fetch_complete & parbake_recipes => bake_recipes => cycle_complete
+        fetch_complete & parbake_recipes:start_baking? => bake_recipes?
+
+        parbake_recipes:skip_baking? | bake_recipes? => cycle_complete
     """
 {% endfor %}
 {% elif CSET_CYCLING_MODE == "trial" %}
     # Analysis from each forecast.
     {{CSET_TRIAL_CYCLE_PERIOD}} = """
         setup_complete[^] => FETCH_DATA:succeed-all => fetch_complete
-        fetch_complete & parbake_recipes => bake_recipes => cycle_complete
+        fetch_complete & parbake_recipes:start_baking? => bake_recipes?
+
+        parbake_recipes:skip_baking? | bake_recipes? => cycle_complete
     """
 {% endif %}
 
     # Only runs on the final cycle.
     R1/$ = """
+
+{% if premixed_recipes.get('aggregation_recipes', False) %}
         # Run aggregation recipes.
-        fetch_complete & parbake_aggregation_recipes => bake_aggregation_recipes => cycle_complete
+        fetch_complete & parbake_aggregation_recipes:start_baking? => BAKE_AGGREGATION_RECIPES:succeed-all?
+
+        parbake_aggregation_recipes:skip_baking? | BAKE_AGGREGATION_RECIPES:succeed-all? => cycle_complete
+{% endif %}
+
         # Finalise website and cleanup.
         cycle_complete => finish_website => send_email
         cycle_complete => housekeeping
@@ -97,6 +108,18 @@ final cycle point = {{CSET_TRIAL_END_DATE}}
         [[[environment]]]
             ANALYSIS_LENGTH = {{ANALYSIS_LENGTH}}
 
+    [[PARBAKE]]
+        script = rose task-run -v --app-key=parbake_recipes
+        execution time limit = PT5M
+        [[[directives]]]
+            --ntasks=1
+            --mem=500
+        [[[environment]]]
+            ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}}
+        [[[outputs]]]
+            start_baking = 'start baking'
+            skip_baking = 'skip baking'
+
     [[METPLUS]]
         [[[environment]]]
 {% if METPLUS_GRID_STAT|default(False) %}
@@ -153,23 +176,12 @@ final cycle point = {{CSET_TRIAL_END_DATE}}
 
     [[parbake_recipes]]
         # Parbake all the recipes for this cycle.
-        script = rose task-run -v --app-key=parbake_recipes
-        execution time limit = PT5M
-        [[[directives]]]
-            --ntasks=1
-            --mem=500
-        [[[environment]]]
-            ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}}
+        inherit = PARBAKE
 
     [[parbake_aggregation_recipes]]
         # Parbake all the aggregation recipes.
-        script = rose task-run -v --app-key=parbake_recipes
-        execution time limit = PT5M
-        [[[directives]]]
-            --ntasks=1
-            --mem=500
+        inherit = PARBAKE
         [[[environment]]]
-            ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}}
             DO_CASE_AGGREGATION = True
 
     [[bake_recipes]]
@@ -181,16 +193,28 @@ final cycle point = {{CSET_TRIAL_END_DATE}}
             --ntasks=32
             --mem=64000
 
-    [[bake_aggregation_recipes]]
-        # Bake the parbaked aggregation recipes.
-        script = "$CYLC_WORKFLOW_RUN_DIR/app/bake_recipes/bin/baker.sh"
+{% if premixed_recipes.get('aggregation_recipes', False) %}
+    [[BAKE_AGGREGATION_RECIPES]]
         execution time limit = PT3H
         execution retry delays = PT1M
         [[[directives]]]
-            --ntasks=8
+            --ntasks=2
             --mem=64000
         [[[environment]]]
             DO_CASE_AGGREGATION = True
+            ROSE_APP_OPT_CONF_KEYS = aggregation
+            ROSE_TASK_APP = bake_recipes
+            RECIPE_DIR = $CYLC_WORKFLOW_SHARE_DIR/cycle/$CYLC_TASK_CYCLE_POINT
+
+{% for recipe_file, recipe_tasks in premixed_recipes['aggregation_recipes'] %}
+{% for task in recipe_tasks %}
+    [[{{task}}]]
+        inherit = BAKE_AGGREGATION_RECIPES
+        [[[environment]]]
+            RECIPE_FILE = {{recipe_file}}
+{% endfor %}
+{% endfor %}
+{% endif %}
 
     [[housekeeping]]
         # Housekeep input data files.
diff --git a/src/CSET/cset_workflow/lib/python/jinja_utils.py b/src/CSET/cset_workflow/lib/python/jinja_utils.py
index 62838ca6e..bf20b0622 100644
--- a/src/CSET/cset_workflow/lib/python/jinja_utils.py
+++ b/src/CSET/cset_workflow/lib/python/jinja_utils.py
@@ -16,6 +16,14 @@
 
 import base64
 import json
+import logging
+import os
+import subprocess
+from pathlib import Path
+from typing import Any
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.DEBUG)
 
 
 def get_models(rose_variables: dict) -> list[dict]:
@@ -46,3 +54,66 @@ def b64json(d: dict | list) -> str:
     new_d.pop("ROSE_SUITE_VARIABLES", None)
     output = base64.b64encode(json.dumps(new_d).encode()).decode()
     return output
+
+
+def mix_recipes(rose_variables: dict):
+    """Load recipe and task names.
+
+    Loads recipe files and generates unique task names for them.
+    """
+
+    def _is_True(val: Any) -> bool:
+        if isinstance(val, str):
+            return val.lower() in ["true"]
+        # use explicit comparison to handle jinja2
+        # true/false types
+        return val == True  # noqa: E712
+
+    shell_commands = []
+
+    if _is_True(rose_variables.get("CSET_ENV_USE_MODULES", False)):
+        shell_commands.append("IFS_SAVE=$IFS; IFS=' '")
+        if _is_True(rose_variables.get("MODULES_PURGE", False)):
+            shell_commands.append("module purge")
+        for module in rose_variables.get("MODULES_LIST", []):
+            shell_commands.append(f"module load {module}")
+        shell_commands.append("IFS=$IFS_SAVE")
+
+    conda_path = Path(rose_variables.get("CONDA_PATH", ""))
+
+    workflow_dir = Path(os.getcwd())
+    linked_conda_environment = workflow_dir / "conda-environment"
+    mix_command = (
+        f"{workflow_dir / 'app' / 'parbake_recipes' / 'bin' / 'parbake.py'} --premix"
+    )
+    if linked_conda_environment.exists():
+        LOGGER.debug(f"Activating conda environment from {linked_conda_environment}")
+        shell_commands.append(
+            f"{conda_path / 'conda'} run --no-capture-output --prefix {linked_conda_environment} {mix_command}"
+        )
+    else:
+        try:
+            subprocess.run("conda info --envs | grep -q '^cset-dev '", shell=True, check=True)
+            LOGGER.debug("Activating conda environment from cset-dev")
+            shell_commands.append(
+                f"{conda_path / 'conda'} run --no-capture-output --name cset-dev {mix_command}"
+            )
+        except subprocess.CalledProcessError:
+            LOGGER.debug(
+                "No conda environment to use. Making a last-ditch attempt to run directly."
+            )
+            shell_commands.append(mix_command)
+
+    premixing_env = os.environ.copy()
+    premixing_env["ENCODED_ROSE_SUITE_VARIABLES"] = b64json(rose_variables)
+
+    p = subprocess.run(
+        " && ".join(shell_commands),
+        shell=True,
+        check=True,
+        stdout=subprocess.PIPE,
+        env=premixing_env,
+    )
+    results = json.loads(base64.b64decode(p.stdout))
+
+    return results
diff --git a/src/CSET/loaders/spatial_difference_field.py b/src/CSET/loaders/spatial_difference_field.py
index d90fa1702..525c59ea6 100644
--- a/src/CSET/loaders/spatial_difference_field.py
+++ b/src/CSET/loaders/spatial_difference_field.py
@@ -217,7 +217,7 @@ def load(conf: Config):
     ]:
         base_model = models[0]
         yield RawRecipe(
-            recipe=f"mlevel_spatial_difference_case_aggregation_mean_{atype}.yaml",
+            recipe=f"level_spatial_difference_case_aggregation_mean_{atype}.yaml",
             variables={
                 "VARNAME": field,
                 "LEVELTYPE": "model_level_number",
diff --git a/src/CSET/recipes/__init__.py b/src/CSET/recipes/__init__.py
index dfdd701a7..e93406d2c 100644
--- a/src/CSET/recipes/__init__.py
+++ b/src/CSET/recipes/__init__.py
@@ -19,6 +19,7 @@
 import sys
 from collections.abc import Iterable
 from pathlib import Path
+from tempfile import TemporaryDirectory
 from typing import Any
 
 from ruamel.yaml import YAML
@@ -194,6 +195,16 @@ def __eq__(self, value: object) -> bool:
             )
         return NotImplemented
 
+    @property
+    def recipe_subdir(self) -> Path:
+        """Return the lowest level target directory for the recipe file(s)."""
+        return Path("aggregation_recipes" if self.aggregation else "recipes")
+
+    @staticmethod
+    def get_recipe_filename(parsed_recipe: dict) -> str:
+        """Return a suitable filename for the recipe."""
+        return f"{slugify(parsed_recipe['title'])}.yaml"
+
     def parbake(self, ROSE_DATAC: Path, SHARE_DIR: Path) -> None:
         """Pre-process recipe to bake in all variables.
 
@@ -210,13 +221,13 @@ def parbake(self, ROSE_DATAC: Path, SHARE_DIR: Path) -> None:
         # Collect configuration from environment.
         if self.aggregation:
             # Construct the location for the recipe.
-            recipe_dir = ROSE_DATAC / "aggregation_recipes"
+            recipe_dir = ROSE_DATAC / self.recipe_subdir
             # Construct the input data directories for the cycle.
             data_dirs = [
                 SHARE_DIR / f"cycle/*/data/{model_id}" for model_id in self.model_ids
             ]
         else:
-            recipe_dir = ROSE_DATAC / "recipes"
+            recipe_dir = ROSE_DATAC / self.recipe_subdir
             data_dirs = [ROSE_DATAC / f"data/{model_id}" for model_id in self.model_ids]
 
         # Ensure recipe dir exists.
@@ -227,11 +238,25 @@ def parbake(self, ROSE_DATAC: Path, SHARE_DIR: Path) -> None:
 
         # Parbake this recipe, saving into recipe_dir.
         recipe = parse_recipe(Path(self.recipe), self.variables)
-        output = recipe_dir / f"{slugify(recipe['title'])}.yaml"
+        output = recipe_dir / self.get_recipe_filename(recipe)
         with open(output, "wt") as fp:
             with YAML(pure=True, output=fp) as yaml:
                 yaml.dump(recipe)
 
+    def premixed_names(self) -> tuple[str, list[str]]:
+        """Pre-process recipe to generate a file name and list of rose task names."""
+        with TemporaryDirectory() as tmpdir:
+            unpack_path = Path(tmpdir)
+            unpack_recipe(unpack_path, self.recipe)
+            recipe = parse_recipe(
+                unpack_path / self.recipe, self.variables, allow_missing=True
+            )
+        title = recipe["title"]
+        filename = self.recipe_subdir / self.get_recipe_filename(recipe)
+
+        names = [slugify(f"{title}_{model_id}") for model_id in self.model_ids]
+        return str(filename), names
+
 
 class Config:
     """Namespace for easy access to configuration values.
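
Note on the new templating flag: the sketch below illustrates the intended allow_missing behaviour, using a made-up recipe fragment and variable names (nothing here is taken from the patch itself).

    from CSET._common import template_variables

    # Hypothetical recipe with two placeholders; only VARNAME is known yet.
    recipe = {"title": "Mean $VARNAME for $MODEL_NAME"}
    recipe = template_variables(recipe, {"VARNAME": "temperature"}, allow_missing=True)

    # With allow_missing=True the unresolved $MODEL_NAME placeholder is kept
    # instead of raising KeyError("Variable without a value.", ...), which is
    # what lets premixed_names() parse recipes before all variables exist.
    assert recipe["title"] == "Mean temperature for $MODEL_NAME"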
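And a sketch of the premix round trip that flow.cylc relies on: parbake.py --premix prints base64-encoded JSON mapping each recipe subdirectory to (recipe file, task names) pairs, which mix_recipes() decodes for the Jinja loops. The file and task names below are invented for illustration.

    import json
    from base64 import b64decode, b64encode

    # Shape of the payload built from Recipe.premixed_names():
    # {recipe_subdir: [[recipe_file, [task_name, ...]], ...]}
    payload = {
        "aggregation_recipes": [
            [
                "aggregation_recipes/mean-temperature.yaml",
                ["mean-temperature-m01", "mean-temperature-m02"],
            ]
        ]
    }
    encoded = b64encode(json.dumps(payload).encode()).decode()

    # mix_recipes() captures this from the subprocess stdout and decodes it
    # back; flow.cylc then defines one task per (recipe file, task name) pair.
    premixed = json.loads(b64decode(encoded))
    for recipe_file, task_names in premixed["aggregation_recipes"]:
        print(recipe_file, task_names)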