Skip to content

Commit

Permalink
feat(airflow): Add --conf-source to DAG created with kedro-airflow (
Browse files Browse the repository at this point in the history
#712)

* Add --conf-source to kedro-airflow

Signed-off-by: Ankita Katiyar <[email protected]>

* Add conf_source to operator

Signed-off-by: Ankita Katiyar <[email protected]>

* Set default behavour

Signed-off-by: Ankita Katiyar <[email protected]>

* fix tests

Signed-off-by: Ankita Katiyar <[email protected]>

* fix tests

Signed-off-by: Ankita Katiyar <[email protected]>

* Release notes + remove print

Signed-off-by: Ankita Katiyar <[email protected]>

* Turn of path exist check

Signed-off-by: Ankita Katiyar <[email protected]>

* Turn of path resolving

Signed-off-by: Ankita Katiyar <[email protected]>

---------

Signed-off-by: Ankita Katiyar <[email protected]>
  • Loading branch information
ankatiyar authored Jun 6, 2024
1 parent f157e1a commit ca46a05
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 1 deletion.
1 change: 1 addition & 0 deletions kedro-airflow/RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Upcoming Release
* Added support to specify `--conf-source` which would point to the runtime configuration directory to be used for running the DAG in airflow. This configuration path is added to the generated DAG.

# Release 0.9.0
* Sort DAGs to make sure `kedro airflow create` is deterministic.
Expand Down
7 changes: 6 additions & 1 deletion kedro-airflow/kedro_airflow/airflow_dag_template.j2
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class KedroOperator(BaseOperator):
node_name: str | list[str],
project_path: str | Path,
env: str,
conf_source: str,
*args, **kwargs
) -> None:
super().__init__(*args, **kwargs)
Expand All @@ -28,10 +29,11 @@ class KedroOperator(BaseOperator):
self.node_name = node_name
self.project_path = project_path
self.env = env
self.conf_source = conf_source

def execute(self, context):
configure_project(self.package_name)
with KedroSession.create(self.project_path, env=self.env) as session:
with KedroSession.create(self.project_path, env=self.env, conf_source=self.conf_source) as session:
if isinstance(self.node_name, str):
self.node_name = [self.node_name]
session.run(self.pipeline_name, node_names=self.node_name)
Expand All @@ -41,6 +43,8 @@ env = "{{ env }}"
pipeline_name = "{{ pipeline_name }}"
project_path = Path.cwd()
package_name = "{{ package_name }}"
conf_source = "{{ conf_source }}" or Path.cwd() / "conf"


# Using a DAG context manager, you don't have to specify the dag property of each task
with DAG(
Expand Down Expand Up @@ -68,6 +72,7 @@ with DAG(
node_name={% if node_list | length > 1 %}[{% endif %}{% for node in node_list %}"{{ node.name | safe }}"{% if not loop.last %}, {% endif %}{% endfor %}{% if node_list | length > 1 %}]{% endif %},
project_path=project_path,
env=env,
conf_source=conf_source,
),
{% endfor %} }

Expand Down
12 changes: 12 additions & 0 deletions kedro-airflow/kedro_airflow/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
`--tags tag1,tag2`."""
DEFAULT_RUN_ENV = "local"
DEFAULT_PIPELINE = "__default__"
CONF_SOURCE_HELP = """Path to the configuration folder or archived file to be used in the Airflow DAG."""


@click.group(name="Kedro-Airflow")
Expand Down Expand Up @@ -134,6 +135,12 @@ def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str)
help=PARAMS_ARG_HELP,
callback=_split_params,
)
@click.option(
"--conf-source",
type=click.Path(exists=False, file_okay=True, resolve_path=False),
help=CONF_SOURCE_HELP,
default=None,
)
@click.pass_obj
def create( # noqa: PLR0913, PLR0912
metadata: ProjectMetadata,
Expand All @@ -144,9 +151,13 @@ def create( # noqa: PLR0913, PLR0912
group_in_memory,
tags,
params,
conf_source,
convert_all: bool,
):
"""Create an Airflow DAG for a project"""

if conf_source is None:
conf_source = ""
if convert_all and pipeline_names != (DEFAULT_PIPELINE,):
raise click.BadParameter(
"The `--all` and `--pipeline` option are mutually exclusive."
Expand Down Expand Up @@ -228,6 +239,7 @@ def create( # noqa: PLR0913, PLR0912
pipeline_name=name,
package_name=package_name,
pipeline=pipeline,
conf_source=conf_source,
**dag_config,
).dump(str(dag_filename))

Expand Down
15 changes: 15 additions & 0 deletions kedro-airflow/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,18 @@ def test_create_airflow_all_and_pipeline(cli_runner, metadata):
"Error: Invalid value: The `--all` and `--pipeline` option are mutually exclusive."
in result.stdout
)


def test_create_airflow_conf_source(cli_runner, metadata):
command = ["airflow", "create", "--conf-source", "conf"]
result = cli_runner.invoke(commands, command, obj=metadata)
assert result.exit_code == 0
dag_file = metadata.project_path / "airflow_dags" / "fake_project_dag.py"

assert dag_file.exists()

expected_airflow_dag = 'conf_source = "conf" or Path.cwd() / "conf"'
with dag_file.open(encoding="utf-8") as f:
dag_code = [line.strip() for line in f.read().splitlines()]
assert expected_airflow_dag in dag_code
dag_file.unlink()

0 comments on commit ca46a05

Please sign in to comment.