From dcd41f60f1c9b5583b49bfb49b6d85c640a2892c Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 10 Dec 2024 23:13:08 +0800 Subject: [PATCH] Switch asset endpoints to use id instead of uri (#44801) * Switch asset endpoints to use id instead of uri With the addition of Asset.name, we can't guarantee the uri to be unique anymore. Using uri also presents addition issues on endpoints since it conflicts with subroutes. * Does not need annnotation in route format --- .../api_fastapi/core_api/datamodels/assets.py | 5 +- .../core_api/openapi/v1-generated.yaml | 56 ++++---- .../core_api/routes/public/assets.py | 121 +++++++----------- airflow/ui/openapi-gen/queries/common.ts | 18 +-- airflow/ui/openapi-gen/queries/prefetch.ts | 34 ++--- airflow/ui/openapi-gen/queries/queries.ts | 51 ++++---- airflow/ui/openapi-gen/queries/suspense.ts | 31 ++--- .../ui/openapi-gen/requests/schemas.gen.ts | 23 ++-- .../ui/openapi-gen/requests/services.gen.ts | 30 ++--- airflow/ui/openapi-gen/requests/types.gen.ts | 21 ++- .../core_api/routes/public/test_assets.py | 101 +++++---------- .../core_api/routes/public/test_dag_run.py | 3 +- 12 files changed, 211 insertions(+), 283 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 047f3f557bc25..9721157998564 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -102,7 +102,6 @@ class AssetEventResponse(BaseModel): id: int asset_id: int - uri: str extra: dict | None = None source_task_id: str | None = None source_dag_id: str | None = None @@ -127,8 +126,8 @@ class AssetEventCollectionResponse(BaseModel): class QueuedEventResponse(BaseModel): """Queued Event serializer for responses..""" - uri: str dag_id: str + asset_id: int created_at: datetime @@ -142,7 +141,7 @@ class QueuedEventCollectionResponse(BaseModel): class CreateAssetEventsBody(BaseModel): """Create asset events request.""" - uri: str + asset_id: int extra: dict = Field(default_factory=dict) @field_validator("extra", mode="after") diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 3b6d55c93137c..00f1c5aa5106f 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -594,7 +594,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/queuedEvents/{uri}: + /public/assets/{asset_id}/queuedEvents: get: tags: - Asset @@ -602,12 +602,12 @@ paths: description: Get queued asset events for an asset. operationId: get_asset_queued_events parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -654,12 +654,12 @@ paths: description: Delete queued asset events for an asset. operationId: delete_asset_queued_events parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -695,7 +695,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/{uri}: + /public/assets/{asset_id}: get: tags: - Asset @@ -703,12 +703,12 @@ paths: description: Get an asset. operationId: get_asset parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id responses: '200': description: Successful Response @@ -846,7 +846,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/assets/queuedEvents/{uri}: + /public/dags/{dag_id}/assets/{asset_id}/queuedEvents: get: tags: - Asset @@ -860,12 +860,12 @@ paths: schema: type: string title: Dag Id - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -918,12 +918,12 @@ paths: schema: type: string title: Dag Id - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -5982,9 +5982,6 @@ components: asset_id: type: integer title: Asset Id - uri: - type: string - title: Uri extra: anyOf: - type: object @@ -6021,7 +6018,6 @@ components: required: - id - asset_id - - uri - source_map_index - created_dagruns - timestamp @@ -6548,16 +6544,16 @@ components: description: Connection Test serializer for responses. CreateAssetEventsBody: properties: - uri: - type: string - title: Uri + asset_id: + type: integer + title: Asset Id extra: type: object title: Extra additionalProperties: false type: object required: - - uri + - asset_id title: CreateAssetEventsBody description: Create asset events request. DAGCollectionResponse: @@ -8273,20 +8269,20 @@ components: description: Queued Event Collection serializer for responses. QueuedEventResponse: properties: - uri: - type: string - title: Uri dag_id: type: string title: Dag Id + asset_id: + type: integer + title: Asset Id created_at: type: string format: date-time title: Created At type: object required: - - uri - dag_id + - asset_id - created_at title: QueuedEventResponse description: Queued Event serializer for responses.. diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 779c32f6a4c1b..c8cc9fb0f7695 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -59,20 +59,16 @@ def _generate_queued_event_where_clause( *, + asset_id: int | None = None, dag_id: str | None = None, - uri: str | None = None, before: datetime | str | None = None, ) -> list: """Get AssetDagRunQueue where clause.""" where_clause = [] if dag_id is not None: where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) - if uri is not None: - where_clause.append( - AssetDagRunQueue.asset_id.in_( - select(AssetModel.id).where(AssetModel.uri == uri), - ), - ) + if asset_id is not None: + where_clause.append(AssetDagRunQueue.asset_id == asset_id) if before is not None: where_clause.append(AssetDagRunQueue.created_at < before) return where_clause @@ -227,9 +223,9 @@ def create_asset_event( session: SessionDep, ) -> AssetEventResponse: """Create asset events.""" - asset_model = session.scalar(select(AssetModel).where(AssetModel.uri == body.uri).limit(1)) + asset_model = session.scalar(select(AssetModel).where(AssetModel.id == body.asset_id).limit(1)) if not asset_model: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: `{body.uri}` was not found") + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: `{body.asset_id}` was not found") timestamp = timezone.utcnow() assets_event = asset_manager.register_asset_change( @@ -240,41 +236,35 @@ def create_asset_event( ) if not assets_event: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: `{body.uri}` was not found") - return assets_event + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: `{body.asset_id}` was not found") + return AssetEventResponse.model_validate(assets_event) @assets_router.get( - "/assets/queuedEvents/{uri:path}", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + "/assets/{asset_id}/queuedEvents", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_asset_queued_events( - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ) -> QueuedEventCollectionResponse: """Get queued asset events for an asset.""" - print(f"uri: {uri}") - where_clause = _generate_queued_event_where_clause(uri=uri, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before) + query = select(AssetDagRunQueue).where(*where_clause) dag_asset_queued_events_select, total_entries = paginated_select(statement=query) - adrqs = session.execute(dag_asset_queued_events_select).all() + adrqs = session.scalars(dag_asset_queued_events_select).all() if not adrqs: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with uri: `{uri}` was not found") + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Queue event with asset_id: `{asset_id}` was not found", + ) queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=adrq.asset_id) + for adrq in adrqs ] return QueuedEventCollectionResponse( @@ -284,33 +274,29 @@ def get_asset_queued_events( @assets_router.get( - "/assets/{uri:path}", + "/assets/{asset_id}", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_asset( - uri: str, + asset_id: int, session: SessionDep, ) -> AssetResponse: """Get an asset.""" asset = session.scalar( select(AssetModel) - .where(AssetModel.uri == uri) + .where(AssetModel.id == asset_id) .options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks)) ) if asset is None: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found") + raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with ID: `{asset_id}` was not found") return AssetResponse.model_validate(asset) @assets_router.get( "/dags/{dag_id}/assets/queuedEvents", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_dag_asset_queued_events( dag_id: str, @@ -319,20 +305,16 @@ def get_dag_asset_queued_events( ) -> QueuedEventCollectionResponse: """Get queued asset events for a DAG.""" where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + query = select(AssetDagRunQueue).where(*where_clause) dag_asset_queued_events_select, total_entries = paginated_select(statement=query) - adrqs = session.execute(dag_asset_queued_events_select).all() + adrqs = session.scalars(dag_asset_queued_events_select).all() if not adrqs: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=adrq.asset_id) + for adrq in adrqs ] return QueuedEventCollectionResponse( @@ -342,56 +324,47 @@ def get_dag_asset_queued_events( @assets_router.get( - "/dags/{dag_id}/assets/queuedEvents/{uri:path}", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + "/dags/{dag_id}/assets/{asset_id}/queuedEvents", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_dag_asset_queued_event( dag_id: str, - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ) -> QueuedEventResponse: """Get a queued asset event for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before) - query = ( - select(AssetDagRunQueue) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, asset_id=asset_id, before=before) + query = select(AssetDagRunQueue).where(*where_clause) adrq = session.scalar(query) if not adrq: raise HTTPException( status.HTTP_404_NOT_FOUND, - f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + f"Queued event with dag_id: `{dag_id}` and asset_id: `{asset_id}` was not found", ) - return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=asset_id) @assets_router.delete( - "/assets/queuedEvents/{uri:path}", + "/assets/{asset_id}/queuedEvents", status_code=status.HTTP_204_NO_CONTENT, - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def delete_asset_queued_events( - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ): """Delete queued asset events for an asset.""" - where_clause = _generate_queued_event_where_clause(uri=uri, before=before) + where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before) delete_stmt = delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch") result = session.execute(delete_stmt) if result.rowcount == 0: - raise HTTPException(status.HTTP_404_NOT_FOUND, detail=f"Queue event with uri: `{uri}` was not found") + raise HTTPException( + status.HTTP_404_NOT_FOUND, + detail=f"Queue event with asset_id: `{asset_id}` was not found", + ) @assets_router.delete( @@ -419,7 +392,7 @@ def delete_dag_asset_queued_events( @assets_router.delete( - "/dags/{dag_id}/assets/queuedEvents/{uri:path}", + "/dags/{dag_id}/assets/{asset_id}/queuedEvents", status_code=status.HTTP_204_NO_CONTENT, responses=create_openapi_http_exception_doc( [ @@ -430,12 +403,12 @@ def delete_dag_asset_queued_events( ) def delete_dag_asset_queued_event( dag_id: str, - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ): """Delete a queued asset event for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, uri=uri) + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, asset_id=asset_id) delete_statement = ( delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch") ) @@ -443,5 +416,5 @@ def delete_dag_asset_queued_event( if result.rowcount == 0: raise HTTPException( status.HTTP_404_NOT_FOUND, - detail=f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + detail=f"Queued event with dag_id: `{dag_id}` and asset_id: `{asset_id}` was not found", ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index b759c0f2f23a1..ed1d6066bdcd6 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -174,16 +174,16 @@ export const useAssetServiceGetAssetQueuedEventsKey = "AssetServiceGetAssetQueuedEvents"; export const UseAssetServiceGetAssetQueuedEventsKeyFn = ( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: Array, ) => [ useAssetServiceGetAssetQueuedEventsKey, - ...(queryKey ?? [{ before, uri }]), + ...(queryKey ?? [{ assetId, before }]), ]; export type AssetServiceGetAssetDefaultResponse = Awaited< ReturnType @@ -195,12 +195,12 @@ export type AssetServiceGetAssetQueryResult< export const useAssetServiceGetAssetKey = "AssetServiceGetAsset"; export const UseAssetServiceGetAssetKeyFn = ( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: Array, -) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])]; +) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ assetId }])]; export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< ReturnType >; @@ -234,18 +234,18 @@ export const useAssetServiceGetDagAssetQueuedEventKey = "AssetServiceGetDagAssetQueuedEvent"; export const UseAssetServiceGetDagAssetQueuedEventKeyFn = ( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: Array, ) => [ useAssetServiceGetDagAssetQueuedEventKey, - ...(queryKey ?? [{ before, dagId, uri }]), + ...(queryKey ?? [{ assetId, before, dagId }]), ]; export type ConfigServiceGetConfigsDefaultResponse = Awaited< ReturnType diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index eab8ea6b3567d..4bb01a7feaad8 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -219,7 +219,7 @@ export const prefetchUseAssetServiceGetAssetEvents = ( * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -227,36 +227,39 @@ export const prefetchUseAssetServiceGetAssetEvents = ( export const prefetchUseAssetServiceGetAssetQueuedEvents = ( queryClient: QueryClient, { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ before, uri }), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }), + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ + assetId, + before, + }), + queryFn: () => AssetService.getAssetQueuedEvents({ assetId, before }), }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ export const prefetchUseAssetServiceGetAsset = ( queryClient: QueryClient, { - uri, + assetId, }: { - uri: string; + assetId: number; }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }), - queryFn: () => AssetService.getAsset({ uri }), + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }), + queryFn: () => AssetService.getAsset({ assetId }), }); /** * Get Dag Asset Queued Events @@ -289,7 +292,7 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -297,22 +300,23 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( export const prefetchUseAssetServiceGetDagAssetQueuedEvent = ( queryClient: QueryClient, { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, ) => queryClient.prefetchQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn({ + assetId, before, dagId, - uri, }), - queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }), + queryFn: () => + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }), }); /** * Get Configs diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index e90ae58e6b0e3..f8c8bcd7a58b3 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -279,7 +279,7 @@ export const useAssetServiceGetAssetEvents = < * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -290,28 +290,29 @@ export const useAssetServiceGetAssetQueuedEvents = < TQueryKey extends Array = unknown[], >( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, + { assetId, before }, queryKey, ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + queryFn: () => + AssetService.getAssetQueuedEvents({ assetId, before }) as TData, ...options, }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -321,16 +322,16 @@ export const useAssetServiceGetAsset = < TQueryKey extends Array = unknown[], >( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), - queryFn: () => AssetService.getAsset({ uri }) as TData, + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey), + queryFn: () => AssetService.getAsset({ assetId }) as TData, ...options, }); /** @@ -371,7 +372,7 @@ export const useAssetServiceGetDagAssetQueuedEvents = < * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -382,24 +383,24 @@ export const useAssetServiceGetDagAssetQueuedEvent = < TQueryKey extends Array = unknown[], >( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( - { before, dagId, uri }, + { assetId, before, dagId }, queryKey, ), queryFn: () => - AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData, ...options, }); /** @@ -3801,7 +3802,7 @@ export const useVariableServicePatchVariable = < * Delete Asset Queued Events * Delete queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -3816,8 +3817,8 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TData, TError, { + assetId: number; before?: string; - uri: string; }, TContext >, @@ -3828,15 +3829,15 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TData, TError, { + assetId: number; before?: string; - uri: string; }, TContext >({ - mutationFn: ({ before, uri }) => + mutationFn: ({ assetId, before }) => AssetService.deleteAssetQueuedEvents({ + assetId, before, - uri, }) as unknown as Promise, ...options, }); @@ -3887,7 +3888,7 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = < * Delete a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -3902,9 +3903,9 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < TData, TError, { + assetId: number; before?: string; dagId: string; - uri: string; }, TContext >, @@ -3915,17 +3916,17 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < TData, TError, { + assetId: number; before?: string; dagId: string; - uri: string; }, TContext >({ - mutationFn: ({ before, dagId, uri }) => + mutationFn: ({ assetId, before, dagId }) => AssetService.deleteDagAssetQueuedEvent({ + assetId, before, dagId, - uri, }) as unknown as Promise, ...options, }); diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index e7b213387607e..cd6d7c5a7983f 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -254,7 +254,7 @@ export const useAssetServiceGetAssetEventsSuspense = < * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -265,28 +265,29 @@ export const useAssetServiceGetAssetQueuedEventsSuspense = < TQueryKey extends Array = unknown[], >( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, + { assetId, before }, queryKey, ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + queryFn: () => + AssetService.getAssetQueuedEvents({ assetId, before }) as TData, ...options, }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -296,16 +297,16 @@ export const useAssetServiceGetAssetSuspense = < TQueryKey extends Array = unknown[], >( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), - queryFn: () => AssetService.getAsset({ uri }) as TData, + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey), + queryFn: () => AssetService.getAsset({ assetId }) as TData, ...options, }); /** @@ -346,7 +347,7 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense = < * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -357,24 +358,24 @@ export const useAssetServiceGetDagAssetQueuedEventSuspense = < TQueryKey extends Array = unknown[], >( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( - { before, dagId, uri }, + { assetId, before, dagId }, queryKey, ), queryFn: () => - AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData, ...options, }); /** diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 622ee94d697fa..965355b0c1937 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -180,10 +180,6 @@ export const $AssetEventResponse = { type: "integer", title: "Asset Id", }, - uri: { - type: "string", - title: "Uri", - }, extra: { anyOf: [ { @@ -249,7 +245,6 @@ export const $AssetEventResponse = { required: [ "id", "asset_id", - "uri", "source_map_index", "created_dagruns", "timestamp", @@ -1025,9 +1020,9 @@ export const $ConnectionTestResponse = { export const $CreateAssetEventsBody = { properties: { - uri: { - type: "string", - title: "Uri", + asset_id: { + type: "integer", + title: "Asset Id", }, extra: { type: "object", @@ -1036,7 +1031,7 @@ export const $CreateAssetEventsBody = { }, additionalProperties: false, type: "object", - required: ["uri"], + required: ["asset_id"], title: "CreateAssetEventsBody", description: "Create asset events request.", } as const; @@ -3640,14 +3635,14 @@ export const $QueuedEventCollectionResponse = { export const $QueuedEventResponse = { properties: { - uri: { - type: "string", - title: "Uri", - }, dag_id: { type: "string", title: "Dag Id", }, + asset_id: { + type: "integer", + title: "Asset Id", + }, created_at: { type: "string", format: "date-time", @@ -3655,7 +3650,7 @@ export const $QueuedEventResponse = { }, }, type: "object", - required: ["uri", "dag_id", "created_at"], + required: ["dag_id", "asset_id", "created_at"], title: "QueuedEventResponse", description: "Queued Event serializer for responses..", } as const; diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index c855ca580924e..f0446ae0fd858 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -370,7 +370,7 @@ export class AssetService { * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -380,9 +380,9 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/assets/queuedEvents/{uri}", + url: "/public/assets/{asset_id}/queuedEvents", path: { - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -400,7 +400,7 @@ export class AssetService { * Delete Asset Queued Events * Delete queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -410,9 +410,9 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "DELETE", - url: "/public/assets/queuedEvents/{uri}", + url: "/public/assets/{asset_id}/queuedEvents", path: { - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -430,7 +430,7 @@ export class AssetService { * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -439,9 +439,9 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/assets/{uri}", + url: "/public/assets/{asset_id}", path: { - uri: data.uri, + asset_id: data.assetId, }, errors: { 401: "Unauthorized", @@ -517,7 +517,7 @@ export class AssetService { * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -527,10 +527,10 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}", + url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", path: { dag_id: data.dagId, - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -549,7 +549,7 @@ export class AssetService { * Delete a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -559,10 +559,10 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "DELETE", - url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}", + url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", path: { dag_id: data.dagId, - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 96a159bad073d..88f6a2fc763c1 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -60,7 +60,6 @@ export type AssetEventCollectionResponse = { export type AssetEventResponse = { id: number; asset_id: number; - uri: string; extra?: { [key: string]: unknown; } | null; @@ -265,7 +264,7 @@ export type ConnectionTestResponse = { * Create asset events request. */ export type CreateAssetEventsBody = { - uri: string; + asset_id: number; extra?: { [key: string]: unknown; }; @@ -899,8 +898,8 @@ export type QueuedEventCollectionResponse = { * Queued Event serializer for responses.. */ export type QueuedEventResponse = { - uri: string; dag_id: string; + asset_id: number; created_at: string; }; @@ -1353,21 +1352,21 @@ export type CreateAssetEventData = { export type CreateAssetEventResponse = AssetEventResponse; export type GetAssetQueuedEventsData = { + assetId: number; before?: string | null; - uri: string; }; export type GetAssetQueuedEventsResponse = QueuedEventCollectionResponse; export type DeleteAssetQueuedEventsData = { + assetId: number; before?: string | null; - uri: string; }; export type DeleteAssetQueuedEventsResponse = void; export type GetAssetData = { - uri: string; + assetId: number; }; export type GetAssetResponse = AssetResponse; @@ -1387,17 +1386,17 @@ export type DeleteDagAssetQueuedEventsData = { export type DeleteDagAssetQueuedEventsResponse = void; export type GetDagAssetQueuedEventData = { + assetId: number; before?: string | null; dagId: string; - uri: string; }; export type GetDagAssetQueuedEventResponse = QueuedEventResponse; export type DeleteDagAssetQueuedEventData = { + assetId: number; before?: string | null; dagId: string; - uri: string; }; export type DeleteDagAssetQueuedEventResponse = void; @@ -2226,7 +2225,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/queuedEvents/{uri}": { + "/public/assets/{asset_id}/queuedEvents": { get: { req: GetAssetQueuedEventsData; res: { @@ -2278,7 +2277,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/{uri}": { + "/public/assets/{asset_id}": { get: { req: GetAssetData; res: { @@ -2361,7 +2360,7 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/assets/queuedEvents/{uri}": { + "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents": { get: { req: GetDagAssetQueuedEventData; res: { diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index c7c2c2405d2f6..46c769640a8f3 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -import urllib from collections.abc import Generator from datetime import datetime from unittest import mock @@ -543,7 +542,6 @@ def test_should_respond_200(self, test_client, session): { "id": 1, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"foo": "bar"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -566,7 +564,6 @@ def test_should_respond_200(self, test_client, session): { "id": 2, "asset_id": 2, - "uri": "s3://bucket/key/2", "extra": {"foo": "bar"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -618,17 +615,17 @@ def test_order_by_raises_400_for_invalid_attr(self, test_client, session): assert response.json()["detail"] == msg @pytest.mark.parametrize( - "params, expected_asset_uris", + "params, expected_asset_ids", [ # Limit test data - ({"limit": "1"}, ["s3://bucket/key/1"]), - ({"limit": "100"}, [f"s3://bucket/key/{i}" for i in range(1, 101)]), + ({"limit": "1"}, [1]), + ({"limit": "100"}, list(range(1, 101))), # Offset test data - ({"offset": "1"}, [f"s3://bucket/key/{i}" for i in range(2, 102)]), - ({"offset": "3"}, [f"s3://bucket/key/{i}" for i in range(4, 104)]), + ({"offset": "1"}, list(range(2, 102))), + ({"offset": "3"}, list(range(4, 104))), ], ) - def test_limit_and_offset(self, test_client, params, expected_asset_uris): + def test_limit_and_offset(self, test_client, params, expected_asset_ids): self.create_assets(num=110) self.create_assets_events(num=110) self.create_dag_run(num=110) @@ -637,8 +634,8 @@ def test_limit_and_offset(self, test_client, params, expected_asset_uris): response = test_client.get("/public/assets/events", params=params) assert response.status_code == 200 - asset_uris = [asset["uri"] for asset in response.json()["asset_events"]] - assert asset_uris == expected_asset_uris + asset_ids = [asset["id"] for asset in response.json()["asset_events"]] + assert asset_ids == expected_asset_ids @pytest.mark.usefixtures("time_freezer") @pytest.mark.enable_redact @@ -655,7 +652,6 @@ def test_should_mask_sensitive_extra(self, test_client, session): { "id": 1, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"password": "***"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -678,7 +674,6 @@ def test_should_mask_sensitive_extra(self, test_client, session): { "id": 2, "asset_id": 2, - "uri": "s3://bucket/key/2", "extra": {"password": "***"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -704,24 +699,13 @@ def test_should_mask_sensitive_extra(self, test_client, session): class TestGetAssetEndpoint(TestAssets): - @pytest.mark.parametrize( - "url", - [ - urllib.parse.quote( - "s3://bucket/key/1", safe="" - ), # api should cover raw as well as unquoted case like legacy - "s3://bucket/key/1", - ], - ) @provide_session - def test_should_respond_200(self, test_client, url, session): + def test_should_respond_200(self, test_client, session): self.create_assets(num=1) assert session.query(AssetModel).count() == 1 tz_datetime_format = from_datetime_to_zulu_without_ms(DEFAULT_DATE) with assert_queries_count(6): - response = test_client.get( - f"/public/assets/{url}", - ) + response = test_client.get("/public/assets/1") assert response.status_code == 200 assert response.json() == { "id": 1, @@ -737,21 +721,16 @@ def test_should_respond_200(self, test_client, url, session): } def test_should_respond_404(self, test_client): - response = test_client.get( - f"/public/assets/{urllib.parse.quote('s3://bucket/key', safe='')}", - ) + response = test_client.get("/public/assets/1") assert response.status_code == 404 - assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" + assert response.json()["detail"] == "The Asset with ID: `1` was not found" @pytest.mark.usefixtures("time_freezer") @pytest.mark.enable_redact def test_should_mask_sensitive_extra(self, test_client, session): self.create_assets_with_sensitive_extra() tz_datetime_format = from_datetime_to_zulu_without_ms(DEFAULT_DATE) - uri = "s3://bucket/key/1" - response = test_client.get( - f"/public/assets/{uri}", - ) + response = test_client.get("/public/assets/1") assert response.status_code == 200 assert response.json() == { "id": 1, @@ -808,9 +787,9 @@ def test_should_respond_200(self, test_client, session, create_dummy_dag): assert response.json() == { "queued_events": [ { - "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), - "uri": "s3://bucket/key/1", + "asset_id": 1, "dag_id": "dag", + "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), } ], "total_entries": 1, @@ -875,13 +854,12 @@ class TestPostAssetEvents(TestAssets): @pytest.mark.usefixtures("time_freezer") def test_should_respond_200(self, test_client, session): self.create_assets() - event_payload = {"uri": "s3://bucket/key/1", "extra": {"foo": "bar"}} + event_payload = {"asset_id": 1, "extra": {"foo": "bar"}} response = test_client.post("/public/assets/events", json=event_payload) assert response.status_code == 200 assert response.json() == { "id": mock.ANY, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"foo": "bar", "from_rest_api": True}, "source_task_id": None, "source_dag_id": None, @@ -902,13 +880,12 @@ def test_invalid_attr_not_allowed(self, test_client, session): @pytest.mark.enable_redact def test_should_mask_sensitive_extra(self, test_client, session): self.create_assets(session) - event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}} + event_payload = {"asset_id": 1, "extra": {"password": "bar"}} response = test_client.post("/public/assets/events", json=event_payload) assert response.status_code == 200 assert response.json() == { "id": mock.ANY, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"password": "***", "from_rest_api": True}, "source_task_id": None, "source_dag_id": None, @@ -925,34 +902,26 @@ def test_should_respond_200(self, test_client, session, create_dummy_dag): dag, _ = create_dummy_dag() dag_id = dag.dag_id self.create_assets(session=session, num=1) - uri = "s3://bucket/key/1" asset_id = 1 self._create_asset_dag_run_queues(dag_id, asset_id, session) - response = test_client.get( - f"/public/assets/queuedEvents/{uri}", - ) + response = test_client.get(f"/public/assets/{asset_id}/queuedEvents/") assert response.status_code == 200 assert response.json() == { "queued_events": [ { - "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), - "uri": "s3://bucket/key/1", + "asset_id": asset_id, "dag_id": "dag", + "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), } ], "total_entries": 1, } def test_should_respond_404(self, test_client): - uri = "not_exists" - - response = test_client.get( - f"/public/assets/queuedEvents/{uri}", - ) - + response = test_client.get("/public/assets/1/queuedEvents") assert response.status_code == 404 - assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found" + assert response.json()["detail"] == "Queue event with asset_id: `1` was not found" class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint): @@ -960,33 +929,25 @@ class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint): def test_should_respond_204(self, test_client, session, create_dummy_dag): dag, _ = create_dummy_dag() dag_id = dag.dag_id - uri = "s3://bucket/key/1" self.create_assets(session=session, num=1) asset_id = 1 self._create_asset_dag_run_queues(dag_id, asset_id, session) - response = test_client.delete( - f"/public/assets/queuedEvents/{uri}", - ) + assert session.get(AssetDagRunQueue, (asset_id, dag_id)) is not None + response = test_client.delete(f"/public/assets/{asset_id}/queuedEvents") assert response.status_code == 204 - assert session.query(AssetDagRunQueue).filter_by(asset_id=1).first() is None + assert session.get(AssetDagRunQueue, (asset_id, dag_id)) is None def test_should_respond_404(self, test_client): - uri = "not_exists" - - response = test_client.delete( - f"/public/assets/queuedEvents/{uri}", - ) - + response = test_client.delete("/public/assets/1/queuedEvents") assert response.status_code == 404 - assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found" + assert response.json()["detail"] == "Queue event with asset_id: `1` was not found" class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint): def test_delete_should_respond_204(self, test_client, session, create_dummy_dag): dag, _ = create_dummy_dag() dag_id = dag.dag_id - asset_uri = "s3://bucket/key/1" self.create_assets(session=session, num=1) asset_id = 1 @@ -995,7 +956,7 @@ def test_delete_should_respond_204(self, test_client, session, create_dummy_dag) assert len(adrq) == 1 response = test_client.delete( - f"/public/dags/{dag_id}/assets/queuedEvents/{asset_uri}", + f"/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", ) assert response.status_code == 204 @@ -1004,14 +965,14 @@ def test_delete_should_respond_204(self, test_client, session, create_dummy_dag) def test_should_respond_404(self, test_client): dag_id = "not_exists" - asset_uri = "not_exists" + asset_id = 1 response = test_client.delete( - f"/public/dags/{dag_id}/assets/queuedEvents/{asset_uri}", + f"/public/dags/{dag_id}/assets/{asset_id}/queuedEvents/", ) assert response.status_code == 404 assert ( response.json()["detail"] - == "Queued event with dag_id: `not_exists` and asset uri: `not_exists` was not found" + == "Queued event with dag_id: `not_exists` and asset_id: `1` was not found" ) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 995b98a61dda8..ee57b77c48926 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -941,7 +941,7 @@ def test_delete_dag_run_not_found(self, test_client): class TestGetDagRunAssetTriggerEvents: def test_should_respond_200(self, test_client, dag_maker, session): - asset1 = Asset(uri="ds1") + asset1 = Asset(name="ds1", uri="file:///da1") with dag_maker(dag_id="source_dag", start_date=START_DATE1, session=session): EmptyOperator(task_id="task", outlets=[asset1]) @@ -975,7 +975,6 @@ def test_should_respond_200(self, test_client, dag_maker, session): { "timestamp": from_datetime_to_zulu(event.timestamp), "asset_id": asset1_id, - "uri": asset1.uri, "extra": {}, "id": event.id, "source_dag_id": ti.dag_id,