From 383ad31c76411fb0a9f7d4243729d7bb0640ff0c Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 2 Jan 2024 16:08:55 +0530 Subject: [PATCH] Raise error when ``DagRun`` fails while running ``dag test`` (#36517) **Motivation**: Currently, when using `airflow dags test`, there is no easy way to know programmatically if a DagRun fails since the state is not stored in DB. The way to do know relies on log lines as below: ```bash state=$(airflow dags test exception_dag | grep "DagRun Finished" | awk -F, '{for(i=1;i<=NF;i++) if ($i ~ / state=/) print $i}' | awk -F= '{print $2}') if [[ $state == "failed" ]]; then exit 1 else exit 0 fi ``` This PR adds will return an exit code 1 when `airflow dags test` command if DagRun fails and makes it easy to integrate in CI for testing. --- airflow/cli/commands/dag_command.py | 5 ++++- tests/cli/commands/test_dag_command.py | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 74609c41dc5be..a9d6aaf34210a 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -515,7 +515,7 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}") execution_date = args.execution_date or timezone.utcnow() dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id) - dag.test(execution_date=execution_date, run_conf=run_conf, session=session) + dr: DagRun = dag.test(execution_date=execution_date, run_conf=run_conf, session=session) show_dagrun = args.show_dagrun imgcat = args.imgcat_dagrun filename = args.save_dagrun @@ -536,6 +536,9 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No if show_dagrun: print(dot_graph.source) + if dr and dr.state == DagRunState.FAILED: + raise SystemExit("DagRun failed") + @cli_utils.action_cli @providers_configuration_loaded diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 30b5c475ea4a9..4f16c381ad381 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -43,6 +43,7 @@ from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.utils import timezone from airflow.utils.session import create_session +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType from tests.models import TEST_DAGS_FOLDER from tests.test_utils.config import conf_vars @@ -747,6 +748,16 @@ def test_dag_test(self, mock_get_dag): ] ) + @mock.patch("airflow.cli.commands.dag_command.get_dag") + def test_dag_test_fail_raise_error(self, mock_get_dag): + execution_date_str = DEFAULT_DATE.isoformat() + mock_get_dag.return_value.test.return_value = DagRun( + dag_id="example_bash_operator", execution_date=DEFAULT_DATE, state=DagRunState.FAILED + ) + cli_args = self.parser.parse_args(["dags", "test", "example_bash_operator", execution_date_str]) + with pytest.raises(SystemExit, match=r"DagRun failed"): + dag_command.dag_test(cli_args) + @mock.patch("airflow.cli.commands.dag_command.get_dag") @mock.patch("airflow.utils.timezone.utcnow") def test_dag_test_no_execution_date(self, mock_utcnow, mock_get_dag):