diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 9721157998564..c7b7bec034c06 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -102,6 +102,9 @@ class AssetEventResponse(BaseModel): id: int asset_id: int + uri: str | None = Field(alias="uri", default=None) + name: str | None = Field(alias="name", default=None) + group: str | None = Field(alias="group", default=None) extra: dict | None = None source_task_id: str | None = None source_dag_id: str | None = None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index bc9344e77a2f1..9611e2af487cc 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -6460,6 +6460,21 @@ components: asset_id: type: integer title: Asset Id + uri: + anyOf: + - type: string + - type: 'null' + title: Uri + name: + anyOf: + - type: string + - type: 'null' + title: Name + group: + anyOf: + - type: string + - type: 'null' + title: Group extra: anyOf: - type: object diff --git a/airflow/models/asset.py b/airflow/models/asset.py index 1ac14fb6ec2f2..212a0b3a84c6c 100644 --- a/airflow/models/asset.py +++ b/airflow/models/asset.py @@ -714,6 +714,14 @@ class AssetEvent(Base): def uri(self): return self.asset.uri + @property + def group(self): + return self.asset.group + + @property + def name(self): + return self.asset.name + def __repr__(self) -> str: args = [] for attr in [ diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index bb86ab1ee5df7..63eb03d315653 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -180,6 +180,39 @@ export const $AssetEventResponse = { type: "integer", title: "Asset Id", }, + uri: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Uri", + }, + name: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Name", + }, + group: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Group", + }, extra: { anyOf: [ { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 37a2826419a28..e951424caca2d 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -60,6 +60,9 @@ export type AssetEventCollectionResponse = { export type AssetEventResponse = { id: number; asset_id: number; + uri?: string | null; + name?: string | null; + group?: string | null; extra?: { [key: string]: unknown; } | null; diff --git a/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvent.tsx b/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvent.tsx new file mode 100644 index 0000000000000..51dd47ecacfdf --- /dev/null +++ b/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvent.tsx @@ -0,0 +1,75 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Text, HStack } from "@chakra-ui/react"; +import { FiDatabase } from "react-icons/fi"; +import { MdOutlineAccountTree } from "react-icons/md"; +import { Link } from "react-router-dom"; + +import type { AssetEventResponse } from "openapi/requests/types.gen"; +import Time from "src/components/Time"; +import { Tooltip } from "src/components/ui"; + +export const AssetEvent = ({ event }: { readonly event: AssetEventResponse }) => { + const hasDagRuns = event.created_dagruns.length > 0; + const source = event.extra?.from_rest_api === true ? "API" : ""; + + return ( + + + + + + + group: {event.group ?? ""} + uri: {event.uri ?? ""} + + } + showArrow + > + {event.name ?? ""} + + + + Source: + {source === "" ? ( + + {event.source_dag_id} + + ) : ( + source + )} + + + Triggered Dag Runs: + {hasDagRuns ? ( + + {event.created_dagruns[0]?.dag_id} + + ) : ( + "~" + )} + + + ); +}; diff --git a/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvents.tsx b/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvents.tsx new file mode 100644 index 0000000000000..21c425f0c5a76 --- /dev/null +++ b/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvents.tsx @@ -0,0 +1,93 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Heading, Flex, HStack, VStack, StackSeparator, Skeleton } from "@chakra-ui/react"; +import { createListCollection } from "@chakra-ui/react/collection"; + +import { useAssetServiceGetAssetEvents } from "openapi/queries"; +import { MetricsBadge } from "src/components/MetricsBadge"; +import { Select } from "src/components/ui"; + +import { AssetEvent } from "./AssetEvent"; + +type AssetEventProps = { + readonly assetSortBy: string; + readonly endDate: string; + readonly setAssetSortBy: React.Dispatch>; + readonly startDate: string; +}; + +export const AssetEvents = ({ assetSortBy, endDate, setAssetSortBy, startDate }: AssetEventProps) => { + const { data, isLoading } = useAssetServiceGetAssetEvents({ + limit: 6, + orderBy: assetSortBy, + timestampGte: startDate, + timestampLte: endDate, + }); + + const assetSortOptions = createListCollection({ + items: [ + { label: "Newest first", value: "-timestamp" }, + { label: "Oldest first", value: "timestamp" }, + ], + }); + + return ( + + + + + + Asset Events + + + setAssetSortBy(option.value[0] as string)} + width={130} + > + + + + + + {assetSortOptions.items.map((option) => ( + + {option.label} + + ))} + + + + {isLoading ? ( + }> + {Array.from({ length: 5 }, (_, index) => index).map((index) => ( + + ))} + + ) : ( + }> + {data?.asset_events.map((event) => )} + + )} + + ); +}; diff --git a/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx b/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx index ac2201bb22c23..76f6c6b9ef1cf 100644 --- a/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx +++ b/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { Box, VStack } from "@chakra-ui/react"; +import { Box, VStack, SimpleGrid, GridItem } from "@chakra-ui/react"; import dayjs from "dayjs"; import { useState } from "react"; @@ -24,6 +24,7 @@ import { useDashboardServiceHistoricalMetrics } from "openapi/queries"; import { ErrorAlert } from "src/components/ErrorAlert"; import TimeRangeSelector from "src/components/TimeRangeSelector"; +import { AssetEvents } from "./AssetEvents"; import { DagRunMetrics } from "./DagRunMetrics"; import { MetricSectionSkeleton } from "./MetricSectionSkeleton"; import { TaskInstanceMetrics } from "./TaskInstanceMetrics"; @@ -34,6 +35,7 @@ export const HistoricalMetrics = () => { const now = dayjs(); const [startDate, setStartDate] = useState(now.subtract(Number(defaultHour), "hour").toISOString()); const [endDate, setEndDate] = useState(now.toISOString()); + const [assetSortBy, setAssetSortBy] = useState("-timestamp"); const { data, error, isLoading } = useDashboardServiceHistoricalMetrics({ endDate, @@ -59,13 +61,25 @@ export const HistoricalMetrics = () => { setStartDate={setStartDate} startDate={startDate} /> - {isLoading ? : undefined} - {!isLoading && data !== undefined && ( - - - - - )} + + + {isLoading ? : undefined} + {!isLoading && data !== undefined && ( + + + + + )} + + + + + ); diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 563fbac961986..a48c0da87fc7a 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -542,7 +542,10 @@ def test_should_respond_200(self, test_client, session): { "id": 1, "asset_id": 1, + "uri": "s3://bucket/key/1", "extra": {"foo": "bar"}, + "group": "asset", + "name": "simple1", "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", "source_run_id": "source_run_id_1", @@ -564,6 +567,9 @@ def test_should_respond_200(self, test_client, session): { "id": 2, "asset_id": 2, + "uri": "s3://bucket/key/2", + "group": "asset", + "name": "simple2", "extra": {"foo": "bar"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -704,6 +710,9 @@ def test_should_mask_sensitive_extra(self, test_client, session): { "id": 1, "asset_id": 1, + "uri": "s3://bucket/key/1", + "group": "asset", + "name": "sensitive1", "extra": {"password": "***"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -726,6 +735,9 @@ def test_should_mask_sensitive_extra(self, test_client, session): { "id": 2, "asset_id": 2, + "uri": "s3://bucket/key/2", + "group": "asset", + "name": "sensitive2", "extra": {"password": "***"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -912,6 +924,9 @@ def test_should_respond_200(self, test_client, session): assert response.json() == { "id": mock.ANY, "asset_id": 1, + "uri": "s3://bucket/key/1", + "group": "asset", + "name": "simple1", "extra": {"foo": "bar", "from_rest_api": True}, "source_task_id": None, "source_dag_id": None, @@ -938,6 +953,9 @@ def test_should_mask_sensitive_extra(self, test_client, session): assert response.json() == { "id": mock.ANY, "asset_id": 1, + "uri": "s3://bucket/key/1", + "group": "asset", + "name": "simple1", "extra": {"password": "***", "from_rest_api": True}, "source_task_id": None, "source_dag_id": None, diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index fc171150534c2..dd32873b084c9 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1016,8 +1016,11 @@ def test_should_respond_200(self, test_client, dag_maker, session): { "timestamp": from_datetime_to_zulu(event.timestamp), "asset_id": asset1_id, + "uri": "file:///da1", "extra": {}, "id": event.id, + "group": "asset", + "name": "ds1", "source_dag_id": ti.dag_id, "source_map_index": ti.map_index, "source_run_id": ti.run_id,