Skip to content

Commit

Permalink
fixup! fixup! fixup! Add bundle_name to ParseImportError
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Jan 14, 2025
1 parent 98250ee commit 002359d
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 19 deletions.
6 changes: 3 additions & 3 deletions airflow/api_connexion/endpoints/import_error_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ def get_import_errors(
if not can_read_all_dags:
# if the user doesn't have access to all DAGs, only display errors from visible DAGs
readable_dag_ids = security.get_readable_dags()
dagfiles_stmt = (
dagfiles_stmt = session.execute(
select(DagModel.fileloc, DagModel.bundle_name)
.distinct()
.where(DagModel.dag_id.in_(readable_dag_ids))
)
).all()
query = query.where(
tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt)
tuple_(ParseImportError.filename, ParseImportError.bundle_name or None).in_(dagfiles_stmt)
)
count_query = count_query.where(
tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt)
Expand Down
4 changes: 4 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3370,6 +3370,10 @@ components:
type: string
readOnly: true
description: The filename
bundle_name:
type: string
readOnly: true
description: The bundle name
stack_trace:
type: string
readOnly: true
Expand Down
1 change: 1 addition & 0 deletions airflow/api_fastapi/core_api/datamodels/import_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class ImportErrorResponse(BaseModel):
id: int = Field(alias="import_error_id")
timestamp: datetime
filename: str
bundle_name: str
stacktrace: str = Field(alias="stack_trace")


Expand Down
4 changes: 4 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8440,6 +8440,9 @@ components:
filename:
type: string
title: Filename
bundle_name:
type: string
title: Bundle Name
stack_trace:
type: string
title: Stack Trace
Expand All @@ -8448,6 +8451,7 @@ components:
- import_error_id
- timestamp
- filename
- bundle_name
- stack_trace
title: ImportErrorResponse
description: Import Error Response.
Expand Down
6 changes: 2 additions & 4 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,10 +757,8 @@ def clear_nonexistent_import_errors(self, session=NEW_SESSION):
if self._file_paths:
query = query.where(
(
tuple_(
(ParseImportError.filename, ParseImportError.bundle_name).notin_(
[(f.path, f.bundle_name) for f in self._file_paths]
)
tuple_(ParseImportError.filename, ParseImportError.bundle_name).notin_(
[(f.path, f.bundle_name) for f in self._file_paths]
),
)
)
Expand Down
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3259,13 +3259,17 @@ export const $ImportErrorResponse = {
type: "string",
title: "Filename",
},
bundle_name: {
type: "string",
title: "Bundle Name",
},
stack_trace: {
type: "string",
title: "Stack Trace",
},
},
type: "object",
required: ["import_error_id", "timestamp", "filename", "stack_trace"],
required: ["import_error_id", "timestamp", "filename", "bundle_name", "stack_trace"],
title: "ImportErrorResponse",
description: "Import Error Response.",
} as const;
Expand Down
1 change: 1 addition & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ export type ImportErrorResponse = {
import_error_id: number;
timestamp: string;
filename: string;
bundle_name: string;
stack_trace: string;
};

Expand Down
2 changes: 2 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,8 @@ export interface components {
timestamp?: string;
/** @description The filename */
filename?: string;
/** @description The bundle name */
bundle_name?: string;
/** @description The full stackstrace. */
stack_trace?: string;
};
Expand Down
10 changes: 4 additions & 6 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1017,12 +1017,10 @@ def index(self):
if not can_read_all_dags:
# if the user doesn't have access to all DAGs, only display errors from visible DAGs
import_errors = import_errors.where(
tuple_(
(ParseImportError.filename, ParseImportError.bundle_name).in_(
select(DagModel.fileloc, DagModel.bundle_name)
.distinct()
.where(DagModel.dag_id.in_(filter_dag_ids))
)
tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(
select(DagModel.fileloc, DagModel.bundle_name)
.distinct()
.where(DagModel.dag_id.in_(filter_dag_ids))
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import pytest

from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.models.dag import DagModel
from airflow.providers.fab.www.security import permissions
from airflow.utils import timezone
Expand Down Expand Up @@ -60,7 +61,6 @@ def configured_app(minimal_app_for_auth_api):
}
]
)

yield app

delete_user(app, username="test_single_dag")
Expand Down Expand Up @@ -123,6 +123,7 @@ def test_should_return_200_with_single_dag_read(self, session):
response_data["import_error_id"] = 1
assert response_data == {
"filename": "Lorem_ipsum.py",
"bundle_name": None,
"import_error_id": 1,
"stack_trace": "Lorem ipsum",
"timestamp": "2020-06-10T12:00:00+00:00",
Expand All @@ -149,20 +150,24 @@ def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, sessio
response_data["import_error_id"] = 1
assert response_data == {
"filename": "Lorem_ipsum.py",
"bundle_name": None,
"import_error_id": 1,
"stack_trace": "REDACTED - you do not have read permission on all DAGs in the file",
"timestamp": "2020-06-10T12:00:00+00:00",
}


class TestGetImportErrorsEndpoint(TestBaseImportError):
def test_get_import_errors_single_dag(self, session):
def test_get_import_errors_single_dag(self, configure_testing_dag_bundle, session):
with configure_testing_dag_bundle("/tmp"):
DagBundlesManager().sync_bundles_to_db()
for dag_id in TEST_DAG_IDS:
fake_filename = f"/tmp/{dag_id}.py"
dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename, bundle_name="testing")
session.add(dag_model)
importerror = ParseImportError(
filename=fake_filename,
bundle_name="testing",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
)
Expand All @@ -180,6 +185,7 @@ def test_get_import_errors_single_dag(self, session):
"import_errors": [
{
"filename": "/tmp/test_dag.py",
"bundle_name": "testing",
"import_error_id": 1,
"stack_trace": "Lorem ipsum",
"timestamp": "2020-06-10T12:00:00+00:00",
Expand All @@ -188,14 +194,17 @@ def test_get_import_errors_single_dag(self, session):
"total_entries": 1,
}

def test_get_import_errors_single_dag_in_dagfile(self, session):
def test_get_import_errors_single_dag_in_dagfile(self, configure_testing_dag_bundle, session):
with configure_testing_dag_bundle("/tmp"):
DagBundlesManager().sync_bundles_to_db()
for dag_id in TEST_DAG_IDS:
fake_filename = "/tmp/all_in_one.py"
dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename, bundle_name="testing")
session.add(dag_model)

importerror = ParseImportError(
filename="/tmp/all_in_one.py",
bundle_name="testing",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
)
Expand All @@ -213,6 +222,7 @@ def test_get_import_errors_single_dag_in_dagfile(self, session):
"import_errors": [
{
"filename": "/tmp/all_in_one.py",
"bundle_name": "testing",
"import_error_id": 1,
"stack_trace": "REDACTED - you do not have read permission on all DAGs in the file",
"timestamp": "2020-06-10T12:00:00+00:00",
Expand Down
22 changes: 22 additions & 0 deletions providers/tests/fab/auth_manager/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
# under the License.
from __future__ import annotations

import json
import os
from contextlib import contextmanager
from pathlib import Path

import pytest

Expand Down Expand Up @@ -77,3 +80,22 @@ def dagbag():

parse_and_sync_to_db(os.devnull, include_examples=True)
return DagBag(read_dags_from_db=True)


@pytest.fixture
def configure_testing_dag_bundle():
"""Configure the testing DAG bundle with the provided path, and disable the DAGs folder bundle."""

@contextmanager
def _config_bundle(path_to_parse: Path | str):
bundle_config = [
{
"name": "testing",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"local_folder": str(path_to_parse), "refresh_interval": 0},
}
]
with conf_vars({("dag_bundles", "backends"): json.dumps(bundle_config)}):
yield

return _config_bundle
5 changes: 5 additions & 0 deletions tests/api_connexion/endpoints/test_import_error_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def test_response_200(self, session):
response_data["import_error_id"] = 1
assert response_data == {
"filename": "Lorem_ipsum.py",
"bundle_name": None,
"import_error_id": 1,
"stack_trace": "Lorem ipsum",
"timestamp": "2020-06-10T12:00:00+00:00",
Expand Down Expand Up @@ -147,12 +148,14 @@ def test_get_import_errors(self, session):
"import_errors": [
{
"filename": "Lorem_ipsum.py",
"bundle_name": None,
"import_error_id": 1,
"stack_trace": "Lorem ipsum",
"timestamp": "2020-06-10T12:00:00+00:00",
},
{
"filename": "Lorem_ipsum.py",
"bundle_name": None,
"import_error_id": 2,
"stack_trace": "Lorem ipsum",
"timestamp": "2020-06-10T12:00:00+00:00",
Expand Down Expand Up @@ -184,12 +187,14 @@ def test_get_import_errors_order_by(self, session):
"import_errors": [
{
"filename": "Lorem_ipsum1.py",
"bundle_name": None,
"import_error_id": 1, # id normalized with self._normalize_import_errors
"stack_trace": "Lorem ipsum",
"timestamp": "2020-06-09T12:00:00+00:00",
},
{
"filename": "Lorem_ipsum2.py",
"bundle_name": None,
"import_error_id": 2,
"stack_trace": "Lorem ipsum",
"timestamp": "2020-06-08T12:00:00+00:00",
Expand Down
3 changes: 3 additions & 0 deletions tests/api_connexion/schemas/test_error_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def test_serialize(self, session):
serialized_data["import_error_id"] = 1
assert serialized_data == {
"filename": "lorem.py",
"bundle_name": None,
"import_error_id": 1,
"stack_trace": "Lorem Ipsum",
"timestamp": "2020-06-10T12:02:44+00:00",
Expand Down Expand Up @@ -86,12 +87,14 @@ def test_serialize(self, session):
"import_errors": [
{
"filename": "Lorem_ipsum.py",
"bundle_name": None,
"import_error_id": 1,
"stack_trace": "Lorem ipsum",
"timestamp": "2020-06-10T12:02:44+00:00",
},
{
"filename": "Lorem_ipsum.py",
"bundle_name": None,
"import_error_id": 2,
"stack_trace": "Lorem ipsum",
"timestamp": "2020-06-10T12:02:44+00:00",
Expand Down

0 comments on commit 002359d

Please sign in to comment.