From 068bc8f6d2150f7630ce73add561bc5f1c0e9571 Mon Sep 17 00:00:00 2001
From: Simon Brugman
Date: Wed, 31 Jan 2024 15:40:03 +0100
Subject: [PATCH] feat(airflow): include environment name in DAG filename
 (#492)

* feat: include environment name in DAG file

Signed-off-by: Simon Brugman

* doc: add update to release notes

Signed-off-by: Simon Brugman

---------

Signed-off-by: Simon Brugman
Co-authored-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com>
---
 kedro-airflow/RELEASE.md              |  1 +
 kedro-airflow/kedro_airflow/plugin.py | 20 ++++++++++++--------
 kedro-airflow/tests/test_plugin.py    |  2 +-
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/kedro-airflow/RELEASE.md b/kedro-airflow/RELEASE.md
index fe8a79668..9aadcede8 100755
--- a/kedro-airflow/RELEASE.md
+++ b/kedro-airflow/RELEASE.md
@@ -1,5 +1,6 @@
 # Upcoming Release
 * Option to group MemoryDatasets in the same Airflow task (breaking change for custom template via `--jinja-file`).
+* Include the environment name in the DAG file name when different from the default.

 # Release 0.8.0
 * Added support for Kedro 0.19.x
diff --git a/kedro-airflow/kedro_airflow/plugin.py b/kedro-airflow/kedro_airflow/plugin.py
index 1a164d4d1..3e5b79df0 100644
--- a/kedro-airflow/kedro_airflow/plugin.py
+++ b/kedro-airflow/kedro_airflow/plugin.py
@@ -24,6 +24,8 @@
 passing multiple values using `--pipeline [p1] --pipeline [p2]`.
 Use the `--all` flag to convert all registered pipelines at once."""
 ALL_ARG_HELP = """Convert all registered pipelines at once."""
+DEFAULT_RUN_ENV = "local"
+DEFAULT_PIPELINE = "__default__"


 @click.group(name="Kedro-Airflow")
@@ -80,11 +82,11 @@ def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str)
     "--pipelines",
     "pipeline_names",
     multiple=True,
-    default=("__default__",),
+    default=(DEFAULT_PIPELINE,),
     help=PIPELINE_ARG_HELP,
 )
 @click.option("--all", "convert_all", is_flag=True, help=ALL_ARG_HELP)
-@click.option("-e", "--env", default="local", help=ENV_HELP)
+@click.option("-e", "--env", default=DEFAULT_RUN_ENV, help=ENV_HELP)
 @click.option(
     "-t",
     "--target-dir",
@@ -129,7 +131,7 @@ def create(  # noqa: PLR0913
     convert_all: bool,
 ):
     """Create an Airflow DAG for a project"""
-    if convert_all and pipeline_names != ("__default__",):
+    if convert_all and pipeline_names != (DEFAULT_PIPELINE,):
         raise click.BadParameter(
             "The `--all` and `--pipeline` option are mutually exclusive."
         )
@@ -170,11 +172,13 @@ def create(  # noqa: PLR0913
             raise KedroCliError(f"Pipeline {name} not found.")

         # Obtain the file name
-        dag_filename = dags_folder / (
-            f"{package_name}_dag.py"
-            if name == "__default__"
-            else f"{package_name}_{name}_dag.py"
-        )
+        dag_name = package_name
+        if env != DEFAULT_RUN_ENV:
+            dag_name += f"_{env}"
+        if name != DEFAULT_PIPELINE:
+            dag_name += f"_{name}"
+        dag_name += "_dag.py"
+        dag_filename = dags_folder / dag_name

         # group memory nodes
         if group_in_memory:
diff --git a/kedro-airflow/tests/test_plugin.py b/kedro-airflow/tests/test_plugin.py
index 0969a601b..7783ba62e 100644
--- a/kedro-airflow/tests/test_plugin.py
+++ b/kedro-airflow/tests/test_plugin.py
@@ -252,7 +252,7 @@ def test_create_airflow_dag_env_parameter_exists(cli_runner, metadata):

     _kedro_create_env(Path.cwd())

-    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py"
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_remote_dag.py"

     result = cli_runner.invoke(commands, command, obj=metadata)
     assert result.exit_code == 0, (result.exit_code, result.stdout)
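
Note (not part of the patch): the snippet below is a minimal standalone sketch that
mirrors the filename logic this change introduces in `create`, useful for
sanity-checking which DAG file name a given `--env`/`--pipelines` combination will
produce. The package name `my_project` and the pipeline name `ds` are hypothetical
stand-ins.

    # Sketch only: reproduces the naming logic added to kedro_airflow/plugin.py.
    DEFAULT_RUN_ENV = "local"
    DEFAULT_PIPELINE = "__default__"

    def dag_filename(package_name: str, env: str, pipeline: str) -> str:
        """Build the DAG file name the way the patched `create` command does."""
        dag_name = package_name
        if env != DEFAULT_RUN_ENV:  # environment is appended only when non-default
            dag_name += f"_{env}"
        if pipeline != DEFAULT_PIPELINE:  # pipeline name only when non-default
            dag_name += f"_{pipeline}"
        return dag_name + "_dag.py"

    assert dag_filename("my_project", "local", "__default__") == "my_project_dag.py"
    assert dag_filename("my_project", "remote", "__default__") == "my_project_remote_dag.py"
    assert dag_filename("my_project", "local", "ds") == "my_project_ds_dag.py"
    assert dag_filename("my_project", "remote", "ds") == "my_project_remote_ds_dag.py"

The second case is exactly what the updated test asserts: with `--env remote`, the
generated file becomes `{dag_name}_remote_dag.py` rather than `{dag_name}_dag.py`.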