From 002359db854ca94599ec9ff049e7e62f95229144 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 14 Jan 2025 22:02:30 +0100 Subject: [PATCH] fixup! fixup! fixup! Add bundle_name to ParseImportError --- .../endpoints/import_error_endpoint.py | 6 ++--- airflow/api_connexion/openapi/v1.yaml | 4 ++++ .../core_api/datamodels/import_error.py | 1 + .../core_api/openapi/v1-generated.yaml | 4 ++++ airflow/dag_processing/manager.py | 6 ++--- .../ui/openapi-gen/requests/schemas.gen.ts | 6 ++++- airflow/ui/openapi-gen/requests/types.gen.ts | 1 + airflow/www/static/js/types/api-generated.ts | 2 ++ airflow/www/views.py | 10 ++++----- .../test_import_error_endpoint.py | 20 ++++++++++++----- providers/tests/fab/auth_manager/conftest.py | 22 +++++++++++++++++++ .../endpoints/test_import_error_endpoint.py | 5 +++++ .../schemas/test_error_schema.py | 3 +++ 13 files changed, 71 insertions(+), 19 deletions(-) diff --git a/airflow/api_connexion/endpoints/import_error_endpoint.py b/airflow/api_connexion/endpoints/import_error_endpoint.py index b4e19a1edbcf8..95225c8693f88 100644 --- a/airflow/api_connexion/endpoints/import_error_endpoint.py +++ b/airflow/api_connexion/endpoints/import_error_endpoint.py @@ -100,13 +100,13 @@ def get_import_errors( if not can_read_all_dags: # if the user doesn't have access to all DAGs, only display errors from visible DAGs readable_dag_ids = security.get_readable_dags() - dagfiles_stmt = ( + dagfiles_stmt = session.execute( select(DagModel.fileloc, DagModel.bundle_name) .distinct() .where(DagModel.dag_id.in_(readable_dag_ids)) - ) + ).all() query = query.where( - tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt) + tuple_(ParseImportError.filename, ParseImportError.bundle_name or None).in_(dagfiles_stmt) ) count_query = count_query.where( tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 0ead649f4215a..c63fb72f2a9a0 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -3370,6 +3370,10 @@ components: type: string readOnly: true description: The filename + bundle_name: + type: string + readOnly: true + description: The bundle name stack_trace: type: string readOnly: true diff --git a/airflow/api_fastapi/core_api/datamodels/import_error.py b/airflow/api_fastapi/core_api/datamodels/import_error.py index 32c139da1a93a..baf1ffa4fb7f1 100644 --- a/airflow/api_fastapi/core_api/datamodels/import_error.py +++ b/airflow/api_fastapi/core_api/datamodels/import_error.py @@ -31,6 +31,7 @@ class ImportErrorResponse(BaseModel): id: int = Field(alias="import_error_id") timestamp: datetime filename: str + bundle_name: str stacktrace: str = Field(alias="stack_trace") diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 14f329ee94a4b..ed26c689aa8cb 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -8440,6 +8440,9 @@ components: filename: type: string title: Filename + bundle_name: + type: string + title: Bundle Name stack_trace: type: string title: Stack Trace @@ -8448,6 +8451,7 @@ components: - import_error_id - timestamp - filename + - bundle_name - stack_trace title: ImportErrorResponse description: Import Error Response. diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index d0876c6e8098f..96c7fe4f0ed5c 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -757,10 +757,8 @@ def clear_nonexistent_import_errors(self, session=NEW_SESSION): if self._file_paths: query = query.where( ( - tuple_( - (ParseImportError.filename, ParseImportError.bundle_name).notin_( - [(f.path, f.bundle_name) for f in self._file_paths] - ) + tuple_(ParseImportError.filename, ParseImportError.bundle_name).notin_( + [(f.path, f.bundle_name) for f in self._file_paths] ), ) ) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 243643bd62d51..080df98714dea 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3259,13 +3259,17 @@ export const $ImportErrorResponse = { type: "string", title: "Filename", }, + bundle_name: { + type: "string", + title: "Bundle Name", + }, stack_trace: { type: "string", title: "Stack Trace", }, }, type: "object", - required: ["import_error_id", "timestamp", "filename", "stack_trace"], + required: ["import_error_id", "timestamp", "filename", "bundle_name", "stack_trace"], title: "ImportErrorResponse", description: "Import Error Response.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index e5a3d549f91cf..c1c53c8843c88 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -795,6 +795,7 @@ export type ImportErrorResponse = { import_error_id: number; timestamp: string; filename: string; + bundle_name: string; stack_trace: string; }; diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index b3ed0790efd71..9508b8f7bc923 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1284,6 +1284,8 @@ export interface components { timestamp?: string; /** @description The filename */ filename?: string; + /** @description The bundle name */ + bundle_name?: string; /** @description The full stackstrace. */ stack_trace?: string; }; diff --git a/airflow/www/views.py b/airflow/www/views.py index c0ce8307dae2b..0db9fe5509333 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1017,12 +1017,10 @@ def index(self): if not can_read_all_dags: # if the user doesn't have access to all DAGs, only display errors from visible DAGs import_errors = import_errors.where( - tuple_( - (ParseImportError.filename, ParseImportError.bundle_name).in_( - select(DagModel.fileloc, DagModel.bundle_name) - .distinct() - .where(DagModel.dag_id.in_(filter_dag_ids)) - ) + tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_( + select(DagModel.fileloc, DagModel.bundle_name) + .distinct() + .where(DagModel.dag_id.in_(filter_dag_ids)) ), ) diff --git a/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py b/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py index 19509aa558146..2788e511e3276 100644 --- a/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py +++ b/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py @@ -18,6 +18,7 @@ import pytest +from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.models.dag import DagModel from airflow.providers.fab.www.security import permissions from airflow.utils import timezone @@ -60,7 +61,6 @@ def configured_app(minimal_app_for_auth_api): } ] ) - yield app delete_user(app, username="test_single_dag") @@ -123,6 +123,7 @@ def test_should_return_200_with_single_dag_read(self, session): response_data["import_error_id"] = 1 assert response_data == { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -149,6 +150,7 @@ def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, sessio response_data["import_error_id"] = 1 assert response_data == { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file", "timestamp": "2020-06-10T12:00:00+00:00", @@ -156,13 +158,16 @@ def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, sessio class TestGetImportErrorsEndpoint(TestBaseImportError): - def test_get_import_errors_single_dag(self, session): + def test_get_import_errors_single_dag(self, configure_testing_dag_bundle, session): + with configure_testing_dag_bundle("/tmp"): + DagBundlesManager().sync_bundles_to_db() for dag_id in TEST_DAG_IDS: fake_filename = f"/tmp/{dag_id}.py" - dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename) + dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename, bundle_name="testing") session.add(dag_model) importerror = ParseImportError( filename=fake_filename, + bundle_name="testing", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), ) @@ -180,6 +185,7 @@ def test_get_import_errors_single_dag(self, session): "import_errors": [ { "filename": "/tmp/test_dag.py", + "bundle_name": "testing", "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -188,14 +194,17 @@ def test_get_import_errors_single_dag(self, session): "total_entries": 1, } - def test_get_import_errors_single_dag_in_dagfile(self, session): + def test_get_import_errors_single_dag_in_dagfile(self, configure_testing_dag_bundle, session): + with configure_testing_dag_bundle("/tmp"): + DagBundlesManager().sync_bundles_to_db() for dag_id in TEST_DAG_IDS: fake_filename = "/tmp/all_in_one.py" - dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename) + dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename, bundle_name="testing") session.add(dag_model) importerror = ParseImportError( filename="/tmp/all_in_one.py", + bundle_name="testing", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), ) @@ -213,6 +222,7 @@ def test_get_import_errors_single_dag_in_dagfile(self, session): "import_errors": [ { "filename": "/tmp/all_in_one.py", + "bundle_name": "testing", "import_error_id": 1, "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file", "timestamp": "2020-06-10T12:00:00+00:00", diff --git a/providers/tests/fab/auth_manager/conftest.py b/providers/tests/fab/auth_manager/conftest.py index f26a08d19c3e3..ad23c1e1febb0 100644 --- a/providers/tests/fab/auth_manager/conftest.py +++ b/providers/tests/fab/auth_manager/conftest.py @@ -16,7 +16,10 @@ # under the License. from __future__ import annotations +import json import os +from contextlib import contextmanager +from pathlib import Path import pytest @@ -77,3 +80,22 @@ def dagbag(): parse_and_sync_to_db(os.devnull, include_examples=True) return DagBag(read_dags_from_db=True) + + +@pytest.fixture +def configure_testing_dag_bundle(): + """Configure the testing DAG bundle with the provided path, and disable the DAGs folder bundle.""" + + @contextmanager + def _config_bundle(path_to_parse: Path | str): + bundle_config = [ + { + "name": "testing", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"local_folder": str(path_to_parse), "refresh_interval": 0}, + } + ] + with conf_vars({("dag_bundles", "backends"): json.dumps(bundle_config)}): + yield + + return _config_bundle diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py b/tests/api_connexion/endpoints/test_import_error_endpoint.py index 6697c776d9052..4c6d6ec344174 100644 --- a/tests/api_connexion/endpoints/test_import_error_endpoint.py +++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py @@ -90,6 +90,7 @@ def test_response_200(self, session): response_data["import_error_id"] = 1 assert response_data == { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -147,12 +148,14 @@ def test_get_import_errors(self, session): "import_errors": [ { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", }, { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 2, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -184,12 +187,14 @@ def test_get_import_errors_order_by(self, session): "import_errors": [ { "filename": "Lorem_ipsum1.py", + "bundle_name": None, "import_error_id": 1, # id normalized with self._normalize_import_errors "stack_trace": "Lorem ipsum", "timestamp": "2020-06-09T12:00:00+00:00", }, { "filename": "Lorem_ipsum2.py", + "bundle_name": None, "import_error_id": 2, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-08T12:00:00+00:00", diff --git a/tests/api_connexion/schemas/test_error_schema.py b/tests/api_connexion/schemas/test_error_schema.py index c953925c90d43..29d4e222900ec 100644 --- a/tests/api_connexion/schemas/test_error_schema.py +++ b/tests/api_connexion/schemas/test_error_schema.py @@ -55,6 +55,7 @@ def test_serialize(self, session): serialized_data["import_error_id"] = 1 assert serialized_data == { "filename": "lorem.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem Ipsum", "timestamp": "2020-06-10T12:02:44+00:00", @@ -86,12 +87,14 @@ def test_serialize(self, session): "import_errors": [ { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:02:44+00:00", }, { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 2, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:02:44+00:00",