Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airflow): [Draft] Add grouping for namespaces #981

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions kedro-airflow/kedro_airflow/grouping.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,31 @@ def dfs(cur_node_name: str, component: int) -> None:
group_dependencies[new_name_parent].append(new_name_child)

return group_to_seq, group_dependencies


def group_by_namespace(
pipeline: Pipeline,
) -> tuple[dict[str, list[Node]], dict[str, list[str]]]:
"""
Groups nodes based on their namespace.
"""
nodes: dict[str, list[Node]] = {}
dependencies: dict[str, list[str]] = {}

for node in pipeline.nodes:
key = node.namespace if node.namespace else node.name
if key not in nodes:
nodes[key] = []
dependencies[key] = []
nodes[key].append(node)
for parent in pipeline.node_dependencies[node]:
if parent.namespace and parent.namespace != key:
dependencies[key].append(parent.namespace)
elif parent.namespace and parent.namespace == key:
continue
else:
dependencies[key].append(parent.name)
for key, value in dependencies.items():
dependencies[key] = list(set(value))

return nodes, dependencies
10 changes: 9 additions & 1 deletion kedro-airflow/kedro_airflow/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from kedro.framework.startup import ProjectMetadata
from slugify import slugify

from kedro_airflow.grouping import group_memory_nodes
from kedro_airflow.grouping import group_by_namespace, group_memory_nodes

PIPELINE_ARG_HELP = """Name of the registered pipeline to convert.
If not set, the '__default__' pipeline is used. This argument supports
Expand Down Expand Up @@ -141,6 +141,11 @@ def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str)
help=CONF_SOURCE_HELP,
default=None,
)
@click.option(
"--group-namespace",
is_flag=True,
default=False,
)
@click.pass_obj
def create( # noqa: PLR0913, PLR0912
metadata: ProjectMetadata,
Expand All @@ -152,6 +157,7 @@ def create( # noqa: PLR0913, PLR0912
tags,
params,
conf_source,
group_namespace,
convert_all: bool,
):
"""Create an Airflow DAG for a project"""
Expand Down Expand Up @@ -218,6 +224,8 @@ def create( # noqa: PLR0913, PLR0912
# topological sort order obtained from pipeline.nodes, see group_memory_nodes()
# implementation
nodes, dependencies = group_memory_nodes(context.catalog, pipeline)
elif group_namespace:
nodes, dependencies = group_by_namespace(pipeline)
else:
# To keep the order of nodes and dependencies deterministic - nodes are
# iterated in the topological sort order obtained from pipeline.nodes and
Expand Down
Loading