diff --git a/src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py b/src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py index 565901a27..667f0c0ce 100755 --- a/src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py +++ b/src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py @@ -17,13 +17,16 @@ import json import os +import subprocess from base64 import b64decode from pathlib import Path from CSET.recipes import load_recipes -def parbake_all(variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool): +def parbake_all( + variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool +) -> int: """Generate and parbake recipes from configuration.""" # Gather all recipes into a big list. recipes = list(load_recipes(variables)) @@ -31,9 +34,12 @@ def parbake_all(variables: dict, rose_datac: Path, share_dir: Path, aggregation: if not recipes: raise ValueError("At least one recipe should be enabled.") # Parbake all recipes remaining after filtering aggregation recipes. + recipe_count = 0 for recipe in filter(lambda r: r.aggregation == aggregation, recipes): print(f"Parbaking {recipe}", flush=True) recipe.parbake(rose_datac, share_dir) + recipe_count += 1 + return recipe_count def main(): @@ -44,7 +50,23 @@ def main(): share_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"]) aggregation = bool(os.getenv("DO_CASE_AGGREGATION")) # Parbake recipes for cycle. - parbake_all(variables, rose_datac, share_dir, aggregation) + recipe_count = parbake_all(variables, rose_datac, share_dir, aggregation) + + # If running under cylc, notify cylc of task completion. + cylc_workflow_id = os.getenv("CYLC_WORKFLOW_ID") + cylc_task_job = os.getenv("CYLC_TASK_JOB") + if cylc_workflow_id and cylc_task_job: + message_command = [ + "cylc", + "message", + "--", + cylc_workflow_id, + cylc_task_job, + ] + if recipe_count: + subprocess.run(message_command + ["start baking"], check=True) + else: + subprocess.run(message_command + ["skip baking"], check=True) if __name__ == "__main__": # pragma: no cover diff --git a/src/CSET/cset_workflow/flow.cylc b/src/CSET/cset_workflow/flow.cylc index bd4a875ae..435d2b848 100644 --- a/src/CSET/cset_workflow/flow.cylc +++ b/src/CSET/cset_workflow/flow.cylc @@ -40,7 +40,9 @@ final cycle point = {{CSET_TRIAL_END_DATE}} R1/{{date}} = """ setup_complete[^] => parbake_recipes setup_complete[^] => FETCH_DATA:succeed-all => fetch_complete - fetch_complete & parbake_recipes => bake_recipes => cycle_complete + fetch_complete & parbake_recipes:start_baking? => bake_recipes + parbake_recipes:skip_baking? => ! bake_recipes + parbake_recipes:skip_baking? | bake_recipes => cycle_complete """ {% endfor %} {% elif CSET_CYCLING_MODE == "trial" %} @@ -48,7 +50,9 @@ final cycle point = {{CSET_TRIAL_END_DATE}} {{CSET_TRIAL_CYCLE_PERIOD}} = """ setup_complete[^] => parbake_recipes setup_complete[^] => FETCH_DATA:succeed-all => fetch_complete - fetch_complete & parbake_recipes => bake_recipes => cycle_complete + fetch_complete & parbake_recipes:start_baking? => bake_recipes + parbake_recipes:skip_baking? => ! bake_recipes + parbake_recipes:skip_baking? | bake_recipes => cycle_complete """ {% endif %} @@ -56,7 +60,9 @@ final cycle point = {{CSET_TRIAL_END_DATE}} R1/$ = """ # Run aggregation recipes. setup_complete[^] => parbake_aggregation_recipes - fetch_complete & parbake_aggregation_recipes => bake_aggregation_recipes => cycle_complete + fetch_complete & parbake_aggregation_recipes:start_baking? => bake_aggregation_recipes + parbake_aggregation_recipes:skip_baking? => ! bake_aggregation_recipes + parbake_aggregation_recipes:skip_baking? | bake_aggregation_recipes => cycle_complete # Finalise website and cleanup. cycle_complete => finish_website => send_email cycle_complete => housekeeping @@ -102,8 +108,12 @@ final cycle point = {{CSET_TRIAL_END_DATE}} [[PARBAKE]] script = rose task-run -v --app-key=parbake_recipes execution time limit = PT5M + completion = succeeded and (start_baking or skip_baking) [[[environment]]] ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}} + [[[outputs]]] + start_baking='start baking' + skip_baking='skip baking' [[METPLUS]] [[[environment]]] diff --git a/tests/workflow_utils/test_parbake_recipes.py b/tests/workflow_utils/test_parbake_recipes.py index 894f0655f..367b4441a 100644 --- a/tests/workflow_utils/test_parbake_recipes.py +++ b/tests/workflow_utils/test_parbake_recipes.py @@ -14,6 +14,7 @@ """Tests for parbake_recipe workflow utility.""" +import subprocess from pathlib import Path import pytest @@ -25,14 +26,28 @@ def test_main(monkeypatch): """Check parbake.main() invokes parbake_all correctly.""" function_ran = False + recipes_parbaked = 0 + cylc_message_ran = False + cylc_message = "" def mock_parbake_all(variables, rose_datac, share_dir, aggregation): nonlocal function_ran + nonlocal recipes_parbaked function_ran = True assert variables == {"variable": "value"} assert rose_datac == Path("/share/cycle/20000101T0000Z") assert share_dir == Path("/share") assert isinstance(aggregation, bool) + return recipes_parbaked + + def mock_run(cmd, **kwargs): + nonlocal cylc_message + nonlocal cylc_message_ran + cylc_message_ran = True + assert cmd[0:3] == ["cylc", "message", "--"] + assert cmd[3] == "test-workflow" + assert cmd[4] == "test-job" + assert cmd[5] == cylc_message monkeypatch.setattr(parbake, "parbake_all", mock_parbake_all) @@ -51,6 +66,25 @@ def mock_parbake_all(variables, rose_datac, share_dir, aggregation): parbake.main() assert function_ran, "Function did not run!" + # Retry with cylc environment variables set. + monkeypatch.setattr(subprocess, "run", mock_run) + monkeypatch.setenv("CYLC_WORKFLOW_ID", "test-workflow") + monkeypatch.setenv("CYLC_TASK_JOB", "test-job") + + # No recipes parbaked. + function_ran = False + recipes_parbaked = 0 + cylc_message = "skip baking" + parbake.main() + assert cylc_message_ran, "Cylc message function did not run!" + + # Some recipes parbaked. + function_ran = False + recipes_parbaked = 3 + cylc_message = "start baking" + parbake.main() + assert cylc_message_ran, "Cylc message function did not run!" + def test_parbake_all_none_enabled(tmp_working_dir, monkeypatch): """Error when no recipes are enabled."""