diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py index 1e9d4a80dbd67..fff64a6e53771 100644 --- a/airflow/api/common/delete_dag.py +++ b/airflow/api/common/delete_dag.py @@ -75,7 +75,10 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session = # This handles the case when the dag_id is changed in the file session.execute( delete(ParseImportError) - .where(ParseImportError.filename == dag.fileloc) + .where( + ParseImportError.filename == dag.fileloc, + ParseImportError.bundle_name == dag.get_bundle_name(session), + ) .execution_options(synchronize_session="fetch") ) diff --git a/airflow/api_connexion/endpoints/import_error_endpoint.py b/airflow/api_connexion/endpoints/import_error_endpoint.py index d2780fe941b13..b4e19a1edbcf8 100644 --- a/airflow/api_connexion/endpoints/import_error_endpoint.py +++ b/airflow/api_connexion/endpoints/import_error_endpoint.py @@ -19,7 +19,7 @@ from collections.abc import Sequence from typing import TYPE_CHECKING -from sqlalchemy import func, select +from sqlalchemy import func, select, tuple_ from airflow.api_connexion import security from airflow.api_connexion.exceptions import NotFound, PermissionDenied @@ -61,7 +61,9 @@ def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) -> readable_dag_ids = security.get_readable_dags() file_dag_ids = { dag_id[0] - for dag_id in session.query(DagModel.dag_id).filter(DagModel.fileloc == error.filename).all() + for dag_id in session.query(DagModel.dag_id) + .filter(DagModel.fileloc == error.filename, DagModel.bundle_name == error.bundle_name) + .all() } # Can the user read any DAGs in the file? @@ -98,9 +100,17 @@ def get_import_errors( if not can_read_all_dags: # if the user doesn't have access to all DAGs, only display errors from visible DAGs readable_dag_ids = security.get_readable_dags() - dagfiles_stmt = select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids)) - query = query.where(ParseImportError.filename.in_(dagfiles_stmt)) - count_query = count_query.where(ParseImportError.filename.in_(dagfiles_stmt)) + dagfiles_stmt = ( + select(DagModel.fileloc, DagModel.bundle_name) + .distinct() + .where(DagModel.dag_id.in_(readable_dag_ids)) + ) + query = query.where( + tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt) + ) + count_query = count_query.where( + tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt) + ) total_entries = session.scalars(count_query).one() import_errors = session.scalars(query.offset(offset).limit(limit)).all() @@ -109,7 +119,12 @@ def get_import_errors( for import_error in import_errors: # Check if user has read access to all the DAGs defined in the file file_dag_ids = ( - session.query(DagModel.dag_id).filter(DagModel.fileloc == import_error.filename).all() + session.query(DagModel.dag_id) + .filter( + DagModel.fileloc == import_error.filename, + DagModel.bundle_name == import_error.bundle_name, + ) + .all() ) requests: Sequence[IsAuthorizedDagRequest] = [ { diff --git a/airflow/api_connexion/schemas/error_schema.py b/airflow/api_connexion/schemas/error_schema.py index 8f117fb9666b2..237abcc091d41 100644 --- a/airflow/api_connexion/schemas/error_schema.py +++ b/airflow/api_connexion/schemas/error_schema.py @@ -35,6 +35,7 @@ class Meta: import_error_id = auto_field("id", dump_only=True) timestamp = auto_field(format="iso", dump_only=True) filename = auto_field(dump_only=True) + bundle_name = auto_field(dump_only=True) stack_trace = auto_field("stacktrace", dump_only=True) diff --git a/airflow/api_fastapi/core_api/routes/public/import_error.py b/airflow/api_fastapi/core_api/routes/public/import_error.py index 89ad78403a430..01caf9048e2d6 100644 --- a/airflow/api_fastapi/core_api/routes/public/import_error.py +++ b/airflow/api_fastapi/core_api/routes/public/import_error.py @@ -74,6 +74,7 @@ def get_import_errors( "id", "timestamp", "filename", + "bundle_name", "stacktrace", ], ParseImportError, diff --git a/airflow/dag_processing/collection.py b/airflow/dag_processing/collection.py index 6e0b627198995..5712958b55b95 100644 --- a/airflow/dag_processing/collection.py +++ b/airflow/dag_processing/collection.py @@ -241,21 +241,36 @@ def _update_dag_warnings( session.merge(warning_to_add) -def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str], session: Session): +def _update_import_errors( + files_parsed: set[str], bundle_name: str, import_errors: dict[str, str], session: Session +): from airflow.listeners.listener import get_listener_manager # We can remove anything from files parsed in this batch that doesn't have an error. We need to remove old # errors (i.e. from files that are removed) separately - session.execute(delete(ParseImportError).where(ParseImportError.filename.in_(list(files_parsed)))) + session.execute( + delete(ParseImportError).where( + ParseImportError.filename.in_(list(files_parsed)), ParseImportError.bundle_name == bundle_name + ) + ) - existing_import_error_files = set(session.scalars(select(ParseImportError.filename))) + existing_import_error_files = set( + session.execute(select(ParseImportError.filename, ParseImportError.bundle_name)) + ) # Add the errors of the processed files for filename, stacktrace in import_errors.items(): - if filename in existing_import_error_files: - session.query(ParseImportError).where(ParseImportError.filename == filename).update( - {"filename": filename, "timestamp": utcnow(), "stacktrace": stacktrace}, + if (filename, bundle_name) in existing_import_error_files: + session.query(ParseImportError).where( + ParseImportError.filename == filename, ParseImportError.bundle_name == bundle_name + ).update( + { + "filename": filename, + "bundle_name": bundle_name, + "timestamp": utcnow(), + "stacktrace": stacktrace, + }, ) # sending notification when an existing dag import error occurs get_listener_manager().hook.on_existing_dag_import_error(filename=filename, stacktrace=stacktrace) @@ -263,13 +278,16 @@ def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str], session.add( ParseImportError( filename=filename, + bundle_name=bundle_name, timestamp=utcnow(), stacktrace=stacktrace, ) ) # sending notification when a new dag import error occurs get_listener_manager().hook.on_new_dag_import_error(filename=filename, stacktrace=stacktrace) - session.query(DagModel).filter(DagModel.fileloc == filename).update({"has_import_errors": True}) + session.query(DagModel).filter( + DagModel.fileloc == filename, DagModel.bundle_name == bundle_name + ).update({"has_import_errors": True}) def update_dag_parsing_results_in_db( @@ -314,7 +332,6 @@ def update_dag_parsing_results_in_db( try: DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session) # Write Serialized DAGs to DB, capturing errors - # Write Serialized DAGs to DB, capturing errors for dag in dags: serialize_errors.extend(_serialize_dag_capturing_errors(dag, session)) except OperationalError: @@ -332,6 +349,7 @@ def update_dag_parsing_results_in_db( good_dag_filelocs = {dag.fileloc for dag in dags if dag.fileloc not in import_errors} _update_import_errors( files_parsed=good_dag_filelocs, + bundle_name=bundle_name, import_errors=import_errors, session=session, ) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 9ac8802f7d7fd..b7ce093f2ea3e 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -40,7 +40,7 @@ import attrs from setproctitle import setproctitle -from sqlalchemy import delete, select, update +from sqlalchemy import and_, delete, select, update from tabulate import tabulate from uuid6 import uuid7 @@ -745,7 +745,10 @@ def clear_nonexistent_import_errors(self, session=NEW_SESSION): if self._file_paths: query = query.where( - ParseImportError.filename.notin_([f.path for f in self._file_paths]), + and_( + ParseImportError.filename.notin_([f.path for f in self._file_paths]), + ParseImportError.bundle_name.notin_([f.bundle_name for f in self._file_paths]), + ) ) session.execute(query.execution_options(synchronize_session="fetch")) diff --git a/airflow/migrations/versions/0055_3_0_0_add_bundle_name_to_parseimporterror.py b/airflow/migrations/versions/0055_3_0_0_add_bundle_name_to_parseimporterror.py new file mode 100644 index 0000000000000..5670668e9e7e6 --- /dev/null +++ b/airflow/migrations/versions/0055_3_0_0_add_bundle_name_to_parseimporterror.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +add bundle_name to ParseImportError. + +Revision ID: 03de77aaa4ec +Revises: 38770795785f +Create Date: 2025-01-08 10:38:02.108760 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "03de77aaa4ec" +down_revision = "38770795785f" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Apply add bundle_name to ParseImportError.""" + with op.batch_alter_table("import_error", schema=None) as batch_op: + batch_op.add_column(sa.Column("bundle_name", sa.String(length=250), nullable=True)) + + +def downgrade(): + """Unapply add bundle_name to ParseImportError.""" + with op.batch_alter_table("import_error", schema=None) as batch_op: + batch_op.drop_column("bundle_name") diff --git a/airflow/models/errors.py b/airflow/models/errors.py index ff05a1385a966..21c2236e2c18b 100644 --- a/airflow/models/errors.py +++ b/airflow/models/errors.py @@ -19,7 +19,7 @@ from sqlalchemy import Column, Integer, String, Text -from airflow.models.base import Base +from airflow.models.base import Base, StringID from airflow.utils.sqlalchemy import UtcDateTime @@ -30,4 +30,5 @@ class ParseImportError(Base): id = Column(Integer, primary_key=True) timestamp = Column(UtcDateTime) filename = Column(String(1024)) + bundle_name = Column(StringID()) stacktrace = Column(Text) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 29d3dc5439cee..ed7eb1090416c 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "38770795785f", + "3.0.0": "03de77aaa4ec", } diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 9c31942457487..7860e3cf7282d 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -198,14 +198,19 @@ def encode_dag_run( return encoded_dag_run, None -def check_import_errors(fileloc, session): +def check_import_errors(fileloc, bundle_name, session): # Check dag import errors import_errors = session.scalars( - select(ParseImportError).where(ParseImportError.filename == fileloc) + select(ParseImportError).where( + ParseImportError.filename == fileloc, ParseImportError.bundle_name == bundle_name + ) ).all() if import_errors: for import_error in import_errors: - flash(f"Broken DAG: [{import_error.filename}] {import_error.stacktrace}", "dag_import_error") + flash( + f"Broken DAG: [{import_error.filename}, Bundle name: {bundle_name}] {import_error.stacktrace}", + "dag_import_error", + ) def check_dag_warnings(dag_id, session): diff --git a/airflow/www/views.py b/airflow/www/views.py index 01b10b98aff64..ee4aff114be94 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1019,7 +1019,10 @@ def index(self): import_errors = import_errors.where( ParseImportError.filename.in_( select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(filter_dag_ids)) - ) + ), + ParseImportError.bundle_name.in_( + select(DagModel.bundle_name).distinct().where(DagModel.dag_id.in_(filter_dag_ids)) + ), ) import_errors = session.scalars(import_errors) @@ -2876,10 +2879,10 @@ def grid(self, dag_id: str, session: Session = NEW_SESSION): dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session) url_serializer = URLSafeSerializer(current_app.config["SECRET_KEY"]) dag_model = DagModel.get_dagmodel(dag_id, session=session) - if not dag: + if not dag or not dag_model: flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error") return redirect(url_for("Airflow.index")) - wwwutils.check_import_errors(dag.fileloc, session) + wwwutils.check_import_errors(dag.fileloc, dag_model.bundle_name, session) wwwutils.check_dag_warnings(dag.dag_id, session) included_events_raw = conf.get("webserver", "audit_view_included_events", fallback="") diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 48c765c1699bd..9a197736d1d53 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -f4ad824c8d9ff45e86002506edd83b540a88dab45bb292b1af96cd86dec5ecab \ No newline at end of file +044c0ecf74548e2aeff6a04bc3761c79df18e66a8de4ea0d56c724b1f77361f6 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index b0f6d6b896667..feaccd8f3483c 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,11 +4,11 @@ - - + + %3 - + dag_priority_parsing_request @@ -273,2241 +273,2245 @@ import_error - -import_error - -id - - [INTEGER] - NOT NULL - -filename - - [VARCHAR(1024)] - -stacktrace - - [TEXT] - -timestamp - - [TIMESTAMP] + +import_error + +id + + [INTEGER] + NOT NULL + +bundle_name + + [VARCHAR(250)] + +filename + + [VARCHAR(1024)] + +stacktrace + + [TEXT] + +timestamp + + [TIMESTAMP] asset_alias - -asset_alias + +asset_alias + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +group + + [VARCHAR(1500)] + NOT NULL -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL +asset_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL +event_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset_event - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL -alias_id - - [INTEGER] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL asset_alias--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 asset - -asset + +asset + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +extra + + [JSON] + NOT NULL -extra - - [JSON] - NOT NULL +group + + [VARCHAR(1500)] + NOT NULL -group - - [VARCHAR(1500)] - NOT NULL +name + + [VARCHAR(1500)] + NOT NULL -name - - [VARCHAR(1500)] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL -updated_at - - [TIMESTAMP] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_alias_asset - -0..N -1 + +0..N +1 asset_trigger - -asset_trigger + +asset_trigger + +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL - -trigger_id - - [INTEGER] - NOT NULL +trigger_id + + [INTEGER] + NOT NULL asset--asset_trigger - -0..N -1 + +0..N +1 asset_active - -asset_active + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_active - -1 -1 + +1 +1 asset--asset_active - -1 -1 + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL asset--dag_schedule_asset_reference - -0..N -1 + +0..N +1 task_outlet_asset_reference - -task_outlet_asset_reference + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL asset--task_outlet_asset_reference - -0..N -1 + +0..N +1 asset_dag_run_queue - -asset_dag_run_queue + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL +target_dag_id + + [VARCHAR(250)] + NOT NULL -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL asset--asset_dag_run_queue - -0..N -1 + +0..N +1 asset_event - -asset_event + +asset_event + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL +extra + + [JSON] + NOT NULL -extra - - [JSON] - NOT NULL +source_dag_id + + [VARCHAR(250)] -source_dag_id - - [VARCHAR(250)] +source_map_index + + [INTEGER] -source_map_index - - [INTEGER] +source_run_id + + [VARCHAR(250)] -source_run_id - - [VARCHAR(250)] +source_task_id + + [VARCHAR(250)] -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL +timestamp + + [TIMESTAMP] + NOT NULL asset_event--asset_alias_asset_event - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL +event_id + + [INTEGER] + NOT NULL asset_event--dagrun_asset_event - -0..N -1 + +0..N +1 trigger - -trigger + +trigger + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +classpath + + [VARCHAR(1000)] + NOT NULL -classpath - - [VARCHAR(1000)] - NOT NULL +created_date + + [TIMESTAMP] + NOT NULL -created_date - - [TIMESTAMP] - NOT NULL +kwargs + + [TEXT] + NOT NULL -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] +triggerer_id + + [INTEGER] trigger--asset_trigger - -0..N -1 + +0..N +1 task_instance - -task_instance + +task_instance + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +custom_operator_name + + [VARCHAR(1000)] -custom_operator_name - - [VARCHAR(1000)] +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_version_id + + [UUID] -dag_version_id - - [UUID] +duration + + [DOUBLE_PRECISION] -duration - - [DOUBLE_PRECISION] +end_date + + [TIMESTAMP] -end_date - - [TIMESTAMP] +executor + + [VARCHAR(1000)] -executor - - [VARCHAR(1000)] +executor_config + + [BYTEA] -executor_config - - [BYTEA] +external_executor_id + + [VARCHAR(250)] -external_executor_id - - [VARCHAR(250)] +hostname + + [VARCHAR(1000)] -hostname - - [VARCHAR(1000)] +last_heartbeat_at + + [TIMESTAMP] -last_heartbeat_at - - [TIMESTAMP] +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +max_tries + + [INTEGER] -max_tries - - [INTEGER] +next_kwargs + + [JSON] -next_kwargs - - [JSON] +next_method + + [VARCHAR(1000)] -next_method - - [VARCHAR(1000)] +operator + + [VARCHAR(1000)] -operator - - [VARCHAR(1000)] +pid + + [INTEGER] -pid - - [INTEGER] +pool + + [VARCHAR(256)] + NOT NULL -pool - - [VARCHAR(256)] - NOT NULL +pool_slots + + [INTEGER] + NOT NULL -pool_slots - - [INTEGER] - NOT NULL +priority_weight + + [INTEGER] -priority_weight - - [INTEGER] +queue + + [VARCHAR(256)] -queue - - [VARCHAR(256)] +queued_by_job_id + + [INTEGER] -queued_by_job_id - - [INTEGER] +queued_dttm + + [TIMESTAMP] -queued_dttm - - [TIMESTAMP] +rendered_map_index + + [VARCHAR(250)] -rendered_map_index - - [VARCHAR(250)] +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +start_date + + [TIMESTAMP] -start_date - - [TIMESTAMP] +state + + [VARCHAR(20)] -state - - [VARCHAR(20)] +task_display_name + + [VARCHAR(2000)] -task_display_name - - [VARCHAR(2000)] +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +trigger_id + + [INTEGER] -trigger_id - - [INTEGER] +trigger_timeout + + [TIMESTAMP] -trigger_timeout - - [TIMESTAMP] +try_number + + [INTEGER] -try_number - - [INTEGER] +unixname + + [VARCHAR(1000)] -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] +updated_at + + [TIMESTAMP] trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} task_reschedule - -task_reschedule + +task_reschedule + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +duration + + [INTEGER] + NOT NULL -duration - - [INTEGER] - NOT NULL +end_date + + [TIMESTAMP] + NOT NULL -end_date - - [TIMESTAMP] - NOT NULL +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +reschedule_date + + [TIMESTAMP] + NOT NULL -reschedule_date - - [TIMESTAMP] - NOT NULL +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +start_date + + [TIMESTAMP] + NOT NULL -start_date - - [TIMESTAMP] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL - -try_number - - [INTEGER] - NOT NULL +try_number + + [INTEGER] + NOT NULL task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 rendered_task_instance_fields - -rendered_task_instance_fields + +rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +k8s_pod_yaml + + [JSON] -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL +rendered_fields + + [JSON] + NOT NULL task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_map - -task_map + +task_map + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +keys + + [JSON] -keys - - [JSON] - -length - - [INTEGER] - NOT NULL +length + + [INTEGER] + NOT NULL task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 xcom - -xcom + +xcom + +dag_run_id + + [INTEGER] + NOT NULL -dag_run_id - - [INTEGER] - NOT NULL +key + + [VARCHAR(512)] + NOT NULL -key - - [VARCHAR(512)] - NOT NULL +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +timestamp + + [TIMESTAMP] + NOT NULL -timestamp - - [TIMESTAMP] - NOT NULL - -value - - [JSONB] +value + + [JSONB] task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance_note - -task_instance_note + +task_instance_note + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +content + + [VARCHAR(1000)] -content - - [VARCHAR(1000)] +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] +user_id + + [VARCHAR(128)] task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance_history - -task_instance_history + +task_instance_history + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +custom_operator_name + + [VARCHAR(1000)] -custom_operator_name - - [VARCHAR(1000)] +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_version_id + + [UUID] -dag_version_id - - [UUID] +duration + + [DOUBLE_PRECISION] -duration - - [DOUBLE_PRECISION] +end_date + + [TIMESTAMP] -end_date - - [TIMESTAMP] +executor + + [VARCHAR(1000)] -executor - - [VARCHAR(1000)] +executor_config + + [BYTEA] -executor_config - - [BYTEA] +external_executor_id + + [VARCHAR(250)] -external_executor_id - - [VARCHAR(250)] +hostname + + [VARCHAR(1000)] -hostname - - [VARCHAR(1000)] +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +max_tries + + [INTEGER] -max_tries - - [INTEGER] +next_kwargs + + [JSON] -next_kwargs - - [JSON] +next_method + + [VARCHAR(1000)] -next_method - - [VARCHAR(1000)] +operator + + [VARCHAR(1000)] -operator - - [VARCHAR(1000)] +pid + + [INTEGER] -pid - - [INTEGER] +pool + + [VARCHAR(256)] + NOT NULL -pool - - [VARCHAR(256)] - NOT NULL +pool_slots + + [INTEGER] + NOT NULL -pool_slots - - [INTEGER] - NOT NULL +priority_weight + + [INTEGER] -priority_weight - - [INTEGER] +queue + + [VARCHAR(256)] -queue - - [VARCHAR(256)] +queued_by_job_id + + [INTEGER] -queued_by_job_id - - [INTEGER] +queued_dttm + + [TIMESTAMP] -queued_dttm - - [TIMESTAMP] +rendered_map_index + + [VARCHAR(250)] -rendered_map_index - - [VARCHAR(250)] +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +start_date + + [TIMESTAMP] -start_date - - [TIMESTAMP] +state + + [VARCHAR(20)] -state - - [VARCHAR(20)] +task_display_name + + [VARCHAR(2000)] -task_display_name - - [VARCHAR(2000)] +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +trigger_id + + [INTEGER] -trigger_id - - [INTEGER] +trigger_timeout + + [TIMESTAMP] -trigger_timeout - - [TIMESTAMP] +try_number + + [INTEGER] + NOT NULL -try_number - - [INTEGER] - NOT NULL +unixname + + [VARCHAR(1000)] -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] +updated_at + + [TIMESTAMP] task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 dag_bundle - -dag_bundle + +dag_bundle + +name + + [VARCHAR(250)] + NOT NULL -name - - [VARCHAR(250)] - NOT NULL +active + + [BOOLEAN] -active - - [BOOLEAN] +last_refreshed + + [TIMESTAMP] -last_refreshed - - [TIMESTAMP] - -latest_version - - [VARCHAR(200)] +latest_version + + [VARCHAR(200)] dag - -dag + +dag + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +asset_expression + + [JSON] -asset_expression - - [JSON] +bundle_name + + [VARCHAR(250)] -bundle_name - - [VARCHAR(250)] +dag_display_name + + [VARCHAR(2000)] -dag_display_name - - [VARCHAR(2000)] +default_view + + [VARCHAR(25)] -default_view - - [VARCHAR(25)] +description + + [TEXT] -description - - [TEXT] +fileloc + + [VARCHAR(2000)] -fileloc - - [VARCHAR(2000)] +has_import_errors + + [BOOLEAN] -has_import_errors - - [BOOLEAN] +has_task_concurrency_limits + + [BOOLEAN] + NOT NULL -has_task_concurrency_limits - - [BOOLEAN] - NOT NULL +is_active + + [BOOLEAN] -is_active - - [BOOLEAN] +is_paused + + [BOOLEAN] -is_paused - - [BOOLEAN] +last_expired + + [TIMESTAMP] -last_expired - - [TIMESTAMP] +last_parsed_time + + [TIMESTAMP] -last_parsed_time - - [TIMESTAMP] +latest_bundle_version + + [VARCHAR(200)] -latest_bundle_version - - [VARCHAR(200)] +max_active_runs + + [INTEGER] -max_active_runs - - [INTEGER] +max_active_tasks + + [INTEGER] + NOT NULL -max_active_tasks - - [INTEGER] - NOT NULL +max_consecutive_failed_dag_runs + + [INTEGER] + NOT NULL -max_consecutive_failed_dag_runs - - [INTEGER] - NOT NULL +next_dagrun + + [TIMESTAMP] -next_dagrun - - [TIMESTAMP] +next_dagrun_create_after + + [TIMESTAMP] -next_dagrun_create_after - - [TIMESTAMP] +next_dagrun_data_interval_end + + [TIMESTAMP] -next_dagrun_data_interval_end - - [TIMESTAMP] +next_dagrun_data_interval_start + + [TIMESTAMP] -next_dagrun_data_interval_start - - [TIMESTAMP] +owners + + [VARCHAR(2000)] -owners - - [VARCHAR(2000)] +timetable_description + + [VARCHAR(1000)] -timetable_description - - [VARCHAR(1000)] - -timetable_summary - - [TEXT] +timetable_summary + + [TEXT] dag_bundle--dag - -0..N -{0,1} + +0..N +{0,1} dag--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 dag--dag_schedule_asset_reference - -0..N -1 + +0..N +1 dag--task_outlet_asset_reference - -0..N -1 + +0..N +1 dag--asset_dag_run_queue - -0..N -1 + +0..N +1 dag_schedule_asset_name_reference - -dag_schedule_asset_name_reference + +dag_schedule_asset_name_reference + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +name + + [VARCHAR(1500)] + NOT NULL -name - - [VARCHAR(1500)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL dag--dag_schedule_asset_name_reference - -0..N -1 + +0..N +1 dag_schedule_asset_uri_reference - -dag_schedule_asset_uri_reference + +dag_schedule_asset_uri_reference + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +uri + + [VARCHAR(1500)] + NOT NULL -uri - - [VARCHAR(1500)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL dag--dag_schedule_asset_uri_reference - -0..N -1 + +0..N +1 dag_version - -dag_version + +dag_version + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL - -version_number - - [INTEGER] - NOT NULL +version_number + + [INTEGER] + NOT NULL dag--dag_version - -0..N -1 + +0..N +1 dag_tag - -dag_tag + +dag_tag + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL +name + + [VARCHAR(100)] + NOT NULL dag--dag_tag - -0..N -1 + +0..N +1 dag_owner_attributes - -dag_owner_attributes + +dag_owner_attributes + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +owner + + [VARCHAR(500)] + NOT NULL -owner - - [VARCHAR(500)] - NOT NULL - -link - - [VARCHAR(500)] - NOT NULL +link + + [VARCHAR(500)] + NOT NULL dag--dag_owner_attributes - -0..N -1 + +0..N +1 dag_warning - -dag_warning + +dag_warning + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +warning_type + + [VARCHAR(50)] + NOT NULL -warning_type - - [VARCHAR(50)] - NOT NULL +message + + [TEXT] + NOT NULL -message - - [TEXT] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL +timestamp + + [TIMESTAMP] + NOT NULL dag--dag_warning - -0..N -1 + +0..N +1 deadline - -deadline + +deadline + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +callback + + [VARCHAR(500)] + NOT NULL -callback - - [VARCHAR(500)] - NOT NULL +callback_kwargs + + [JSON] -callback_kwargs - - [JSON] +dag_id + + [VARCHAR(250)] -dag_id - - [VARCHAR(250)] +dagrun_id + + [INTEGER] -dagrun_id - - [INTEGER] - -deadline - - [TIMESTAMP] - NOT NULL +deadline + + [TIMESTAMP] + NOT NULL dag--deadline - -0..N -{0,1} + +0..N +{0,1} dag_version--task_instance - -0..N -{0,1} + +0..N +{0,1} dag_run - -dag_run + +dag_run + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +backfill_id + + [INTEGER] -backfill_id - - [INTEGER] +clear_number + + [INTEGER] + NOT NULL -clear_number - - [INTEGER] - NOT NULL +conf + + [BYTEA] -conf - - [BYTEA] +creating_job_id + + [INTEGER] -creating_job_id - - [INTEGER] +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_version_id + + [UUID] -dag_version_id - - [UUID] +data_interval_end + + [TIMESTAMP] -data_interval_end - - [TIMESTAMP] +data_interval_start + + [TIMESTAMP] -data_interval_start - - [TIMESTAMP] +end_date + + [TIMESTAMP] -end_date - - [TIMESTAMP] +external_trigger + + [BOOLEAN] -external_trigger - - [BOOLEAN] +last_scheduling_decision + + [TIMESTAMP] -last_scheduling_decision - - [TIMESTAMP] +log_template_id + + [INTEGER] -log_template_id - - [INTEGER] +logical_date + + [TIMESTAMP] + NOT NULL -logical_date - - [TIMESTAMP] - NOT NULL +queued_at + + [TIMESTAMP] -queued_at - - [TIMESTAMP] +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +run_type + + [VARCHAR(50)] + NOT NULL -run_type - - [VARCHAR(50)] - NOT NULL +start_date + + [TIMESTAMP] -start_date - - [TIMESTAMP] +state + + [VARCHAR(50)] -state - - [VARCHAR(50)] +triggered_by + + [VARCHAR(50)] -triggered_by - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] +updated_at + + [TIMESTAMP] dag_version--dag_run - -0..N -{0,1} + +0..N +{0,1} dag_code - -dag_code + +dag_code + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_version_id + + [UUID] + NOT NULL -dag_version_id - - [UUID] - NOT NULL +fileloc + + [VARCHAR(2000)] + NOT NULL -fileloc - - [VARCHAR(2000)] - NOT NULL +last_updated + + [TIMESTAMP] + NOT NULL -last_updated - - [TIMESTAMP] - NOT NULL +source_code + + [TEXT] + NOT NULL -source_code - - [TEXT] - NOT NULL - -source_code_hash - - [VARCHAR(32)] - NOT NULL +source_code_hash + + [VARCHAR(32)] + NOT NULL dag_version--dag_code - -0..N -1 + +0..N +1 serialized_dag - -serialized_dag + +serialized_dag + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +dag_hash + + [VARCHAR(32)] + NOT NULL -dag_hash - - [VARCHAR(32)] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_version_id + + [UUID] + NOT NULL -dag_version_id - - [UUID] - NOT NULL +data + + [JSON] -data - - [JSON] - -data_compressed - - [BYTEA] +data_compressed + + [BYTEA] dag_version--serialized_dag - -0..N -1 + +0..N +1 dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--deadline - -0..N -{0,1} + +0..N +{0,1} backfill_dag_run - -backfill_dag_run + +backfill_dag_run + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +backfill_id + + [INTEGER] + NOT NULL -backfill_id - - [INTEGER] - NOT NULL +dag_run_id + + [INTEGER] -dag_run_id - - [INTEGER] +exception_reason + + [VARCHAR(250)] -exception_reason - - [VARCHAR(250)] +logical_date + + [TIMESTAMP] + NOT NULL -logical_date +sort_ordinal - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL -dag_run_id - - [INTEGER] - NOT NULL +content + + [VARCHAR(1000)] -content - - [VARCHAR(1000)] +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 log_template - -log_template + +log_template + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +elasticsearch_id + + [TEXT] + NOT NULL -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL +filename + + [TEXT] + NOT NULL log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill - -backfill + +backfill + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +completed_at + + [TIMESTAMP] -completed_at - - [TIMESTAMP] +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_run_conf + + [JSON] + NOT NULL -dag_run_conf - - [JSON] - NOT NULL +from_date + + [TIMESTAMP] + NOT NULL -from_date +is_paused - [TIMESTAMP] - NOT NULL + [BOOLEAN] -is_paused - - [BOOLEAN] +max_active_runs + + [INTEGER] + NOT NULL -max_active_runs - - [INTEGER] - NOT NULL +reprocess_behavior + + [VARCHAR(250)] + NOT NULL -reprocess_behavior - - [VARCHAR(250)] - NOT NULL +to_date + + [TIMESTAMP] + NOT NULL -to_date - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 session - -session + +session + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +data + + [BYTEA] -data - - [BYTEA] +expiry + + [TIMESTAMP] -expiry - - [TIMESTAMP] - -session_id - - [VARCHAR(255)] +session_id + + [VARCHAR(255)] alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL ab_user - -ab_user + +ab_user + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +active + + [BOOLEAN] -active - - [BOOLEAN] +changed_by_fk + + [INTEGER] -changed_by_fk - - [INTEGER] +changed_on + + [TIMESTAMP] -changed_on - - [TIMESTAMP] +created_by_fk + + [INTEGER] -created_by_fk - - [INTEGER] +created_on + + [TIMESTAMP] -created_on - - [TIMESTAMP] +email + + [VARCHAR(512)] + NOT NULL -email - - [VARCHAR(512)] - NOT NULL +fail_login_count + + [INTEGER] -fail_login_count - - [INTEGER] +first_name + + [VARCHAR(256)] + NOT NULL -first_name - - [VARCHAR(256)] - NOT NULL +last_login + + [TIMESTAMP] -last_login - - [TIMESTAMP] +last_name + + [VARCHAR(256)] + NOT NULL -last_name - - [VARCHAR(256)] - NOT NULL +login_count + + [INTEGER] -login_count - - [INTEGER] +password + + [VARCHAR(256)] -password - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL +username + + [VARCHAR(512)] + NOT NULL ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user_role - -ab_user_role + +ab_user_role + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +role_id + + [INTEGER] -role_id - - [INTEGER] - -user_id - - [INTEGER] +user_id + + [INTEGER] ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_register_user - -ab_register_user + +ab_register_user + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +email + + [VARCHAR(512)] + NOT NULL -email - - [VARCHAR(512)] - NOT NULL +first_name + + [VARCHAR(256)] + NOT NULL -first_name - - [VARCHAR(256)] - NOT NULL +last_name + + [VARCHAR(256)] + NOT NULL -last_name - - [VARCHAR(256)] - NOT NULL +password + + [VARCHAR(256)] -password - - [VARCHAR(256)] +registration_date + + [TIMESTAMP] -registration_date - - [TIMESTAMP] +registration_hash + + [VARCHAR(256)] -registration_hash - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL +username + + [VARCHAR(512)] + NOT NULL ab_permission - -ab_permission + +ab_permission + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL +name + + [VARCHAR(100)] + NOT NULL ab_permission_view - -ab_permission_view + +ab_permission_view + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +permission_id + + [INTEGER] -permission_id - - [INTEGER] - -view_menu_id - - [INTEGER] +view_menu_id + + [INTEGER] ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_permission_view_role - -ab_permission_view_role + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +permission_view_id + + [INTEGER] -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] +role_id + + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_view_menu - -ab_view_menu + +ab_view_menu + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(250)] - NOT NULL +name + + [VARCHAR(250)] + NOT NULL ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_role - -ab_role + +ab_role + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(64)] - NOT NULL +name + + [VARCHAR(64)] + NOT NULL ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} alembic_version_fab - -alembic_version_fab - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version_fab + +version_num + + [VARCHAR(32)] + NOT NULL diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 166dd0183a6a0..651c15a9088ba 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``38770795785f`` (head) | ``5c9c0231baa2`` | ``3.0.0`` | Add asset reference models. | +| ``03de77aaa4ec`` (head) | ``38770795785f`` | ``3.0.0`` | add bundle_name to ParseImportError. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``38770795785f`` | ``5c9c0231baa2`` | ``3.0.0`` | Add asset reference models. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``5c9c0231baa2`` | ``237cef8dfea1`` | ``3.0.0`` | Remove processor_subdir. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/tests/dag_processing/test_collection.py b/tests/dag_processing/test_collection.py index 13d3f571f5e53..6a4d9ee62ef43 100644 --- a/tests/dag_processing/test_collection.py +++ b/tests/dag_processing/test_collection.py @@ -329,9 +329,11 @@ def test_new_import_error_replaces_old(self, session, dag_import_error_listener, Test that existing import error is updated and new record not created for a dag with the same filename """ + bundle_name = "testing" filename = "abc.py" prev_error = ParseImportError( filename=filename, + bundle_name=bundle_name, timestamp=tz.utcnow(), stacktrace="Some error", ) @@ -340,7 +342,7 @@ def test_new_import_error_replaces_old(self, session, dag_import_error_listener, prev_error_id = prev_error.id update_dag_parsing_results_in_db( - bundle_name="testing", + bundle_name=bundle_name, bundle_version=None, dags=[], import_errors={"abc.py": "New error"}, @@ -348,7 +350,11 @@ def test_new_import_error_replaces_old(self, session, dag_import_error_listener, session=session, ) - import_error = session.query(ParseImportError).filter(ParseImportError.filename == filename).one() + import_error = ( + session.query(ParseImportError) + .filter(ParseImportError.filename == filename, ParseImportError.bundle_name == bundle_name) + .one() + ) # assert that the ID of the import error did not change assert import_error.id == prev_error_id @@ -361,9 +367,11 @@ def test_new_import_error_replaces_old(self, session, dag_import_error_listener, def test_remove_error_clears_import_error(self, testing_dag_bundle, session): # Pre-condition: there is an import error for the dag file + bundle_name = "testing" filename = "abc.py" prev_error = ParseImportError( filename=filename, + bundle_name=bundle_name, timestamp=tz.utcnow(), stacktrace="Some error", ) @@ -373,6 +381,7 @@ def test_remove_error_clears_import_error(self, testing_dag_bundle, session): session.add( ParseImportError( filename="def.py", + bundle_name=bundle_name, timestamp=tz.utcnow(), stacktrace="Some error", ) @@ -380,21 +389,21 @@ def test_remove_error_clears_import_error(self, testing_dag_bundle, session): session.flush() # Sanity check of pre-condition - import_errors = set(session.scalars(select(ParseImportError.filename))) - assert import_errors == {"abc.py", "def.py"} + import_errors = set(session.execute(select(ParseImportError.filename, ParseImportError.bundle_name))) + assert import_errors == {("abc.py", bundle_name), ("def.py", bundle_name)} dag = DAG(dag_id="test") dag.fileloc = filename import_errors = {} - update_dag_parsing_results_in_db("testing", None, [dag], import_errors, set(), session) + update_dag_parsing_results_in_db(bundle_name, None, [dag], import_errors, set(), session) dag_model: DagModel = session.get(DagModel, (dag.dag_id,)) assert dag_model.has_import_errors is False - import_errors = set(session.scalars(select(ParseImportError.filename))) + import_errors = set(session.execute(select(ParseImportError.filename, ParseImportError.bundle_name))) - assert import_errors == {"def.py"} + assert import_errors == {("def.py", bundle_name)} def test_sync_perm_for_dag_with_dict_access_control(self, session, spy_agency: SpyAgency): """