Skip to content

Commit

Permalink
feat(airflow): include environment name in DAG filename (#492)
Browse files Browse the repository at this point in the history
* feat: include environment name in DAG file

Signed-off-by: Simon Brugman <[email protected]>

* doc: add update to release notes

Signed-off-by: Simon Brugman <[email protected]>

---------

Signed-off-by: Simon Brugman <[email protected]>
Co-authored-by: Ankita Katiyar <[email protected]>
  • Loading branch information
sbrugman and ankatiyar authored Jan 31, 2024
1 parent 01f1ccf commit 068bc8f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
1 change: 1 addition & 0 deletions kedro-airflow/RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Upcoming Release
* Option to group MemoryDatasets in the same Airflow task (breaking change for custom templates supplied via `--jinja-file`).
* Include the environment name in the DAG file name when different from the default.

# Release 0.8.0
* Added support for Kedro 0.19.x
Expand Down
20 changes: 12 additions & 8 deletions kedro-airflow/kedro_airflow/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
passing multiple values using `--pipeline [p1] --pipeline [p2]`.
Use the `--all` flag to convert all registered pipelines at once."""
ALL_ARG_HELP = """Convert all registered pipelines at once."""
DEFAULT_RUN_ENV = "local"
DEFAULT_PIPELINE = "__default__"


@click.group(name="Kedro-Airflow")
Expand Down Expand Up @@ -80,11 +82,11 @@ def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str)
"--pipelines",
"pipeline_names",
multiple=True,
default=("__default__",),
default=(DEFAULT_PIPELINE,),
help=PIPELINE_ARG_HELP,
)
@click.option("--all", "convert_all", is_flag=True, help=ALL_ARG_HELP)
@click.option("-e", "--env", default="local", help=ENV_HELP)
@click.option("-e", "--env", default=DEFAULT_RUN_ENV, help=ENV_HELP)
@click.option(
"-t",
"--target-dir",
Expand Down Expand Up @@ -129,7 +131,7 @@ def create( # noqa: PLR0913
convert_all: bool,
):
"""Create an Airflow DAG for a project"""
if convert_all and pipeline_names != ("__default__",):
if convert_all and pipeline_names != (DEFAULT_PIPELINE,):
raise click.BadParameter(
"The `--all` and `--pipeline` option are mutually exclusive."
)
Expand Down Expand Up @@ -170,11 +172,13 @@ def create( # noqa: PLR0913
raise KedroCliError(f"Pipeline {name} not found.")

# Obtain the file name
dag_filename = dags_folder / (
f"{package_name}_dag.py"
if name == "__default__"
else f"{package_name}_{name}_dag.py"
)
dag_name = package_name
if env != DEFAULT_RUN_ENV:
dag_name += f"_{env}"
if name != DEFAULT_PIPELINE:
dag_name += f"_{name}"
dag_name += "_dag.py"
dag_filename = dags_folder / dag_name

# group memory nodes
if group_in_memory:
Expand Down
2 changes: 1 addition & 1 deletion kedro-airflow/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def test_create_airflow_dag_env_parameter_exists(cli_runner, metadata):

_kedro_create_env(Path.cwd())

dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py"
dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_remote_dag.py"
result = cli_runner.invoke(commands, command, obj=metadata)

assert result.exit_code == 0, (result.exit_code, result.stdout)
Expand Down

0 comments on commit 068bc8f

Please sign in to comment.