diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 5ca17e63ee2d4..b7526eff8372a 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2654,3 +2654,34 @@ usage_data_collection: example: ~ default: "True" see_also: ":ref:`Usage data collection FAQ `" +dag_bundles: + description: | + Configuration for the DAG bundles. This allows Airflow to load DAGs from different sources. + + Airflow will consume all options added to this section. Below you will see only the default, + ``dags_folder``. The option name is the bundle name and the value is a JSON object with the following + keys: + + * classpath: The classpath of the bundle class + * kwargs: The keyword arguments to pass to the bundle class + * refresh_interval: The interval in seconds to refresh the bundle from its source. + + For example, to add a new bundle named ``hello`` to my Airflow instance, add the following to your + airflow.cfg (this is just an example, the classpath and kwargs are not real): + + .. code-block:: ini + + [dag_bundles] + hello: {"classpath": "airflow.some.classpath", "kwargs": {"hello": "world"}, "refresh_interval": 60} + options: + dags_folder: + description: | + This is the default DAG bundle that loads DAGs from the traditional ``[core] dags_folder``. + By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can be + overridden here if desired. + Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string. 
+      version_added: ~
+      type: string
+      example: ~
+      default: '{{"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle",
+                "kwargs": {{}}}}'
diff --git a/airflow/dag_processing/bundles/base.py b/airflow/dag_processing/bundles/base.py
index f3fdb1aed9476..a0061d97e8f21 100644
--- a/airflow/dag_processing/bundles/base.py
+++ b/airflow/dag_processing/bundles/base.py
@@ -40,14 +40,16 @@ class BaseDagBundle(ABC):
     multiple versions of the same bundle in use at the same time. The DAG processor will always use the latest version.
 
     :param name: String identifier for the DAG bundle
+    :param refresh_interval: How often the bundle should be refreshed from the source (in seconds)
     :param version: Version of the DAG bundle (Optional)
     """
 
     supports_versioning: bool = False
 
-    def __init__(self, *, name: str, version: str | None = None) -> None:
+    def __init__(self, *, name: str, refresh_interval: int, version: str | None = None) -> None:
         self.name = name
         self.version = version
+        self.refresh_interval = refresh_interval
 
     @property
     def _dag_bundle_root_storage_path(self) -> Path:
diff --git a/airflow/dag_processing/bundles/dagfolder.py b/airflow/dag_processing/bundles/dagfolder.py
index 40ba649644558..92087d6a63798 100644
--- a/airflow/dag_processing/bundles/dagfolder.py
+++ b/airflow/dag_processing/bundles/dagfolder.py
@@ -18,11 +18,24 @@
 from __future__ import annotations
 
 from airflow import settings
+from airflow.configuration import conf
 from airflow.dag_processing.bundles.local import LocalDagBundle
 
 
 class DagsFolderDagBundle(LocalDagBundle):
     """A bundle for the DAGs folder."""
 
-    def __init__(self, **kwargs):
-        super().__init__(local_folder=settings.DAGS_FOLDER, **kwargs)
+    def __init__(self, refresh_interval: int | None = None, **kwargs):
+        """
+        Create the bundle for ``settings.DAGS_FOLDER``.
+
+        :param refresh_interval: Seconds between refreshes; when ``None``, the value of
+            ``[scheduler] dag_dir_list_interval`` is used.
+        :param kwargs: Passed through to ``LocalDagBundle``.
+        """
+        interval = (
+            conf.getint("scheduler", "dag_dir_list_interval")
+            if refresh_interval is None
+            else refresh_interval
+        )
+        super().__init__(local_folder=settings.DAGS_FOLDER, refresh_interval=interval, **kwargs)
diff --git
a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py
new file mode 100644
index 0000000000000..4f8b59b956e18
--- /dev/null
+++ b/airflow/dag_processing/bundles/manager.py
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models.dagbundle import DagBundleModel
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.module_loading import import_string
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from airflow.dag_processing.bundles.base import BaseDagBundle
+
+
+class DagBundlesManager(LoggingMixin):
+    """Manager for DAG bundles."""
+
+    @property
+    def bundle_configs(self) -> dict[str, dict]:
+        """
+        Get all DAG bundle configurations.
+
+        :return: Mapping of bundle name to its parsed JSON config.
+        :raises AirflowConfigException: If the config of a bundle is not a JSON object.
+        """
+        configured_bundles = conf.getsection("dag_bundles")
+
+        if not configured_bundles:
+            return {}
+
+        # If dags_folder is empty string, we remove it. This allows the default dags_folder bundle to be disabled.
+        # Use get/pop rather than indexing so a missing ``dags_folder`` option cannot raise KeyError.
+        if not configured_bundles.get("dags_folder"):
+            configured_bundles.pop("dags_folder", None)
+
+        dict_bundles: dict[str, dict] = {}
+        for key in configured_bundles:
+            config = conf.getjson("dag_bundles", key)
+            if not isinstance(config, dict):
+                raise AirflowConfigException(f"Bundle config for {key} is not a dict: {config}")
+            dict_bundles[key] = config
+
+        return dict_bundles
+
+    @provide_session
+    def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
+        """
+        Sync the configured DAG bundles to the database.
+
+        Configured bundles are added or marked active; bundles that are only in the database are
+        marked inactive.
+
+        :param session: A database session.
+        """
+        known_bundles = {b.name: b for b in session.query(DagBundleModel).all()}
+        # The property re-reads and re-parses the config on every access, so evaluate it only once.
+        bundle_configs = self.bundle_configs
+
+        for name in bundle_configs:
+            if bundle := known_bundles.get(name):
+                bundle.active = True
+            else:
+                session.add(DagBundleModel(name=name))
+                self.log.info("Added new DAG bundle %s to the database", name)
+
+        for name, bundle in known_bundles.items():
+            if name not in bundle_configs:
+                bundle.active = False
+                self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name)
+
+    def get_all_dag_bundles(self) -> list[BaseDagBundle]:
+        """
+        Get all DAG bundles.
+
+        :return: list of DAG bundles.
+        """
+        # Parse the config once and reuse it, instead of re-parsing it for every bundle.
+        return [
+            self._create_bundle(name, bundle_config, version=None)
+            for name, bundle_config in self.bundle_configs.items()
+        ]
+
+    def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
+        """
+        Get a DAG bundle by name.
+
+        :param name: The name of the DAG bundle.
+        :param version: The version of the DAG bundle you need (optional). It is passed to the bundle
+            class as-is, including ``None``.
+
+        :return: The DAG bundle.
+        :raises KeyError: If no bundle called ``name`` is configured.
+        """
+        return self._create_bundle(name, self.bundle_configs[name], version=version)
+
+    @staticmethod
+    def _create_bundle(name: str, bundle_config: dict, *, version: str | None) -> BaseDagBundle:
+        """
+        Instantiate the bundle class described by ``bundle_config``.
+
+        :param name: The name of the DAG bundle.
+        :param bundle_config: The parsed JSON config of the bundle.
+        :param version: The version to pass to the bundle class.
+
+        :return: The DAG bundle.
+        :raises AirflowConfigException: If ``classpath`` is missing or ``kwargs`` is not a JSON object.
+        """
+        # TODO: proper validation of the bundle configuration so we have better error messages
+        classpath = bundle_config.get("classpath")
+        if not classpath:
+            raise AirflowConfigException(f"Bundle config for {name} is missing 'classpath'")
+
+        # ``kwargs`` is optional. Copy it so that the parsed config is not modified below.
+        kwargs = bundle_config.get("kwargs") or {}
+        if not isinstance(kwargs, dict):
+            raise AirflowConfigException(f"Bundle kwargs for {name} is not a dict: {kwargs}")
+        kwargs = dict(kwargs)
+
+        # ``refresh_interval`` is a top-level key of the bundle config, not part of ``kwargs``, so it
+        # must be forwarded explicitly or it would silently be ignored.
+        if "refresh_interval" in bundle_config:
+            kwargs["refresh_interval"] = bundle_config["refresh_interval"]
+
+        bundle_class = import_string(classpath)
+        return bundle_class(name=name, version=version, **kwargs)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 57c69238a1f7e..38aa82fcda803 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -399,6 +399,10 @@ def start(self):
             "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval
         )
 
+        from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+        DagBundlesManager().sync_bundles_to_db()
+
         return self._run_parsing_loop()
 
     def _scan_stale_dags(self):
diff --git a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py
index f95eb3f25df30..7e6cb756c5cb2 100644
--- a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py
+++ b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py
@@ -28,10 +28,8 @@
 
 import sqlalchemy as sa
 from alembic import op
-from sqlalchemy_utils import UUIDType
 
-from airflow.models.base import StringID
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import UtcDateTime
 
 revision = "e229247a6cb1"
 down_revision = "eed27faa34e3"
@@ -43,27 +41,24 @@
 def upgrade():
     op.create_table(
         "dag_bundle",
-        sa.Column("id", UUIDType(binary=False), nullable=False),
-        sa.Column("name", StringID(), nullable=False),
-        sa.Column("classpath", sa.String(length=1000), nullable=False),
-        sa.Column("kwargs", ExtendedJSON(), nullable=True),
-        sa.Column("refresh_interval", sa.Integer(), nullable=True),
+        sa.Column("name", sa.String(length=250), nullable=False),
+        sa.Column("active", sa.Boolean(), nullable=True),
         sa.Column("latest_version", sa.String(length=200), nullable=True),
         sa.Column("last_refreshed",
UtcDateTime(timezone=True), nullable=True), - sa.PrimaryKeyConstraint("id", name=op.f("dag_bundle_pkey")), - sa.UniqueConstraint("name", name=op.f("dag_bundle_name_uq")), + sa.PrimaryKeyConstraint("name", name=op.f("dag_bundle_pkey")), ) with op.batch_alter_table("dag", schema=None) as batch_op: - batch_op.add_column(sa.Column("bundle_id", UUIDType(binary=False), nullable=True)) + batch_op.add_column(sa.Column("bundle_name", sa.String(length=250), nullable=True)) batch_op.add_column(sa.Column("latest_bundle_version", sa.String(length=200), nullable=True)) - batch_op.create_foreign_key(batch_op.f("dag_bundle_id_fkey"), "dag_bundle", ["bundle_id"], ["id"]) + batch_op.create_foreign_key( + batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"] + ) def downgrade(): - """Unapply Add DagBundleModel.""" with op.batch_alter_table("dag", schema=None) as batch_op: - batch_op.drop_constraint(batch_op.f("dag_bundle_id_fkey"), type_="foreignkey") + batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey") batch_op.drop_column("latest_bundle_version") - batch_op.drop_column("bundle_id") + batch_op.drop_column("bundle_name") op.drop_table("dag_bundle") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 6163580870c4e..1decc2db683ea 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -66,7 +66,6 @@ from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import backref, relationship from sqlalchemy.sql import Select, expression -from sqlalchemy_utils import UUIDType from airflow import settings, utils from airflow.configuration import conf as airflow_conf, secrets_backend_list @@ -2028,7 +2027,7 @@ class DagModel(Base): fileloc = Column(String(2000)) # The base directory used by Dag Processor that parsed this dag. 
processor_subdir = Column(String(2000), nullable=True) - bundle_id = Column(UUIDType(binary=False), ForeignKey("dag_bundle.id"), nullable=True) + bundle_name = Column(StringID(), ForeignKey("dag_bundle.name"), nullable=True) # The version of the bundle the last time the DAG was parsed latest_bundle_version = Column(String(200), nullable=True) # String representing the owners diff --git a/airflow/models/dagbundle.py b/airflow/models/dagbundle.py index d4a628cbc8dd4..08429db0b0bcb 100644 --- a/airflow/models/dagbundle.py +++ b/airflow/models/dagbundle.py @@ -16,58 +16,28 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING - -import uuid6 -from sqlalchemy import Column, Integer, String -from sqlalchemy_utils import UUIDType +from sqlalchemy import Boolean, Column, String from airflow.models.base import Base, StringID -from airflow.utils.module_loading import import_string -from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime - -if TYPE_CHECKING: - from sqlalchemy.orm import Session - - from airflow.dag_processing.bundles.base import BaseDagBundle +from airflow.utils.sqlalchemy import UtcDateTime class DagBundleModel(Base): - """A table for DAG Bundle config.""" + """ + A table for storing DAG bundle metadata. + + We track the following information about each bundle, as it can be useful for + informational purposes and for debugging: + - active: Is the bundle currently found in configuration? + - latest_version: The latest version Airflow has seen for the bundle. + - last_refreshed: When the bundle was last refreshed. 
+ """ __tablename__ = "dag_bundle" - id = Column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7) - name = Column(StringID(), nullable=False, unique=True) - classpath = Column(String(1000), nullable=False) - kwargs = Column(ExtendedJSON, nullable=True) - refresh_interval = Column(Integer, nullable=True) + name = Column(StringID(), primary_key=True) + active = Column(Boolean, default=True) latest_version = Column(String(200), nullable=True) last_refreshed = Column(UtcDateTime, nullable=True) - def __init__(self, *, name, classpath, kwargs, refresh_interval): + def __init__(self, *, name: str): self.name = name - self.classpath = classpath - self.kwargs = kwargs - self.refresh_interval = refresh_interval - - @classmethod - @provide_session - def get_all_dag_bundles( - cls, *, session: Session = NEW_SESSION - ) -> list[tuple[DagBundleModel, BaseDagBundle]]: - """ - Get all DAG bundles. - - :param session: A database session. - :return: list of DAG bundles. - """ - bundle_configs = session.query(cls).all() - - bundles = [] - for bundle_config in bundle_configs: - bundle_class = import_string(bundle_config.classpath) - bundle = bundle_class(name=bundle_config.name, **bundle_config.kwargs) - bundles.append((bundle_config, bundle)) - - return bundles diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 6ee5972cc1624..b503e8dfaf91a 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -ccb8ef5583b2a6b3ee3ab4212139c112b92953675655010a6775fffb4945b206 \ No newline at end of file +ba10504bc54d15b2faca37ae9db172848a498e471bbf332e031715f728158ff8 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 9c37f5c320686..9f19f0f920a5c 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,11 +4,11 @@ - - + + %3 - + log @@ -304,2137 
+304,2123 @@ asset_alias - -asset_alias - -id - - [INTEGER] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL + +asset_alias + +id + + [INTEGER] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset - -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event - -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset_event - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference - -alias_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset_alias--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 asset - -asset - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -extra - - [JSON] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +extra + + [JSON] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_alias_asset - -0..N -1 + +0..N +1 asset_trigger - -asset_trigger - -asset_id - - [INTEGER] - NOT NULL - -trigger_id - - 
[INTEGER] - NOT NULL + +asset_trigger + +asset_id + + [INTEGER] + NOT NULL + +trigger_id + + [INTEGER] + NOT NULL asset--asset_trigger - -0..N -1 + +0..N +1 asset_active - -asset_active - -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_active - -1 -1 + +1 +1 asset--asset_active - -1 -1 + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--dag_schedule_asset_reference - -0..N -1 + +0..N +1 task_outlet_asset_reference - -task_outlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--task_outlet_asset_reference - -0..N -1 + +0..N +1 asset_dag_run_queue - -asset_dag_run_queue - -asset_id - - [INTEGER] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL asset--asset_dag_run_queue - -0..N -1 + +0..N +1 asset_event - -asset_event - -id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL - -extra - - [JSON] - NOT NULL - -source_dag_id - - [VARCHAR(250)] - -source_map_index - - 
[INTEGER] - -source_run_id - - [VARCHAR(250)] - -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL + +asset_event + +id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL + +extra + + [JSON] + NOT NULL + +source_dag_id + + [VARCHAR(250)] + +source_map_index + + [INTEGER] + +source_run_id + + [VARCHAR(250)] + +source_task_id + + [VARCHAR(250)] + +timestamp + + [TIMESTAMP] + NOT NULL asset_event--asset_alias_asset_event - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event--dagrun_asset_event - -0..N -1 + +0..N +1 trigger - -trigger - -id - - [INTEGER] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -created_date - - [TIMESTAMP] - NOT NULL - -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] + +trigger + +id + + [INTEGER] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +created_date + + [TIMESTAMP] + NOT NULL + +kwargs + + [TEXT] + NOT NULL + +triggerer_id + + [INTEGER] trigger--asset_trigger - -0..N -1 + +0..N +1 task_instance - -task_instance - -id - - [UUID] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -last_heartbeat_at - - [TIMESTAMP] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - 
- [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance + +id + + [UUID] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +last_heartbeat_at + + [TIMESTAMP] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} task_reschedule - -task_reschedule - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [INTEGER] - NOT NULL - -end_date - - [TIMESTAMP] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -reschedule_date - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -try_number - - [INTEGER] - NOT NULL + +task_reschedule 
+ +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [INTEGER] + NOT NULL + +end_date + + [TIMESTAMP] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +reschedule_date + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +try_number + + [INTEGER] + NOT NULL task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL + +rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + + [JSON] + +rendered_fields + + [JSON] + NOT NULL task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_map - -task_map - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -keys - - [JSON] - -length - - [INTEGER] - NOT NULL + +task_map + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +keys + + [JSON] + +length + + [INTEGER] + NOT NULL task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - 
-0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 xcom - -xcom - -dag_run_id - - [INTEGER] - NOT NULL - -key - - [VARCHAR(512)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL - -value - - [JSONB] + +xcom + +dag_run_id + + [INTEGER] + NOT NULL + +key + + [VARCHAR(512)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL + +value + + [JSONB] task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance_note - -task_instance_note - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +task_instance_note + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance_history - -task_instance_history - -id - - [INTEGER] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - 
-executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - NOT NULL - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance_history + +id + + [INTEGER] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + NOT NULL + +unixname + + 
[VARCHAR(1000)] + +updated_at + + [TIMESTAMP] task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 dag_bundle - -dag_bundle - -id - - [UUID] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -kwargs - - [JSON] - -last_refreshed - - [TIMESTAMP] - -latest_version - - [VARCHAR(200)] - -name - - [VARCHAR(250)] - NOT NULL - -refresh_interval - - [INTEGER] + +dag_bundle + +name + + [VARCHAR(250)] + NOT NULL + +active + + [BOOLEAN] + +last_refreshed + + [TIMESTAMP] + +latest_version + + [VARCHAR(200)] dag - -dag - -dag_id - - [VARCHAR(250)] - NOT NULL - -asset_expression - - [JSON] - -bundle_id - - [UUID] - -dag_display_name - - [VARCHAR(2000)] - -default_view - - [VARCHAR(25)] - -description - - [TEXT] - -fileloc - - [VARCHAR(2000)] - -has_import_errors - - [BOOLEAN] - -has_task_concurrency_limits - - [BOOLEAN] - NOT NULL - -is_active - - [BOOLEAN] - -is_paused - - [BOOLEAN] - -last_expired - - [TIMESTAMP] - -last_parsed_time - - [TIMESTAMP] - -latest_bundle_version - - [VARCHAR(200)] - -max_active_runs - - [INTEGER] - -max_active_tasks - - [INTEGER] - NOT NULL - -max_consecutive_failed_dag_runs - - [INTEGER] - NOT NULL - -next_dagrun - - [TIMESTAMP] - -next_dagrun_create_after - - [TIMESTAMP] - -next_dagrun_data_interval_end - - [TIMESTAMP] - -next_dagrun_data_interval_start - - [TIMESTAMP] - -owners - - [VARCHAR(2000)] - -processor_subdir - - [VARCHAR(2000)] - -timetable_description - - [VARCHAR(1000)] - -timetable_summary - - [TEXT] + +dag + +dag_id + + [VARCHAR(250)] + NOT NULL + +asset_expression + + [JSON] + +bundle_name + + [VARCHAR(250)] + +dag_display_name + + [VARCHAR(2000)] + +default_view + + [VARCHAR(25)] + +description + + [TEXT] + +fileloc + + [VARCHAR(2000)] + +has_import_errors + + [BOOLEAN] + +has_task_concurrency_limits + + [BOOLEAN] + NOT NULL + +is_active + + 
[BOOLEAN] + +is_paused + + [BOOLEAN] + +last_expired + + [TIMESTAMP] + +last_parsed_time + + [TIMESTAMP] + +latest_bundle_version + + [VARCHAR(200)] + +max_active_runs + + [INTEGER] + +max_active_tasks + + [INTEGER] + NOT NULL + +max_consecutive_failed_dag_runs + + [INTEGER] + NOT NULL + +next_dagrun + + [TIMESTAMP] + +next_dagrun_create_after + + [TIMESTAMP] + +next_dagrun_data_interval_end + + [TIMESTAMP] + +next_dagrun_data_interval_start + + [TIMESTAMP] + +owners + + [VARCHAR(2000)] + +processor_subdir + + [VARCHAR(2000)] + +timetable_description + + [VARCHAR(1000)] + +timetable_summary + + [TEXT] dag_bundle--dag - -0..N -{0,1} + +0..N +{0,1} dag--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 dag--dag_schedule_asset_reference - -0..N -1 + +0..N +1 dag--task_outlet_asset_reference - -0..N -1 + +0..N +1 dag--asset_dag_run_queue - -0..N -1 + +0..N +1 dag_version - -dag_version - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -version_number - - [INTEGER] - NOT NULL + +dag_version + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +version_number + + [INTEGER] + NOT NULL dag--dag_version - -0..N -1 + +0..N +1 dag_tag - -dag_tag - -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + + [VARCHAR(250)] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL dag--dag_tag - -0..N -1 + +0..N +1 dag_owner_attributes - -dag_owner_attributes - -dag_id - - [VARCHAR(250)] - NOT NULL - -owner - - [VARCHAR(500)] - NOT NULL - -link - - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + + [VARCHAR(250)] + NOT NULL + +owner + + [VARCHAR(500)] + NOT NULL + +link + + [VARCHAR(500)] + NOT NULL dag--dag_owner_attributes - -0..N -1 + +0..N +1 dag_warning - -dag_warning - -dag_id - - [VARCHAR(250)] - NOT NULL - -warning_type - - [VARCHAR(50)] - NOT NULL - -message - - [TEXT] - NOT NULL - -timestamp - - 
[TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + + [VARCHAR(250)] + NOT NULL + +warning_type + + [VARCHAR(50)] + NOT NULL + +message + + [TEXT] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL dag--dag_warning - -0..N -1 + +0..N +1 dag_version--task_instance - -0..N -{0,1} + +0..N +{0,1} dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [BYTEA] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -external_trigger - - [BOOLEAN] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - -logical_date - - [TIMESTAMP] - NOT NULL - -queued_at - - [TIMESTAMP] - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -triggered_by - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [BYTEA] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +external_trigger + + [BOOLEAN] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + +logical_date + + [TIMESTAMP] + NOT NULL + +queued_at + + [TIMESTAMP] + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +triggered_by + + [VARCHAR(50)] + +updated_at + + [TIMESTAMP] dag_version--dag_run - -0..N -{0,1} + +0..N +{0,1} dag_code - -dag_code - -id - - [UUID] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL - -last_updated - - [TIMESTAMP] 
- NOT NULL - -source_code - - [TEXT] - NOT NULL - -source_code_hash - - [VARCHAR(32)] - NOT NULL + +dag_code + +id + + [UUID] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +source_code + + [TEXT] + NOT NULL + +source_code_hash + + [VARCHAR(32)] + NOT NULL dag_version--dag_code - -0..N -1 + +0..N +1 serialized_dag - -serialized_dag - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_hash - - [VARCHAR(32)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -data - - [JSON] - -data_compressed - - [BYTEA] - -processor_subdir - - [VARCHAR(2000)] + +serialized_dag + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_hash + + [VARCHAR(32)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +data + + [JSON] + +data_compressed + + [BYTEA] + +processor_subdir + + [VARCHAR(2000)] dag_version--serialized_dag - -0..N -1 + +0..N +1 dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - 
[VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 log_template - -log_template - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL + +log_template + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +elasticsearch_id + + [TEXT] + NOT NULL + +filename + + [TEXT] + NOT NULL log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill - -backfill - -id - - [INTEGER] - NOT NULL - -completed_at - - [TIMESTAMP] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_run_conf - - [JSON] - NOT NULL - -from_date - - [TIMESTAMP] - NOT NULL - -is_paused - - [BOOLEAN] - -max_active_runs - - [INTEGER] - NOT NULL - -reprocess_behavior - - [VARCHAR(250)] - NOT NULL - -to_date - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +backfill + +id + + [INTEGER] + NOT NULL + +completed_at + + [TIMESTAMP] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_run_conf + + [JSON] + NOT NULL + +from_date + + [TIMESTAMP] + NOT NULL + +is_paused + + [BOOLEAN] + +max_active_runs + + [INTEGER] + NOT NULL + +reprocess_behavior + + [VARCHAR(250)] + NOT NULL + +to_date + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 session - -session - -id - - [INTEGER] - NOT NULL - -data - - [BYTEA] - -expiry - - [TIMESTAMP] - -session_id - - [VARCHAR(255)] + +session + +id + + [INTEGER] + NOT NULL + +data + + [BYTEA] + +expiry + + [TIMESTAMP] + +session_id + + [VARCHAR(255)] alembic_version - 
-alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL ab_user - -ab_user - -id - - [INTEGER] - NOT NULL - -active - - [BOOLEAN] - -changed_by_fk - - [INTEGER] - -changed_on - - [TIMESTAMP] - -created_by_fk - - [INTEGER] - -created_on - - [TIMESTAMP] - -email - - [VARCHAR(512)] - NOT NULL - -fail_login_count - - [INTEGER] - -first_name - - [VARCHAR(256)] - NOT NULL - -last_login - - [TIMESTAMP] - -last_name - - [VARCHAR(256)] - NOT NULL - -login_count - - [INTEGER] - -password - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_user + +id + + [INTEGER] + NOT NULL + +active + + [BOOLEAN] + +changed_by_fk + + [INTEGER] + +changed_on + + [TIMESTAMP] + +created_by_fk + + [INTEGER] + +created_on + + [TIMESTAMP] + +email + + [VARCHAR(512)] + NOT NULL + +fail_login_count + + [INTEGER] + +first_name + + [VARCHAR(256)] + NOT NULL + +last_login + + [TIMESTAMP] + +last_name + + [VARCHAR(256)] + NOT NULL + +login_count + + [INTEGER] + +password + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user_role - -ab_user_role - -id - - [INTEGER] - NOT NULL - -role_id - - [INTEGER] - -user_id - - [INTEGER] + +ab_user_role + +id + + [INTEGER] + NOT NULL + +role_id + + [INTEGER] + +user_id + + [INTEGER] ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_register_user - -ab_register_user - -id - - [INTEGER] - NOT NULL - -email - - [VARCHAR(512)] - NOT NULL - -first_name - - [VARCHAR(256)] - NOT NULL - -last_name - - [VARCHAR(256)] - NOT NULL - -password - - [VARCHAR(256)] - -registration_date - - [TIMESTAMP] - -registration_hash - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_register_user + +id + + [INTEGER] + NOT NULL + +email + + [VARCHAR(512)] + NOT NULL + +first_name + + [VARCHAR(256)] + NOT NULL + +last_name + + [VARCHAR(256)] + NOT NULL + +password + + [VARCHAR(256)] + 
+registration_date + + [TIMESTAMP] + +registration_hash + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL ab_permission - -ab_permission - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL ab_permission_view - -ab_permission_view - -id - - [INTEGER] - NOT NULL - -permission_id - - [INTEGER] - -view_menu_id - - [INTEGER] + +ab_permission_view + +id + + [INTEGER] + NOT NULL + +permission_id + + [INTEGER] + +view_menu_id + + [INTEGER] ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_permission_view_role - -ab_permission_view_role - -id - - [INTEGER] - NOT NULL - -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL + +permission_view_id + + [INTEGER] + +role_id + + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_view_menu - -ab_view_menu - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(250)] + NOT NULL ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_role - -ab_role - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(64)] - NOT NULL + +ab_role + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(64)] + NOT NULL ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} alembic_version_fab - -alembic_version_fab - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version_fab + +version_num + + [VARCHAR(32)] + NOT NULL diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index cee69fcf032cd..019d6cdfa2d6a 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -356,6 +356,7 @@ dagbag dagbags DagCallbackRequest DagFileProcessorManager +dagfolder dagmodel DagParam Dagre @@ -367,6 +368,7 @@ dagruns DagRunState dagRunState DAGs +DagsFolderDagBundle 
Dask dask daskexecutor diff --git a/tests/dag_processing/bundles/__init__.py b/tests/dag_processing/bundles/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/dag_processing/bundles/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py new file mode 100644 index 0000000000000..c79acfc2ed2a5 --- /dev/null +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -0,0 +1,153 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import json +import os +from unittest.mock import patch + +import pytest + +from airflow.dag_processing.bundles.base import BaseDagBundle +from airflow.dag_processing.bundles.manager import DagBundlesManager +from airflow.exceptions import AirflowConfigException +from airflow.models.dagbundle import DagBundleModel +from airflow.utils.session import create_session + +from tests_common.test_utils.db import clear_db_dag_bundles + + +@pytest.mark.parametrize( + "envs,expected_names", + [ + pytest.param({}, {"dags_folder"}, id="no_config"), + pytest.param( + {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": "{}"}, {"testbundle", "dags_folder"}, id="add_bundle" + ), + pytest.param({"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": ""}, set(), id="remove_dags_folder_default"), + pytest.param( + {"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": "", "AIRFLOW__DAG_BUNDLES__TESTBUNDLE": "{}"}, + {"testbundle"}, + id="remove_dags_folder_default_add_bundle", + ), + ], +) +def test_bundle_configs_property(envs, expected_names): + """Test that bundle_configs are read from configuration.""" + bundle_manager = DagBundlesManager() + with patch.dict(os.environ, envs): + names = set(bundle_manager.bundle_configs.keys()) + assert names == expected_names + + +@pytest.mark.parametrize( + "config,message", + [ + pytest.param("1", "Bundle config for testbundle is not a dict: 1", id="int"), + pytest.param("[]", r"Bundle config for testbundle is not a dict: \[\]", id="list"), + pytest.param("abc", r"Unable to parse .* as valid json", id="not_json"), + ], +) +def 
test_bundle_configs_property_raises(config, message): + bundle_manager = DagBundlesManager() + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": config}): + with pytest.raises(AirflowConfigException, match=message): + bundle_manager.bundle_configs + + +class BasicBundle(BaseDagBundle): + def refresh(self): + pass + + def get_current_version(self): + pass + + def path(self): + pass + + +BASIC_BUNDLE_CONFIG = { + "classpath": "tests.dag_processing.bundles.test_dag_bundle_manager.BasicBundle", + "kwargs": {"refresh_interval": 1}, +} + + +def test_get_bundle(): + """Test that get_bundle builds and returns a bundle.""" + + bundle_manager = DagBundlesManager() + + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle = bundle_manager.get_bundle(name="testbundle", version="hello") + assert isinstance(bundle, BasicBundle) + assert bundle.name == "testbundle" + assert bundle.version == "hello" + assert bundle.refresh_interval == 1 + + # And none for version also works! 
+ with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle = bundle_manager.get_bundle(name="testbundle") + assert isinstance(bundle, BasicBundle) + assert bundle.name == "testbundle" + assert bundle.version is None + + +def test_get_all_dag_bundles(): + """Test that get_all_dag_bundles returns all bundles.""" + + bundle_manager = DagBundlesManager() + + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundles = bundle_manager.get_all_dag_bundles() + assert len(bundles) == 2 + assert all(isinstance(x, BaseDagBundle) for x in bundles) + + bundle_names = {x.name for x in bundles} + assert bundle_names == {"testbundle", "dags_folder"} + + +@pytest.fixture +def clear_db(): + clear_db_dag_bundles() + yield + clear_db_dag_bundles() + + +@pytest.mark.db_test +def test_sync_bundles_to_db(clear_db): + bundle_manager = DagBundlesManager() + + def _get_bundle_names_and_active(): + with create_session() as session: + return ( + session.query(DagBundleModel.name, DagBundleModel.active).order_by(DagBundleModel.name).all() + ) + + # Initial add + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle_manager.sync_bundles_to_db() + assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", True)] + + # Disable ones that disappear from config + bundle_manager.sync_bundles_to_db() + assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", False)] + + # Re-enable one that reappears in config + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle_manager.sync_bundles_to_db() + assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", True)] diff --git a/tests/dag_processing/test_dag_bundles.py b/tests/dag_processing/test_dag_bundles.py index 65be620fe5ec1..343663fb58238 100644 --- 
a/tests/dag_processing/test_dag_bundles.py +++ b/tests/dag_processing/test_dag_bundles.py @@ -40,7 +40,7 @@ def bundle_temp_dir(tmp_path): def test_default_dag_storage_path(): with conf_vars({("core", "dag_bundle_storage_path"): ""}): - bundle = LocalDagBundle(name="test", local_folder="/hello") + bundle = LocalDagBundle(name="test", refresh_interval=300, local_folder="/hello") assert bundle._dag_bundle_root_storage_path == Path(tempfile.gettempdir(), "airflow", "dag_bundles") @@ -56,19 +56,19 @@ def path(self): pass with conf_vars({("core", "dag_bundle_storage_path"): None}): - bundle = BasicBundle(name="test") + bundle = BasicBundle(name="test", refresh_interval=300) assert bundle._dag_bundle_root_storage_path == Path(tempfile.gettempdir(), "airflow", "dag_bundles") class TestLocalDagBundle: def test_path(self): - bundle = LocalDagBundle(name="test", local_folder="/hello") + bundle = LocalDagBundle(name="test", refresh_interval=300, local_folder="/hello") assert bundle.path == Path("/hello") def test_none_for_version(self): assert LocalDagBundle.supports_versioning is False - bundle = LocalDagBundle(name="test", local_folder="/hello") + bundle = LocalDagBundle(name="test", refresh_interval=300, local_folder="/hello") assert bundle.get_current_version() is None @@ -79,6 +79,16 @@ def test_path(self): bundle = DagsFolderDagBundle(name="test") assert bundle.path == Path("/tmp/somewhere/dags") + @conf_vars({("scheduler", "dag_dir_list_interval"): "10"}) + def test_refresh_interval_from_config(self): + bundle = DagsFolderDagBundle(name="test") + assert bundle.refresh_interval == 10 + + @conf_vars({("scheduler", "dag_dir_list_interval"): "10"}) + def test_refresh_interval_from_kwarg(self): + bundle = DagsFolderDagBundle(name="test", refresh_interval=30) + assert bundle.refresh_interval == 30 + GIT_DEFAULT_BRANCH = "main" @@ -102,12 +112,16 @@ def test_supports_versioning(self): def test_uses_dag_bundle_root_storage_path(self, git_repo): repo_path, repo = git_repo - 
bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH) + bundle = GitDagBundle( + name="test", refresh_interval=300, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + ) assert str(bundle._dag_bundle_root_storage_path) in str(bundle.path) def test_get_current_version(self, git_repo): repo_path, repo = git_repo - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH) + bundle = GitDagBundle( + name="test", refresh_interval=300, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + ) assert bundle.get_current_version() == repo.head.commit.hexsha @@ -123,7 +137,11 @@ def test_get_specific_version(self, git_repo): repo.index.commit("Another commit") bundle = GitDagBundle( - name="test", version=starting_commit.hexsha, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + name="test", + refresh_interval=300, + version=starting_commit.hexsha, + repo_url=repo_path, + tracking_ref=GIT_DEFAULT_BRANCH, ) assert bundle.get_current_version() == starting_commit.hexsha @@ -147,7 +165,11 @@ def test_get_tag_version(self, git_repo): repo.index.commit("Another commit") bundle = GitDagBundle( - name="test", version="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + name="test", + refresh_interval=300, + version="test", + repo_url=repo_path, + tracking_ref=GIT_DEFAULT_BRANCH, ) assert bundle.get_current_version() == starting_commit.hexsha @@ -165,7 +187,9 @@ def test_get_latest(self, git_repo): repo.index.add([file_path]) repo.index.commit("Another commit") - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH) + bundle = GitDagBundle( + name="test", refresh_interval=300, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + ) assert bundle.get_current_version() != starting_commit.hexsha @@ -176,7 +200,9 @@ def test_refresh(self, git_repo): repo_path, repo = git_repo starting_commit = repo.head.commit - bundle = GitDagBundle(name="test", repo_url=repo_path, 
tracking_ref=GIT_DEFAULT_BRANCH) + bundle = GitDagBundle( + name="test", refresh_interval=300, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + ) assert bundle.get_current_version() == starting_commit.hexsha @@ -200,7 +226,7 @@ def test_head(self, git_repo): repo_path, repo = git_repo repo.create_head("test") - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref="test") + bundle = GitDagBundle(name="test", refresh_interval=300, repo_url=repo_path, tracking_ref="test") assert bundle.repo.head.ref.name == "test" def test_version_not_found(self, git_repo): @@ -208,7 +234,11 @@ def test_version_not_found(self, git_repo): with pytest.raises(AirflowException, match="Version not_found not found in the repository"): GitDagBundle( - name="test", version="not_found", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + name="test", + refresh_interval=300, + version="not_found", + repo_url=repo_path, + tracking_ref=GIT_DEFAULT_BRANCH, ) def test_subdir(self, git_repo): @@ -224,7 +254,13 @@ def test_subdir(self, git_repo): repo.index.add([file_path]) repo.index.commit("Initial commit") - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH, subdir=subdir) + bundle = GitDagBundle( + name="test", + refresh_interval=300, + repo_url=repo_path, + tracking_ref=GIT_DEFAULT_BRANCH, + subdir=subdir, + ) files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()} assert str(bundle.path).endswith(subdir) diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py index 61c9c1394a517..c5e595240d36e 100644 --- a/tests_common/test_utils/db.py +++ b/tests_common/test_utils/db.py @@ -232,6 +232,13 @@ def clear_db_dag_parsing_requests(): session.query(DagPriorityParsingRequest).delete() +def clear_db_dag_bundles(): + with create_session() as session: + from airflow.models.dagbundle import DagBundleModel + + session.query(DagBundleModel).delete() + + def clear_dag_specific_permissions(): try: from 
airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role @@ -286,3 +293,5 @@ def clear_all(): clear_db_pools() clear_db_connections(add_default_connections_back=True) clear_dag_specific_permissions() + if AIRFLOW_V_3_0_PLUS: + clear_db_dag_bundles()