Skip to content

Commit

Permalink
AIP-84 List DAGs endpoint new features (2/2) Advanced (apache#45420)
Browse files Browse the repository at this point in the history
  • Loading branch information
prabhusneha authored Jan 22, 2025
1 parent 8fc7721 commit 3edd78a
Show file tree
Hide file tree
Showing 11 changed files with 428 additions and 34 deletions.
83 changes: 64 additions & 19 deletions airflow/api_fastapi/common/db/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,76 @@

from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import and_, func, select

if TYPE_CHECKING:
from sqlalchemy.sql import Select

from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun

# Module-level CTE: selects DagRun.dag_id together with the maximum
# DagRun.start_date (labelled "start_date"), grouped by dag_id, i.e. one row
# per DAG carrying the start date of its most recently started run.
# NOTE(review): .where() is called with no criteria here.
latest_dag_run_per_dag_id_cte = (
select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
.where()
.group_by(DagRun.dag_id)
.cte()
)

def generate_dag_with_latest_run_query(dag_runs_cte: Select | None = None) -> Select:
    """
    Build the DAG select, outer-joined to each DAG's most recently started run.

    :param dag_runs_cte: optional pre-filtered set of DAG runs. When given, only
        DAGs whose ``dag_id`` appears in it are kept (inner join). It must expose
        a ``dag_id`` column through ``.c``.
        NOTE(review): annotated as ``Select``, but the ``.c`` access suggests a
        CTE/subquery is expected -- confirm against callers.
    :return: a ``select(DagModel)`` statement ordered by ``DagModel.dag_id``.
    """
    # One row per dag_id with the greatest start_date among its runs.
    latest_dag_run_per_dag_id_cte = (
        select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
        .group_by(DagRun.dag_id)
        .cte()
    )

    dags_select = select(DagModel)

    if dag_runs_cte is not None:
        # Collapse the filtered runs to the distinct dag_ids that own at least
        # one of them, then inner-join so DAGs without a matching run drop out.
        dag_run_filters_cte = (
            select(DagModel.dag_id)
            .join(
                dag_runs_cte,
                DagModel.dag_id == dag_runs_cte.c.dag_id,
            )
            .join(
                DagRun,
                DagRun.dag_id == dag_runs_cte.c.dag_id,
            )
            .group_by(DagModel.dag_id)
            .cte()
        )
        dags_select = dags_select.join(
            dag_run_filters_cte,
            dag_run_filters_cte.c.dag_id == DagModel.dag_id,
        )

    return (
        dags_select.join(
            latest_dag_run_per_dag_id_cte,
            DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
            isouter=True,
        )
        .join(
            DagRun,
            # and_() is required: Python's `and` cannot be overloaded and simply
            # returns one of its two operands, so only a single comparison would
            # reach the ON clause and runs of different DAGs sharing a
            # start_date could be matched to each other.
            and_(
                DagRun.start_date == latest_dag_run_per_dag_id_cte.c.start_date,
                DagRun.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
            ),
            isouter=True,
        )
        .order_by(DagModel.dag_id)
    )
37 changes: 30 additions & 7 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Any,
Callable,
Generic,
Literal,
Optional,
TypeVar,
Union,
Expand All @@ -36,7 +37,7 @@
from fastapi import Depends, HTTPException, Query, status
from pendulum.parsing.exceptions import ParserError
from pydantic import AfterValidator, BaseModel, NonNegativeInt
from sqlalchemy import Column, case, or_
from sqlalchemy import Column, and_, case, or_
from sqlalchemy.inspection import inspect

from airflow.models import Base
Expand Down Expand Up @@ -233,6 +234,7 @@ class FilterOptionEnum(Enum):
IN = "in"
NOT_IN = "not_in"
ANY_EQUAL = "any_eq"
ALL_EQUAL = "all_eq"
IS_NONE = "is_none"


Expand Down Expand Up @@ -265,6 +267,9 @@ def to_orm(self, select: Select) -> Select:
if self.filter_option == FilterOptionEnum.ANY_EQUAL:
conditions = [self.attribute == val for val in self.value]
return select.where(or_(*conditions))
if self.filter_option == FilterOptionEnum.ALL_EQUAL:
conditions = [self.attribute == val for val in self.value]
return select.where(and_(*conditions))
raise HTTPException(
400, f"Invalid filter option {self.filter_option} for list value {self.value}"
)
Expand Down Expand Up @@ -324,21 +329,33 @@ def depends_filter(value: T | None = query) -> FilterParam[T | None]:
return depends_filter


class _TagFilterModel(BaseModel):
    """Requested tag names plus the mode used to combine them."""

    # Tag names to match against DagTag.name.
    tags: list[str]
    # "any" or "all"; may also be None.
    tags_match_mode: Literal["any", "all"] | None


class _TagsFilter(BaseParam[_TagFilterModel]):
    """Restrict a DAG select to DAGs carrying the requested tags."""

    def to_orm(self, select: Select) -> Select:
        """Return ``select`` with the tag conditions applied, or unchanged when no tags are set."""
        if self.skip_none is False:
            raise ValueError(f"Cannot set 'skip_none' to False on a {type(self)}")

        requested = self.value
        if not requested or not requested.tags:
            return select

        tag_clauses = [DagModel.tags.any(DagTag.name == name) for name in requested.tags]
        # A missing mode or "any" ORs the per-tag clauses; anything else ANDs them.
        mode = requested.tags_match_mode
        combine = and_ if mode and mode != "any" else or_
        return select.where(combine(*tag_clauses))

    def depends(
        self,
        tags: list[str] = Query(default_factory=list),
        tags_match_mode: Literal["any", "all"] | None = None,
    ) -> _TagsFilter:
        """Wrap the two query parameters in a ``_TagFilterModel`` and store it as this filter's value."""
        tag_filter = _TagFilterModel(tags=tags, tags_match_mode=tags_match_mode)
        return self.set_value(tag_filter)


class _OwnersFilter(BaseParam[list[str]]):
Expand Down Expand Up @@ -443,6 +460,12 @@ def to_orm(self, select: Select) -> Select:
def depends(self, *args: Any, **kwargs: Any) -> Self:
raise NotImplementedError("Use the `range_filter_factory` function to create the dependency")

def is_active(self) -> bool:
    """Tell whether the range filter has a lower or an upper bound set."""
    bounds = self.value
    if bounds is None:
        return False
    # Active as soon as either end of the range is present.
    return not (bounds.lower_bound is None and bounds.upper_bound is None)


def datetime_range_filter_factory(
filter_name: str, model: Base, attribute_name: str | None = None
Expand Down
77 changes: 77 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ paths:
items:
type: string
title: Tags
- name: tags_match_mode
in: query
required: false
schema:
anyOf:
- enum:
- any
- all
type: string
- type: 'null'
title: Tags Match Mode
- name: owners
in: query
required: false
Expand Down Expand Up @@ -2893,6 +2904,17 @@ paths:
items:
type: string
title: Tags
- name: tags_match_mode
in: query
required: false
schema:
anyOf:
- enum:
- any
- all
type: string
- type: 'null'
title: Tags Match Mode
- name: owners
in: query
required: false
Expand Down Expand Up @@ -2940,6 +2962,50 @@ paths:
- $ref: '#/components/schemas/DagRunState'
- type: 'null'
title: Last Dag Run State
- name: dag_run_start_date_gte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Dag Run Start Date Gte
- name: dag_run_start_date_lte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Dag Run Start Date Lte
- name: dag_run_end_date_gte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Dag Run End Date Gte
- name: dag_run_end_date_lte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Dag Run End Date Lte
- name: dag_run_state
in: query
required: false
schema:
type: array
items:
type: string
title: Dag Run State
- name: order_by
in: query
required: false
Expand Down Expand Up @@ -3013,6 +3079,17 @@ paths:
items:
type: string
title: Tags
- name: tags_match_mode
in: query
required: false
schema:
anyOf:
- enum:
- any
- all
type: string
- type: 'null'
title: Tags Match Mode
- name: owners
in: query
required: false
Expand Down
47 changes: 43 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
from fastapi import Depends, HTTPException, Query, Request, Response, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import update
from sqlalchemy import select, update

from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.common.db.common import (
SessionDep,
paginated_select,
)
from airflow.api_fastapi.common.db.dags import dags_select_with_latest_dag_run
from airflow.api_fastapi.common.db.dags import generate_dag_with_latest_run_query
from airflow.api_fastapi.common.parameters import (
FilterOptionEnum,
FilterParam,
QueryDagDisplayNamePatternSearch,
QueryDagIdPatternSearch,
QueryDagIdPatternSearchWithNone,
Expand All @@ -41,7 +43,11 @@
QueryOwnersFilter,
QueryPausedFilter,
QueryTagsFilter,
RangeFilter,
SortParam,
_transform_dag_run_states,
datetime_range_filter_factory,
filter_param_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.dags import (
Expand Down Expand Up @@ -69,6 +75,25 @@ def get_dags(
only_active: QueryOnlyActiveFilter,
paused: QueryPausedFilter,
last_dag_run_state: QueryLastDagRunStateFilter,
dag_run_start_date_range: Annotated[
RangeFilter, Depends(datetime_range_filter_factory("dag_run_start_date", DagRun, "start_date"))
],
dag_run_end_date_range: Annotated[
RangeFilter, Depends(datetime_range_filter_factory("dag_run_end_date", DagRun, "end_date"))
],
dag_run_state: Annotated[
FilterParam[list[str]],
Depends(
filter_param_factory(
DagRun.state,
list[str],
FilterOptionEnum.ANY_EQUAL,
"dag_run_state",
default_factory=list,
transform_callable=_transform_dag_run_states,
)
),
],
order_by: Annotated[
SortParam,
Depends(
Expand All @@ -82,8 +107,22 @@ def get_dags(
session: SessionDep,
) -> DAGCollectionResponse:
"""Get all DAGs."""
dag_runs_select = None

if dag_run_state.value or dag_run_start_date_range.is_active() or dag_run_end_date_range.is_active():
dag_runs_select, _ = paginated_select(
statement=select(DagRun),
filters=[
dag_run_start_date_range,
dag_run_end_date_range,
dag_run_state,
],
session=session,
)
dag_runs_select = dag_runs_select.cte()

dags_select, total_entries = paginated_select(
statement=dags_select_with_latest_dag_run,
statement=generate_dag_with_latest_run_query(dag_runs_select),
filters=[
only_active,
paused,
Expand Down Expand Up @@ -240,7 +279,7 @@ def patch_dags(
update_mask = ["is_paused"]

dags_select, total_entries = paginated_select(
statement=dags_select_with_latest_dag_run,
statement=generate_dag_with_latest_run_query(),
filters=[only_active, paused, dag_id_pattern, tags, owners, last_dag_run_state],
order_by=None,
offset=offset,
Expand Down
Loading

0 comments on commit 3edd78a

Please sign in to comment.