Skip to content

Commit

Permalink
AIP-65 | Filter dag structure by dag version
Browse files Browse the repository at this point in the history
  • Loading branch information
jason810496 committed Jan 30, 2025
1 parent 0637366 commit b127dbb
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 4 deletions.
8 changes: 8 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,14 @@ paths:
type: boolean
default: false
title: External Dependencies
- name: dag_version
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Dag Version
responses:
'200':
description: Successful Response
Expand Down
18 changes: 17 additions & 1 deletion airflow/api_fastapi/core_api/routes/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
from __future__ import annotations

from fastapi import HTTPException, Request, status
from sqlalchemy import select

from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.parameters import QueryIncludeDownstream, QueryIncludeUpstream
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.services.ui.structure import get_upstream_assets
from airflow.models.dag_version import DagVersion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.dag_edges import dag_edges
from airflow.utils.task_group import task_group_to_dict
Expand All @@ -43,9 +45,23 @@ def structure_data(
include_downstream: QueryIncludeDownstream = False,
root: str | None = None,
external_dependencies: bool = False,
dag_version: int | None = None,
) -> StructureDataResponse:
"""Get Structure Data."""
dag = request.app.state.dag_bag.get_dag(dag_id)
if dag_version is None:
dag = request.app.state.dag_bag.get_dag(dag_id)
else:
serialized_dag: SerializedDagModel = session.scalar(
select(SerializedDagModel)
.join(DagVersion)
.where(SerializedDagModel.dag_id == dag_id, DagVersion.version_number == dag_version)
)
if serialized_dag is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"Dag with id {dag_id} with version {dag_version} was not found",
)
dag = serialized_dag.dag

if dag is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
Expand Down
4 changes: 3 additions & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,14 @@ export const useStructureServiceStructureDataKey = "StructureServiceStructureDat
export const UseStructureServiceStructureDataKeyFn = (
{
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
dagVersion?: number;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand All @@ -373,7 +375,7 @@ export const UseStructureServiceStructureDataKeyFn = (
queryKey?: Array<unknown>,
) => [
useStructureServiceStructureDataKey,
...(queryKey ?? [{ dagId, externalDependencies, includeDownstream, includeUpstream, root }]),
...(queryKey ?? [{ dagId, dagVersion, externalDependencies, includeDownstream, includeUpstream, root }]),
];
export type BackfillServiceListBackfillsDefaultResponse = Awaited<
ReturnType<typeof BackfillService.listBackfills>
Expand Down
5 changes: 5 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,19 +475,22 @@ export const prefetchUseDashboardServiceHistoricalMetrics = (
* @param data.includeDownstream
* @param data.root
* @param data.externalDependencies
* @param data.dagVersion
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
export const prefetchUseStructureServiceStructureData = (
queryClient: QueryClient,
{
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
dagVersion?: number;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand All @@ -497,6 +500,7 @@ export const prefetchUseStructureServiceStructureData = (
queryClient.prefetchQuery({
queryKey: Common.UseStructureServiceStructureDataKeyFn({
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
Expand All @@ -505,6 +509,7 @@ export const prefetchUseStructureServiceStructureData = (
queryFn: () =>
StructureService.structureData({
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
Expand Down
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ export const useDashboardServiceHistoricalMetrics = <
* @param data.includeDownstream
* @param data.root
* @param data.externalDependencies
* @param data.dagVersion
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
Expand All @@ -595,12 +596,14 @@ export const useStructureServiceStructureData = <
>(
{
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
dagVersion?: number;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand All @@ -611,12 +614,13 @@ export const useStructureServiceStructureData = <
) =>
useQuery<TData, TError>({
queryKey: Common.UseStructureServiceStructureDataKeyFn(
{ dagId, externalDependencies, includeDownstream, includeUpstream, root },
{ dagId, dagVersion, externalDependencies, includeDownstream, includeUpstream, root },
queryKey,
),
queryFn: () =>
StructureService.structureData({
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
Expand Down
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ export const useDashboardServiceHistoricalMetricsSuspense = <
* @param data.includeDownstream
* @param data.root
* @param data.externalDependencies
* @param data.dagVersion
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
Expand All @@ -574,12 +575,14 @@ export const useStructureServiceStructureDataSuspense = <
>(
{
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
dagVersion?: number;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand All @@ -590,12 +593,13 @@ export const useStructureServiceStructureDataSuspense = <
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseStructureServiceStructureDataKeyFn(
{ dagId, externalDependencies, includeDownstream, includeUpstream, root },
{ dagId, dagVersion, externalDependencies, includeDownstream, includeUpstream, root },
queryKey,
),
queryFn: () =>
StructureService.structureData({
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
Expand Down
2 changes: 2 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ export class StructureService {
* @param data.includeDownstream
* @param data.root
* @param data.externalDependencies
* @param data.dagVersion
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
Expand All @@ -753,6 +754,7 @@ export class StructureService {
include_downstream: data.includeDownstream,
root: data.root,
external_dependencies: data.externalDependencies,
dag_version: data.dagVersion,
},
errors: {
404: "Not Found",
Expand Down
1 change: 1 addition & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1665,6 +1665,7 @@ export type HistoricalMetricsResponse = HistoricalMetricDataResponse;

export type StructureDataData = {
dagId: string;
dagVersion?: number | null;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand Down
112 changes: 112 additions & 0 deletions tests/api_fastapi/core_api/routes/ui/test_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
# under the License.
from __future__ import annotations

import copy

import pendulum
import pytest
from deepdiff import DeepDiff

from airflow.models import DagBag
from airflow.models.dag_version import DagVersion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
Expand All @@ -33,6 +37,53 @@

DAG_ID = "test_dag_id"
DAG_ID_EXTERNAL_TRIGGER = "external_trigger"
LATEST_VERSION_DAG_RESPONSE: dict = {
"edges": [],
"nodes": [
{
"children": None,
"id": "task1",
"is_mapped": None,
"label": "task1",
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
"operator": "EmptyOperator",
"asset_condition_type": None,
},
{
"children": None,
"id": "task2",
"is_mapped": None,
"label": "task2",
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
"operator": "EmptyOperator",
"asset_condition_type": None,
},
{
"children": None,
"id": "task3",
"is_mapped": None,
"label": "task3",
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
"operator": "EmptyOperator",
"asset_condition_type": None,
},
],
"arrange": "LR",
}
SECOND_VERSION_DAG_RESPONSE: dict = copy.deepcopy(LATEST_VERSION_DAG_RESPONSE)
SECOND_VERSION_DAG_RESPONSE["nodes"] = [
node for node in SECOND_VERSION_DAG_RESPONSE["nodes"] if node["id"] != "task3"
]
FIRST_VERSION_DAG_RESPONSE: dict = copy.deepcopy(SECOND_VERSION_DAG_RESPONSE)
FIRST_VERSION_DAG_RESPONSE["nodes"] = [
node for node in FIRST_VERSION_DAG_RESPONSE["nodes"] if node["id"] != "task2"
]


@pytest.fixture(autouse=True, scope="module")
Expand Down Expand Up @@ -81,6 +132,28 @@ def make_dag(dag_maker, session, time_machine):
dag_maker.sync_dagbag_to_db()


@pytest.fixture
def make_dag_with_multiple_version(dag_maker, session):
with dag_maker(DAG_ID) as dag:
EmptyOperator(task_id="task1")
dag.sync_to_db()
SerializedDagModel.write_dag(dag, bundle_name="dag_maker")
with dag_maker(DAG_ID) as dag2:
EmptyOperator(task_id="task1")
EmptyOperator(task_id="task2")
dag2.sync_to_db()
SerializedDagModel.write_dag(dag2, bundle_name="dag_maker")
with dag_maker(DAG_ID) as dag3:
EmptyOperator(task_id="task1")
EmptyOperator(task_id="task2")
EmptyOperator(task_id="task3")
dag3.sync_to_db()
SerializedDagModel.write_dag(dag3, bundle_name="dag_maker")

latest_version = DagVersion.get_latest_version(dag.dag_id)
assert latest_version.version_number == 3


class TestStructureDataEndpoint:
@pytest.mark.parametrize(
"params, expected",
Expand Down Expand Up @@ -407,7 +480,46 @@ def test_should_return_200(self, test_client, params, expected):
assert response.status_code == 200
assert not DeepDiff(response.json(), expected, ignore_order=True)

@pytest.mark.parametrize(
"params, expected",
[
pytest.param(
{"dag_id": DAG_ID},
LATEST_VERSION_DAG_RESPONSE,
id="get_default_version",
),
pytest.param(
{"dag_id": DAG_ID, "dag_version": 1},
FIRST_VERSION_DAG_RESPONSE,
id="get_oldest_version",
),
pytest.param(
{"dag_id": DAG_ID, "dag_version": 2},
SECOND_VERSION_DAG_RESPONSE,
id="get_specific_version",
),
pytest.param(
{"dag_id": DAG_ID, "dag_version": 3},
LATEST_VERSION_DAG_RESPONSE,
id="get_latest_version",
),
],
)
def test_should_return_200_with_multiple_versions(
self, test_client, make_dag_with_multiple_version, params, expected
):
response = test_client.get("/ui/structure/structure_data", params=params)
assert response.status_code == 200
assert not DeepDiff(response.json(), expected, ignore_order=True)

def test_should_return_404(self, test_client):
response = test_client.get("/ui/structure/structure_data", params={"dag_id": "not_existing"})
assert response.status_code == 404
assert response.json()["detail"] == "Dag with id not_existing was not found"

def test_should_return_404_when_dag_version_not_found(self, test_client):
response = test_client.get(
"/ui/structure/structure_data", params={"dag_id": DAG_ID, "dag_version": 999}
)
assert response.status_code == 404
assert response.json()["detail"] == "Dag with id test_dag_id with version 999 was not found"

0 comments on commit b127dbb

Please sign in to comment.