Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-65 | Filter dag structure by dag version #46279

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,14 @@ paths:
type: boolean
default: false
title: External Dependencies
- name: dag_version
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Dag Version
responses:
'200':
description: Successful Response
Expand Down
30 changes: 24 additions & 6 deletions airflow/api_fastapi/core_api/routes/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
# under the License.
from __future__ import annotations

from fastapi import HTTPException, Request, status
from fastapi import HTTPException, status
from sqlalchemy import select

from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.parameters import QueryIncludeDownstream, QueryIncludeUpstream
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.services.ui.structure import get_upstream_assets
from airflow.models.dag_version import DagVersion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.dag_edges import dag_edges
from airflow.utils.task_group import task_group_to_dict
Expand All @@ -38,17 +40,33 @@
def structure_data(
session: SessionDep,
dag_id: str,
request: Request,
include_upstream: QueryIncludeUpstream = False,
include_downstream: QueryIncludeDownstream = False,
root: str | None = None,
external_dependencies: bool = False,
dag_version: int | None = None,
) -> StructureDataResponse:
"""Get Structure Data."""
dag = request.app.state.dag_bag.get_dag(dag_id)

if dag is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
if dag_version is None:
dag_version_model = DagVersion.get_latest_version(dag_id)
if dag_version_model is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"Dag with id {dag_id} was not found",
)
dag_version = dag_version_model.version_number

serialized_dag: SerializedDagModel = session.scalar(
select(SerializedDagModel)
.join(DagVersion)
.where(SerializedDagModel.dag_id == dag_id, DagVersion.version_number == dag_version)
)
if serialized_dag is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"Dag with id {dag_id} and version {dag_version} was not found",
)
dag = serialized_dag.dag

if root:
dag = dag.partial_subset(
Expand Down
4 changes: 3 additions & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,14 @@ export const useStructureServiceStructureDataKey = "StructureServiceStructureDat
export const UseStructureServiceStructureDataKeyFn = (
{
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
dagVersion?: number;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand All @@ -373,7 +375,7 @@ export const UseStructureServiceStructureDataKeyFn = (
queryKey?: Array<unknown>,
) => [
useStructureServiceStructureDataKey,
...(queryKey ?? [{ dagId, externalDependencies, includeDownstream, includeUpstream, root }]),
...(queryKey ?? [{ dagId, dagVersion, externalDependencies, includeDownstream, includeUpstream, root }]),
];
export type BackfillServiceListBackfillsDefaultResponse = Awaited<
ReturnType<typeof BackfillService.listBackfills>
Expand Down
5 changes: 5 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,19 +475,22 @@ export const prefetchUseDashboardServiceHistoricalMetrics = (
* @param data.includeDownstream
* @param data.root
* @param data.externalDependencies
* @param data.dagVersion
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
export const prefetchUseStructureServiceStructureData = (
queryClient: QueryClient,
{
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
dagVersion?: number;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand All @@ -497,6 +500,7 @@ export const prefetchUseStructureServiceStructureData = (
queryClient.prefetchQuery({
queryKey: Common.UseStructureServiceStructureDataKeyFn({
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
Expand All @@ -505,6 +509,7 @@ export const prefetchUseStructureServiceStructureData = (
queryFn: () =>
StructureService.structureData({
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
Expand Down
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ export const useDashboardServiceHistoricalMetrics = <
* @param data.includeDownstream
* @param data.root
* @param data.externalDependencies
* @param data.dagVersion
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
Expand All @@ -595,12 +596,14 @@ export const useStructureServiceStructureData = <
>(
{
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
dagVersion?: number;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand All @@ -611,12 +614,13 @@ export const useStructureServiceStructureData = <
) =>
useQuery<TData, TError>({
queryKey: Common.UseStructureServiceStructureDataKeyFn(
{ dagId, externalDependencies, includeDownstream, includeUpstream, root },
{ dagId, dagVersion, externalDependencies, includeDownstream, includeUpstream, root },
queryKey,
),
queryFn: () =>
StructureService.structureData({
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
Expand Down
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ export const useDashboardServiceHistoricalMetricsSuspense = <
* @param data.includeDownstream
* @param data.root
* @param data.externalDependencies
* @param data.dagVersion
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
Expand All @@ -574,12 +575,14 @@ export const useStructureServiceStructureDataSuspense = <
>(
{
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
dagVersion?: number;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand All @@ -590,12 +593,13 @@ export const useStructureServiceStructureDataSuspense = <
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseStructureServiceStructureDataKeyFn(
{ dagId, externalDependencies, includeDownstream, includeUpstream, root },
{ dagId, dagVersion, externalDependencies, includeDownstream, includeUpstream, root },
queryKey,
),
queryFn: () =>
StructureService.structureData({
dagId,
dagVersion,
externalDependencies,
includeDownstream,
includeUpstream,
Expand Down
2 changes: 2 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ export class StructureService {
* @param data.includeDownstream
* @param data.root
* @param data.externalDependencies
* @param data.dagVersion
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
Expand All @@ -753,6 +754,7 @@ export class StructureService {
include_downstream: data.includeDownstream,
root: data.root,
external_dependencies: data.externalDependencies,
dag_version: data.dagVersion,
},
errors: {
404: "Not Found",
Expand Down
1 change: 1 addition & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1665,6 +1665,7 @@ export type HistoricalMetricsResponse = HistoricalMetricDataResponse;

export type StructureDataData = {
dagId: string;
dagVersion?: number | null;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
Expand Down
103 changes: 103 additions & 0 deletions tests/api_fastapi/core_api/routes/ui/test_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
# under the License.
from __future__ import annotations

import copy

import pendulum
import pytest
from deepdiff import DeepDiff

from airflow.models import DagBag
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
Expand All @@ -33,6 +36,53 @@

DAG_ID = "test_dag_id"
DAG_ID_EXTERNAL_TRIGGER = "external_trigger"


def _empty_operator_node(task_id: str) -> dict:
    """Build the structure node expected for a plain ``EmptyOperator`` task."""
    return {
        "children": None,
        "id": task_id,
        "is_mapped": None,
        "label": task_id,
        "tooltip": None,
        "setup_teardown_type": None,
        "type": "task",
        "operator": "EmptyOperator",
        "asset_condition_type": None,
    }


def _drop_task(response: dict, task_id: str) -> dict:
    """Return an independent deep copy of ``response`` without the node ``task_id``."""
    trimmed = copy.deepcopy(response)
    trimmed["nodes"] = [node for node in trimmed["nodes"] if node["id"] != task_id]
    return trimmed


# Expected payloads, newest first: each older version has one task fewer than
# the version that follows it.
LATEST_VERSION_DAG_RESPONSE: dict = {
    "edges": [],
    "nodes": [_empty_operator_node(f"task{number}") for number in (1, 2, 3)],
    "arrange": "LR",
}
SECOND_VERSION_DAG_RESPONSE: dict = _drop_task(LATEST_VERSION_DAG_RESPONSE, "task3")
FIRST_VERSION_DAG_RESPONSE: dict = _drop_task(SECOND_VERSION_DAG_RESPONSE, "task2")


@pytest.fixture(autouse=True, scope="module")
Expand Down Expand Up @@ -81,6 +131,21 @@ def make_dag(dag_maker, session, time_machine):
dag_maker.sync_dagbag_to_db()


@pytest.fixture
def make_dag_with_multiple_version(dag_maker):
"""
Create three successive versions of the DAG identified by ``DAG_ID``.

Version 1 will have 1 task, version 2 will have 2 tasks, and version 3 will have 3 tasks.
The tasks are ``EmptyOperator`` instances with ids ``task1`` .. ``taskN``, and each version
is written with ``SerializedDagModel.write_dag`` using the ``dag_maker`` bundle name.
"""
# range(1, 4) yields the version numbers 1, 2 and 3.
for version_number in range(1, 4):
with dag_maker(DAG_ID) as dag:
# A version holds as many tasks as its version number.
for i in range(version_number):
EmptyOperator(task_id=f"task{i+1}")
dag.sync_to_db()
SerializedDagModel.write_dag(dag, bundle_name="dag_maker")


class TestStructureDataEndpoint:
@pytest.mark.parametrize(
"params, expected",
Expand Down Expand Up @@ -407,7 +472,45 @@ def test_should_return_200(self, test_client, params, expected):
assert response.status_code == 200
assert not DeepDiff(response.json(), expected, ignore_order=True)

@pytest.mark.parametrize(
    "params, expected",
    [
        pytest.param({"dag_id": DAG_ID}, LATEST_VERSION_DAG_RESPONSE, id="get_default_version"),
        pytest.param(
            {"dag_id": DAG_ID, "dag_version": 1},
            FIRST_VERSION_DAG_RESPONSE,
            id="get_oldest_version",
        ),
        pytest.param(
            {"dag_id": DAG_ID, "dag_version": 2},
            SECOND_VERSION_DAG_RESPONSE,
            id="get_specific_version",
        ),
        pytest.param(
            {"dag_id": DAG_ID, "dag_version": 3},
            LATEST_VERSION_DAG_RESPONSE,
            id="get_latest_version",
        ),
    ],
)
@pytest.mark.usefixtures("make_dag_with_multiple_version")
def test_should_return_200_with_multiple_versions(self, test_client, params, expected):
    """Check the structure payload for the default version and for each explicit ``dag_version``."""
    result = test_client.get("/ui/structure/structure_data", params=params)
    assert result.status_code == 200
    payload = result.json()
    # Exact equality: node order in the payload must match the expectation.
    assert payload == expected

def test_should_return_404(self, test_client):
    """A request for an unknown ``dag_id`` must answer 404 with a detail naming that id."""
    query = {"dag_id": "not_existing"}
    result = test_client.get("/ui/structure/structure_data", params=query)
    assert result.status_code == 404
    body = result.json()
    assert body["detail"] == "Dag with id not_existing was not found"

def test_should_return_404_when_dag_version_not_found(self, test_client):
    """Asking for ``dag_version`` 999 must answer 404 with a detail naming the id and version."""
    query = {"dag_id": DAG_ID, "dag_version": 999}
    result = test_client.get("/ui/structure/structure_data", params=query)
    assert result.status_code == 404
    body = result.json()
    assert body["detail"] == "Dag with id test_dag_id and version 999 was not found"