From e3d334fcba2ce26aeed86c45659998cf1591b7d4 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Mon, 9 Dec 2024 21:30:22 -0700 Subject: [PATCH 1/8] Move DAG bundle config into config, not db This moves the DAG bundle config into the Airflow config, instead of being in the db. This: - makes it much easier to configure a fresh Airflow instance - no api/cli calls required - avoids some security concerns by ensuring only deployment managers, with direct access to the instance, can configure these The primary downside is this does mean you cannot reconfigure an existing bundle in a running Airflow instance. --- airflow/config_templates/config.yml | 13 ++ airflow/dag_processing/bundles/base.py | 3 +- airflow/dag_processing/bundles/dagfolder.py | 12 +- airflow/dag_processing/bundles/manager.py | 94 +++++++++++ airflow/dag_processing/manager.py | 4 + .../versions/0050_3_0_0_add_dagbundlemodel.py | 25 ++- airflow/models/dag.py | 3 +- airflow/models/dagbundle.py | 50 +----- tests/dag_processing/bundles/__init__.py | 16 ++ .../bundles/test_dag_bundle_manager.py | 152 ++++++++++++++++++ tests/dag_processing/test_dag_bundles.py | 62 +++++-- tests_common/test_utils/db.py | 8 + 12 files changed, 365 insertions(+), 77 deletions(-) create mode 100644 airflow/dag_processing/bundles/manager.py create mode 100644 tests/dag_processing/bundles/__init__.py create mode 100644 tests/dag_processing/bundles/test_dag_bundle_manager.py diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 5ca17e63ee2d4..a75f21211e507 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2654,3 +2654,16 @@ usage_data_collection: example: ~ default: "True" see_also: ":ref:`Usage data collection FAQ `" +dag_bundles: + description: | + Configuration for the DAG bundles. This allows Airflow to load DAGs from different sources. + options: + dags_folder: + description: | + This is the default DAG bundle that loads DAGs from the traditional `[core] dags_folder`. + It can be disabled by setting it to an empty string. + version_added: ~ + type: string + example: ~ + default: | + {{"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", "kwargs": {{}}}} diff --git a/airflow/dag_processing/bundles/base.py b/airflow/dag_processing/bundles/base.py index f3fdb1aed9476..6cae0eca44a9b 100644 --- a/airflow/dag_processing/bundles/base.py +++ b/airflow/dag_processing/bundles/base.py @@ -45,9 +45,10 @@ class BaseDagBundle(ABC): supports_versioning: bool = False - def __init__(self, *, name: str, version: str | None = None) -> None: + def __init__(self, *, name: str, refresh_interval: int, version: str | None = None) -> None: self.name = name self.version = version + self.refresh_interval = refresh_interval @property def _dag_bundle_root_storage_path(self) -> Path: diff --git a/airflow/dag_processing/bundles/dagfolder.py b/airflow/dag_processing/bundles/dagfolder.py index 40ba649644558..92087d6a63798 100644 --- a/airflow/dag_processing/bundles/dagfolder.py +++ b/airflow/dag_processing/bundles/dagfolder.py @@ -18,11 +18,19 @@ from __future__ import annotations from airflow import settings +from airflow.configuration import conf from airflow.dag_processing.bundles.local import LocalDagBundle class DagsFolderDagBundle(LocalDagBundle): """A bundle for the DAGs folder.""" - def __init__(self, **kwargs): - super().__init__(local_folder=settings.DAGS_FOLDER, **kwargs) + def __init__(self, refresh_interval: int | None = None, **kwargs): + if refresh_interval is None: + refresh_interval = conf.getint("scheduler", "dag_dir_list_interval") + + super().__init__( + local_folder=settings.DAGS_FOLDER, + refresh_interval=refresh_interval, + **kwargs, + ) diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py new file mode 100644 index 0000000000000..944e26d12910d --- /dev/null +++ b/airflow/dag_processing/bundles/manager.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.configuration import conf +from airflow.exceptions import AirflowConfigException +from airflow.models.dagbundle import DagBundleModel +from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.module_loading import import_string +from airflow.utils.session import NEW_SESSION, provide_session + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + from airflow.dag_processing.bundles.base import BaseDagBundle + + +class DagBundlesManager(LoggingMixin): + """Manager for Dag Bundles.""" + + @property + def bundle_configs(self) -> dict[str, dict]: + """Get all DAG bundle configurations.""" + configured_bundles = conf.getsection("dag_bundles") + + if not configured_bundles: + return {} + + # If dags_folder is empty string, we remove it. This allows the default dags_folder bundle to be disabled. + if not configured_bundles["dags_folder"]: + del configured_bundles["dags_folder"] + + dict_bundles: dict[str, dict] = {} + for key in configured_bundles.keys(): + config = conf.getjson("dag_bundles", key) + if not isinstance(config, dict): + raise AirflowConfigException(f"Bundle config for {key} is not a dict: {config}") + dict_bundles[key] = config + + return dict_bundles + + @provide_session + def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: + known_bundles = {b.name: b for b in session.query(DagBundleModel).all()} + + for name in self.bundle_configs.keys(): + if bundle := known_bundles.get(name): + bundle.enabled = True + else: + session.add(DagBundleModel(name=name)) + self.log.info("Added new DAG bundle %s to the database", name) + + for name, bundle in known_bundles.items(): + if name not in self.bundle_configs: + bundle.enabled = False + self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name) + + def get_all_dag_bundles(self) -> list[BaseDagBundle]: + """ + Get all DAG bundles. + + :param session: A database session. + :return: list of DAG bundles. + """ + return [self.get_bundle(name, version=None) for name in self.bundle_configs.keys()] + + def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle: + """ + Get a DAG bundle by name. + + :param name: The name of the DAG bundle. + :param version: The version of the DAG bundle you need (optional). If not provided, `tracking_ref` will be used instead. + :return: The DAG bundle. + """ + # TODO: proper validation of the bundle configuration so we have better error messages + bundle_config = self.bundle_configs[name] + bundle_class = import_string(bundle_config["classpath"]) + return bundle_class(name=name, version=version, **bundle_config["kwargs"]) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 57c69238a1f7e..38aa82fcda803 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -399,6 +399,10 @@ def start(self): "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval ) + from airflow.dag_processing.bundles.manager import DagBundlesManager + + DagBundlesManager().sync_bundles_to_db() + return self._run_parsing_loop() def _scan_stale_dags(self): diff --git a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py index f95eb3f25df30..fdd87547283e7 100644 --- a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py +++ b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py @@ -28,10 +28,8 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy_utils import UUIDType -from airflow.models.base import StringID -from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime +from airflow.utils.sqlalchemy import UtcDateTime revision = "e229247a6cb1" down_revision = "eed27faa34e3" @@ -43,27 +41,24 @@ def upgrade(): op.create_table( "dag_bundle", - sa.Column("id", UUIDType(binary=False), nullable=False), - sa.Column("name", StringID(), nullable=False), - sa.Column("classpath", sa.String(length=1000), nullable=False), - sa.Column("kwargs", ExtendedJSON(), nullable=True), - sa.Column("refresh_interval", sa.Integer(), nullable=True), + sa.Column("name", sa.String(length=250), nullable=False), + sa.Column("enabled", sa.Boolean(), nullable=True), sa.Column("latest_version", sa.String(length=200), nullable=True), sa.Column("last_refreshed", UtcDateTime(timezone=True), nullable=True), - sa.PrimaryKeyConstraint("id", name=op.f("dag_bundle_pkey")), - sa.UniqueConstraint("name", name=op.f("dag_bundle_name_uq")), + sa.PrimaryKeyConstraint("name", name=op.f("dag_bundle_pkey")), ) with op.batch_alter_table("dag", schema=None) as batch_op: - batch_op.add_column(sa.Column("bundle_id", UUIDType(binary=False), nullable=True)) + batch_op.add_column(sa.Column("bundle_name", sa.String(length=250), nullable=True)) batch_op.add_column(sa.Column("latest_bundle_version", sa.String(length=200), nullable=True)) - batch_op.create_foreign_key(batch_op.f("dag_bundle_id_fkey"), "dag_bundle", ["bundle_id"], ["id"]) + batch_op.create_foreign_key( + batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"] + ) def downgrade(): - """Unapply Add DagBundleModel.""" with op.batch_alter_table("dag", schema=None) as batch_op: - batch_op.drop_constraint(batch_op.f("dag_bundle_id_fkey"), type_="foreignkey") + batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey") batch_op.drop_column("latest_bundle_version") - batch_op.drop_column("bundle_id") + batch_op.drop_column("bundle_name") op.drop_table("dag_bundle") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 6163580870c4e..1decc2db683ea 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -66,7 +66,6 @@ from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import backref, relationship from sqlalchemy.sql import Select, expression -from sqlalchemy_utils import UUIDType from airflow import settings, utils from airflow.configuration import conf as airflow_conf, secrets_backend_list @@ -2028,7 +2027,7 @@ class DagModel(Base): fileloc = Column(String(2000)) # The base directory used by Dag Processor that parsed this dag. processor_subdir = Column(String(2000), nullable=True) - bundle_id = Column(UUIDType(binary=False), ForeignKey("dag_bundle.id"), nullable=True) + bundle_name = Column(StringID(), ForeignKey("dag_bundle.name"), nullable=True) # The version of the bundle the last time the DAG was parsed latest_bundle_version = Column(String(200), nullable=True) # String representing the owners diff --git a/airflow/models/dagbundle.py b/airflow/models/dagbundle.py index d4a628cbc8dd4..a68a0283b6b59 100644 --- a/airflow/models/dagbundle.py +++ b/airflow/models/dagbundle.py @@ -16,58 +16,20 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING - -import uuid6 -from sqlalchemy import Column, Integer, String -from sqlalchemy_utils import UUIDType +from sqlalchemy import Boolean, Column, String from airflow.models.base import Base, StringID -from airflow.utils.module_loading import import_string -from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime - -if TYPE_CHECKING: - from sqlalchemy.orm import Session - - from airflow.dag_processing.bundles.base import BaseDagBundle +from airflow.utils.sqlalchemy import UtcDateTime class DagBundleModel(Base): - """A table for DAG Bundle config.""" + """A table for DAG Bundle information.""" __tablename__ = "dag_bundle" - id = Column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7) - name = Column(StringID(), nullable=False, unique=True) - classpath = Column(String(1000), nullable=False) - kwargs = Column(ExtendedJSON, nullable=True) - refresh_interval = Column(Integer, nullable=True) + name = Column(StringID(), primary_key=True) + enabled = Column(Boolean, default=True) latest_version = Column(String(200), nullable=True) last_refreshed = Column(UtcDateTime, nullable=True) - def __init__(self, *, name, classpath, kwargs, refresh_interval): + def __init__(self, *, name: str): self.name = name - self.classpath = classpath - self.kwargs = kwargs - self.refresh_interval = refresh_interval - - @classmethod - @provide_session - def get_all_dag_bundles( - cls, *, session: Session = NEW_SESSION - ) -> list[tuple[DagBundleModel, BaseDagBundle]]: - """ - Get all DAG bundles. - - :param session: A database session. - :return: list of DAG bundles. - """ - bundle_configs = session.query(cls).all() - - bundles = [] - for bundle_config in bundle_configs: - bundle_class = import_string(bundle_config.classpath) - bundle = bundle_class(name=bundle_config.name, **bundle_config.kwargs) - bundles.append((bundle_config, bundle)) - - return bundles diff --git a/tests/dag_processing/bundles/__init__.py b/tests/dag_processing/bundles/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/dag_processing/bundles/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py new file mode 100644 index 0000000000000..3fe92b3af6444 --- /dev/null +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -0,0 +1,152 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import json +import os +from unittest.mock import patch + +import pytest + +from airflow.dag_processing.bundles.base import BaseDagBundle +from airflow.dag_processing.bundles.manager import DagBundlesManager +from airflow.exceptions import AirflowConfigException +from airflow.models.dagbundle import DagBundleModel +from airflow.utils.session import create_session + +from tests_common.test_utils.db import clear_db_dag_bundles + + +@pytest.mark.parametrize( + "envs,expected_names", + [ + pytest.param({}, {"dags_folder"}, id="no_config"), + pytest.param( + {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": "{}"}, {"testbundle", "dags_folder"}, id="add_bundle" + ), + pytest.param({"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": ""}, set(), id="remove_dags_folder_default"), + pytest.param( + {"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": "", "AIRFLOW__DAG_BUNDLES__TESTBUNDLE": "{}"}, + {"testbundle"}, + id="remove_dags_folder_default_add_bundle", + ), + ], +) +def test_bundle_configs_property(envs, expected_names): + """Test that bundle_configs are read from configuration.""" + bundle_manager = DagBundlesManager() + with patch.dict(os.environ, envs): + names = set(bundle_manager.bundle_configs.keys()) + assert names == expected_names + + +@pytest.mark.parametrize( + "config,message", + [ + pytest.param("1", "Bundle config for testbundle is not a dict: 1", id="int"), + pytest.param("[]", r"Bundle config for testbundle is not a dict: \[\]", id="list"), + pytest.param("abc", r"Unable to parse .* as valid json", id="not_json"), + ], +) +def test_bundle_configs_property_raises(config, message): + bundle_manager = DagBundlesManager() + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": config}): + with pytest.raises(AirflowConfigException, match=message): + bundle_manager.bundle_configs + + +class BasicBundle(BaseDagBundle): + def refresh(self): + pass + + def get_current_version(self): + pass + + def path(self): + pass + + +BASIC_BUNDLE_CONFIG = { + "classpath": "tests.dag_processing.bundles.test_dag_bundle_manager.BasicBundle", + "kwargs": {"refresh_interval": 1}, +} + + +def test_get_bundle(): + """Test that get_bundle builds and returns a bundle.""" + + bundle_manager = DagBundlesManager() + + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle = bundle_manager.get_bundle(name="testbundle", version="hello") + assert isinstance(bundle, BasicBundle) + assert bundle.name == "testbundle" + assert bundle.version == "hello" + assert bundle.refresh_interval == 1 + + # And none for version also works! + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle = bundle_manager.get_bundle(name="testbundle") + assert isinstance(bundle, BasicBundle) + assert bundle.name == "testbundle" + assert bundle.version is None + + +def test_get_all_dag_bundles(): + """Test that get_all_dag_bundles returns all bundles.""" + + bundle_manager = DagBundlesManager() + + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundles = bundle_manager.get_all_dag_bundles() + assert len(bundles) == 2 + assert all(isinstance(x, BaseDagBundle) for x in bundles) + + bundle_names = {x.name for x in bundles} + assert bundle_names == {"testbundle", "dags_folder"} + + +@pytest.fixture +def clear_db(): + clear_db_dag_bundles() + yield + clear_db_dag_bundles() + + +def test_sync_bundles_to_db(clear_db): + bundle_manager = DagBundlesManager() + + def _get_bundle_names_and_enabled(): + with create_session() as session: + return ( + session.query(DagBundleModel.name, DagBundleModel.enabled).order_by(DagBundleModel.name).all() + ) + + # Initial add + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle_manager.sync_bundles_to_db() + assert _get_bundle_names_and_enabled() == [("dags_folder", True), ("testbundle", True)] + + # Disable ones that disappear from config + bundle_manager.sync_bundles_to_db() + assert _get_bundle_names_and_enabled() == [("dags_folder", True), ("testbundle", False)] + + # Re-enable one that reappears in config + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle_manager.sync_bundles_to_db() + assert _get_bundle_names_and_enabled() == [("dags_folder", True), ("testbundle", True)] diff --git a/tests/dag_processing/test_dag_bundles.py b/tests/dag_processing/test_dag_bundles.py index 65be620fe5ec1..343663fb58238 100644 --- a/tests/dag_processing/test_dag_bundles.py +++ b/tests/dag_processing/test_dag_bundles.py @@ -40,7 +40,7 @@ def bundle_temp_dir(tmp_path): def test_default_dag_storage_path(): with conf_vars({("core", "dag_bundle_storage_path"): ""}): - bundle = LocalDagBundle(name="test", local_folder="/hello") + bundle = LocalDagBundle(name="test", refresh_interval=300, local_folder="/hello") assert bundle._dag_bundle_root_storage_path == Path(tempfile.gettempdir(), "airflow", "dag_bundles") @@ -56,19 +56,19 @@ def path(self): pass with conf_vars({("core", "dag_bundle_storage_path"): None}): - bundle = BasicBundle(name="test") + bundle = BasicBundle(name="test", refresh_interval=300) assert bundle._dag_bundle_root_storage_path == Path(tempfile.gettempdir(), "airflow", "dag_bundles") class TestLocalDagBundle: def test_path(self): - bundle = LocalDagBundle(name="test", local_folder="/hello") + bundle = LocalDagBundle(name="test", refresh_interval=300, local_folder="/hello") assert bundle.path == Path("/hello") def test_none_for_version(self): assert LocalDagBundle.supports_versioning is False - bundle = LocalDagBundle(name="test", local_folder="/hello") + bundle = LocalDagBundle(name="test", refresh_interval=300, local_folder="/hello") assert bundle.get_current_version() is None @@ -79,6 +79,16 @@ def test_path(self): bundle = DagsFolderDagBundle(name="test") assert bundle.path == Path("/tmp/somewhere/dags") + @conf_vars({("scheduler", "dag_dir_list_interval"): "10"}) + def test_refresh_interval_from_config(self): + bundle = DagsFolderDagBundle(name="test") + assert bundle.refresh_interval == 10 + + @conf_vars({("scheduler", "dag_dir_list_interval"): "10"}) + def test_refresh_interval_from_kwarg(self): + bundle = DagsFolderDagBundle(name="test", refresh_interval=30) + assert bundle.refresh_interval == 30 + GIT_DEFAULT_BRANCH = "main" @@ -102,12 +112,16 @@ def test_supports_versioning(self): def test_uses_dag_bundle_root_storage_path(self, git_repo): repo_path, repo = git_repo - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH) + bundle = GitDagBundle( + name="test", refresh_interval=300, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + ) assert str(bundle._dag_bundle_root_storage_path) in str(bundle.path) def test_get_current_version(self, git_repo): repo_path, repo = git_repo - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH) + bundle = GitDagBundle( + name="test", refresh_interval=300, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + ) assert bundle.get_current_version() == repo.head.commit.hexsha @@ -123,7 +137,11 @@ def test_get_specific_version(self, git_repo): repo.index.commit("Another commit") bundle = GitDagBundle( - name="test", version=starting_commit.hexsha, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + name="test", + refresh_interval=300, + version=starting_commit.hexsha, + repo_url=repo_path, + tracking_ref=GIT_DEFAULT_BRANCH, ) assert bundle.get_current_version() == starting_commit.hexsha @@ -147,7 +165,11 @@ def test_get_tag_version(self, git_repo): repo.index.commit("Another commit") bundle = GitDagBundle( - name="test", version="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + name="test", + refresh_interval=300, + version="test", + repo_url=repo_path, + tracking_ref=GIT_DEFAULT_BRANCH, ) assert bundle.get_current_version() == starting_commit.hexsha @@ -165,7 +187,9 @@ def test_get_latest(self, git_repo): repo.index.add([file_path]) repo.index.commit("Another commit") - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH) + bundle = GitDagBundle( + name="test", refresh_interval=300, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + ) assert bundle.get_current_version() != starting_commit.hexsha @@ -176,7 +200,9 @@ def test_refresh(self, git_repo): repo_path, repo = git_repo starting_commit = repo.head.commit - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH) + bundle = GitDagBundle( + name="test", refresh_interval=300, repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + ) assert bundle.get_current_version() == starting_commit.hexsha @@ -200,7 +226,7 @@ def test_head(self, git_repo): repo_path, repo = git_repo repo.create_head("test") - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref="test") + bundle = GitDagBundle(name="test", refresh_interval=300, repo_url=repo_path, tracking_ref="test") assert bundle.repo.head.ref.name == "test" def test_version_not_found(self, git_repo): @@ -208,7 +234,11 @@ def test_version_not_found(self, git_repo): with pytest.raises(AirflowException, match="Version not_found not found in the repository"): GitDagBundle( - name="test", version="not_found", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH + name="test", + refresh_interval=300, + version="not_found", + repo_url=repo_path, + tracking_ref=GIT_DEFAULT_BRANCH, ) def test_subdir(self, git_repo): @@ -224,7 +254,13 @@ def test_subdir(self, git_repo): repo.index.add([file_path]) repo.index.commit("Initial commit") - bundle = GitDagBundle(name="test", repo_url=repo_path, tracking_ref=GIT_DEFAULT_BRANCH, subdir=subdir) + bundle = GitDagBundle( + name="test", + refresh_interval=300, + repo_url=repo_path, + tracking_ref=GIT_DEFAULT_BRANCH, + subdir=subdir, + ) files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()} assert str(bundle.path).endswith(subdir) diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py index 61c9c1394a517..b70771aaa7bc2 100644 --- a/tests_common/test_utils/db.py +++ b/tests_common/test_utils/db.py @@ -232,6 +232,13 @@ def clear_db_dag_parsing_requests(): session.query(DagPriorityParsingRequest).delete() +def clear_db_dag_bundles(): + with create_session() as session: + from airflow.models.dagbundle import DagBundleModel + + session.query(DagBundleModel).delete() + + def clear_dag_specific_permissions(): try: from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role @@ -286,3 +293,4 @@ def clear_all(): clear_db_pools() clear_db_connections(add_default_connections_back=True) clear_dag_specific_permissions() + clear_db_dag_bundles() From 4fc4f026e9d420cee63266f53fc6db77eacda443 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Fri, 13 Dec 2024 16:53:16 -0700 Subject: [PATCH 2/8] Review comments and fix some tests --- airflow/config_templates/config.yml | 21 +++++++++++++++++-- airflow/dag_processing/bundles/manager.py | 2 +- airflow/models/dagbundle.py | 10 ++++++++- .../bundles/test_dag_bundle_manager.py | 1 + 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index a75f21211e507..b7146644cf565 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2657,11 +2657,28 @@ usage_data_collection: dag_bundles: description: | Configuration for the DAG bundles. This allows Airflow to load DAGs from different sources. + + Airflow will consume all options added to this section. Below you will see only the default, + ``dags_folder``. The option name is the bundle name and the value is a json object with the following + keys: + - classpath: The classpath of the bundle class + - kwargs: The keyword arguments to pass to the bundle class + - refresh_interval: The interval in seconds to refresh the bundle from its source. + + For example, to add a new bundle named ``hello`` to my Airflow instance, add the following to your + airflow.cfg (this is just an example, the classpath and kwargs are not real): + + .. code-block:: ini + + [dag_bundles] + hello: {classpath: "airflow.some.classpath", kwargs: {"hello": "world"}, refresh_interval: 60} options: dags_folder: description: | - This is the default DAG bundle that loads DAGs from the traditional `[core] dags_folder`. - It can be disabled by setting it to an empty string. + This is the default DAG bundle that loads DAGs from the traditional ``[core] dags_folder``. + By default, ``refresh_interval`` it set to ``[scheduler] dag_dir_list_interval``, but that can be + overridden here if desired. + Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string. version_added: ~ type: string example: ~ diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 944e26d12910d..fac80386ecf7a 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -32,7 +32,7 @@ class DagBundlesManager(LoggingMixin): - """Manager for Dag Bundles.""" + """Manager for DAG bundles.""" @property def bundle_configs(self) -> dict[str, dict]: diff --git a/airflow/models/dagbundle.py b/airflow/models/dagbundle.py index a68a0283b6b59..3449d9995d6fa 100644 --- a/airflow/models/dagbundle.py +++ b/airflow/models/dagbundle.py @@ -23,7 +23,15 @@ class DagBundleModel(Base): - """A table for DAG Bundle information.""" + """ + A table for storing DAG bundle metadata. + + We track the following information about each bundle, as it can be useful for + informational purposes and for debugging: + - enabled: Is the bundle currently found in configuration? + - latest_version: The latest version Airflow has seen for the bundle. + - last_refreshed: When the bundle was last refreshed. + """ __tablename__ = "dag_bundle" name = Column(StringID(), primary_key=True) diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index 3fe92b3af6444..fb08cdc95a9fc 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -128,6 +128,7 @@ def clear_db(): clear_db_dag_bundles() +@pytest.mark.db_test def test_sync_bundles_to_db(clear_db): bundle_manager = DagBundlesManager() From 53638289ae60de723843268dc1917a6188ff28da Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Fri, 13 Dec 2024 17:06:17 -0700 Subject: [PATCH 3/8] Update erd --- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 3360 ++++++++++---------- 2 files changed, 1674 insertions(+), 1688 deletions(-) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 6ee5972cc1624..acd5f7aeaaaee 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -ccb8ef5583b2a6b3ee3ab4212139c112b92953675655010a6775fffb4945b206 \ No newline at end of file +dc1ed8fb08456efddbcfcb0a1665b90091b5157432f11654fc4d0744baa90cdb \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 9c37f5c320686..5c7a834fcfaac 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,11 +4,11 @@ - - + + %3 - + log @@ -304,2137 +304,2123 @@ asset_alias - -asset_alias - -id - - [INTEGER] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL + +asset_alias + +id + + [INTEGER] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset - -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event - -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset_event - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference - -alias_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset_alias--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 asset - -asset - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -extra - - [JSON] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +extra + + [JSON] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_alias_asset - -0..N -1 + +0..N +1 asset_trigger - -asset_trigger - -asset_id - - [INTEGER] - NOT NULL - -trigger_id - - [INTEGER] - NOT NULL + +asset_trigger + +asset_id + + [INTEGER] + NOT NULL + +trigger_id + + [INTEGER] + NOT NULL asset--asset_trigger - -0..N -1 + +0..N +1 asset_active - -asset_active - -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_active - -1 -1 + +1 +1 asset--asset_active - -1 -1 + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--dag_schedule_asset_reference - -0..N -1 + +0..N +1 task_outlet_asset_reference - -task_outlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--task_outlet_asset_reference - -0..N -1 + +0..N +1 asset_dag_run_queue - -asset_dag_run_queue - -asset_id - - [INTEGER] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL asset--asset_dag_run_queue - -0..N -1 + +0..N +1 asset_event - -asset_event - -id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL - -extra - - [JSON] - NOT NULL - -source_dag_id - - [VARCHAR(250)] - -source_map_index - - [INTEGER] - -source_run_id - - [VARCHAR(250)] - -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL + +asset_event + +id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL + +extra + + [JSON] + NOT NULL + +source_dag_id + + [VARCHAR(250)] + +source_map_index + + [INTEGER] + +source_run_id + + [VARCHAR(250)] + +source_task_id + + [VARCHAR(250)] + +timestamp + + [TIMESTAMP] + NOT NULL asset_event--asset_alias_asset_event - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event--dagrun_asset_event - -0..N -1 + +0..N +1 trigger - -trigger - -id - - [INTEGER] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -created_date - - [TIMESTAMP] - NOT NULL - -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] + +trigger + +id + + [INTEGER] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +created_date + + [TIMESTAMP] + NOT NULL + +kwargs + + [TEXT] + NOT NULL + +triggerer_id + + [INTEGER] trigger--asset_trigger - -0..N -1 + +0..N +1 task_instance - -task_instance - -id - - [UUID] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -last_heartbeat_at - - [TIMESTAMP] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance + +id + + [UUID] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +last_heartbeat_at + + [TIMESTAMP] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} task_reschedule - -task_reschedule - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [INTEGER] - NOT NULL - -end_date - - [TIMESTAMP] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -reschedule_date - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -try_number - - [INTEGER] - NOT NULL + +task_reschedule + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [INTEGER] + NOT NULL + +end_date + + [TIMESTAMP] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +reschedule_date + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +try_number + + [INTEGER] + NOT NULL task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL + +rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + + [JSON] + +rendered_fields + + [JSON] + NOT NULL task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_map - -task_map - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -keys - - [JSON] - -length - - [INTEGER] - NOT NULL + +task_map + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +keys + + [JSON] + +length + + [INTEGER] + NOT NULL task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 xcom - -xcom - -dag_run_id - - [INTEGER] - NOT NULL - -key - - [VARCHAR(512)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL - -value - - [JSONB] + +xcom + +dag_run_id + + [INTEGER] + NOT NULL + +key + + [VARCHAR(512)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL + +value + + [JSONB] task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance_note - -task_instance_note - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +task_instance_note + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance_history - -task_instance_history - -id - - [INTEGER] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - NOT NULL - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance_history + +id + + [INTEGER] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + NOT NULL + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 dag_bundle - -dag_bundle - -id - - [UUID] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -kwargs - - [JSON] - -last_refreshed - - [TIMESTAMP] - -latest_version - - [VARCHAR(200)] - -name - - [VARCHAR(250)] - NOT NULL - -refresh_interval - - [INTEGER] + +dag_bundle + +name + + [VARCHAR(250)] + NOT NULL + +enabled + + [BOOLEAN] + +last_refreshed + + [TIMESTAMP] + +latest_version + + [VARCHAR(200)] dag - -dag - -dag_id - - [VARCHAR(250)] - NOT NULL - -asset_expression - - [JSON] - -bundle_id - - [UUID] - -dag_display_name - - [VARCHAR(2000)] - -default_view - - [VARCHAR(25)] - -description - - [TEXT] - -fileloc - - [VARCHAR(2000)] - -has_import_errors - - [BOOLEAN] - -has_task_concurrency_limits - - [BOOLEAN] - NOT NULL - -is_active - - [BOOLEAN] - -is_paused - - [BOOLEAN] - -last_expired - - [TIMESTAMP] - -last_parsed_time - - [TIMESTAMP] - -latest_bundle_version - - [VARCHAR(200)] - -max_active_runs - - [INTEGER] - -max_active_tasks - - [INTEGER] - NOT NULL - -max_consecutive_failed_dag_runs - - [INTEGER] - NOT NULL - -next_dagrun - - [TIMESTAMP] - -next_dagrun_create_after - - [TIMESTAMP] - -next_dagrun_data_interval_end - - [TIMESTAMP] - -next_dagrun_data_interval_start - - [TIMESTAMP] - -owners - - [VARCHAR(2000)] - -processor_subdir - - [VARCHAR(2000)] - -timetable_description - - [VARCHAR(1000)] - -timetable_summary - - [TEXT] + +dag + +dag_id + + [VARCHAR(250)] + NOT NULL + +asset_expression + + [JSON] + +bundle_name + + [VARCHAR(250)] + +dag_display_name + + [VARCHAR(2000)] + +default_view + + [VARCHAR(25)] + +description + + [TEXT] + +fileloc + + [VARCHAR(2000)] + +has_import_errors + + [BOOLEAN] + +has_task_concurrency_limits + + [BOOLEAN] + NOT NULL + +is_active + + [BOOLEAN] + +is_paused + + [BOOLEAN] + +last_expired + + [TIMESTAMP] + +last_parsed_time + + [TIMESTAMP] + +latest_bundle_version + + [VARCHAR(200)] + +max_active_runs + + [INTEGER] + +max_active_tasks + + [INTEGER] + NOT NULL + +max_consecutive_failed_dag_runs + + [INTEGER] + NOT NULL + +next_dagrun + + [TIMESTAMP] + +next_dagrun_create_after + + [TIMESTAMP] + +next_dagrun_data_interval_end + + [TIMESTAMP] + +next_dagrun_data_interval_start + + [TIMESTAMP] + +owners + + [VARCHAR(2000)] + +processor_subdir + + [VARCHAR(2000)] + +timetable_description + + [VARCHAR(1000)] + +timetable_summary + + [TEXT] dag_bundle--dag - -0..N -{0,1} + +0..N +{0,1} dag--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 dag--dag_schedule_asset_reference - -0..N -1 + +0..N +1 dag--task_outlet_asset_reference - -0..N -1 + +0..N +1 dag--asset_dag_run_queue - -0..N -1 + +0..N +1 dag_version - -dag_version - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -version_number - - [INTEGER] - NOT NULL + +dag_version + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +version_number + + [INTEGER] + NOT NULL dag--dag_version - -0..N -1 + +0..N +1 dag_tag - -dag_tag - -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + + [VARCHAR(250)] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL dag--dag_tag - -0..N -1 + +0..N +1 dag_owner_attributes - -dag_owner_attributes - -dag_id - - [VARCHAR(250)] - NOT NULL - -owner - - [VARCHAR(500)] - NOT NULL - -link - - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + + [VARCHAR(250)] + NOT NULL + +owner + + [VARCHAR(500)] + NOT NULL + +link + + [VARCHAR(500)] + NOT NULL dag--dag_owner_attributes - -0..N -1 + +0..N +1 dag_warning - -dag_warning - -dag_id - - [VARCHAR(250)] - NOT NULL - -warning_type - - [VARCHAR(50)] - NOT NULL - -message - - [TEXT] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + + [VARCHAR(250)] + NOT NULL + +warning_type + + [VARCHAR(50)] + NOT NULL + +message + + [TEXT] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL dag--dag_warning - -0..N -1 + +0..N +1 dag_version--task_instance - -0..N -{0,1} + +0..N +{0,1} dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [BYTEA] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -external_trigger - - [BOOLEAN] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - -logical_date - - [TIMESTAMP] - NOT NULL - -queued_at - - [TIMESTAMP] - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -triggered_by - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [BYTEA] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +external_trigger + + [BOOLEAN] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + +logical_date + + [TIMESTAMP] + NOT NULL + +queued_at + + [TIMESTAMP] + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +triggered_by + + [VARCHAR(50)] + +updated_at + + [TIMESTAMP] dag_version--dag_run - -0..N -{0,1} + +0..N +{0,1} dag_code - -dag_code - -id - - [UUID] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -source_code - - [TEXT] - NOT NULL - -source_code_hash - - [VARCHAR(32)] - NOT NULL + +dag_code + +id + + [UUID] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +source_code + + [TEXT] + NOT NULL + +source_code_hash + + [VARCHAR(32)] + NOT NULL dag_version--dag_code - -0..N -1 + +0..N +1 serialized_dag - -serialized_dag - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_hash - - [VARCHAR(32)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -data - - [JSON] - -data_compressed - - [BYTEA] - -processor_subdir - - [VARCHAR(2000)] + +serialized_dag + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_hash + + [VARCHAR(32)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +data + + [JSON] + +data_compressed + + [BYTEA] + +processor_subdir + + [VARCHAR(2000)] dag_version--serialized_dag - -0..N -1 + +0..N +1 dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 log_template - -log_template - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL + +log_template + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +elasticsearch_id + + [TEXT] + NOT NULL + +filename + + [TEXT] + NOT NULL log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill - -backfill - -id - - [INTEGER] - NOT NULL - -completed_at - - [TIMESTAMP] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_run_conf - - [JSON] - NOT NULL - -from_date - - [TIMESTAMP] - NOT NULL - -is_paused - - [BOOLEAN] - -max_active_runs - - [INTEGER] - NOT NULL - -reprocess_behavior - - [VARCHAR(250)] - NOT NULL - -to_date - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +backfill + +id + + [INTEGER] + NOT NULL + +completed_at + + [TIMESTAMP] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_run_conf + + [JSON] + NOT NULL + +from_date + + [TIMESTAMP] + NOT NULL + +is_paused + + [BOOLEAN] + +max_active_runs + + [INTEGER] + NOT NULL + +reprocess_behavior + + [VARCHAR(250)] + NOT NULL + +to_date + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 session - -session - -id - - [INTEGER] - NOT NULL - -data - - [BYTEA] - -expiry - - [TIMESTAMP] - -session_id - - [VARCHAR(255)] + +session + +id + + [INTEGER] + NOT NULL + +data + + [BYTEA] + +expiry + + [TIMESTAMP] + +session_id + + [VARCHAR(255)] alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL ab_user - -ab_user - -id - - [INTEGER] - NOT NULL - -active - - [BOOLEAN] - -changed_by_fk - - [INTEGER] - -changed_on - - [TIMESTAMP] - -created_by_fk - - [INTEGER] - -created_on - - [TIMESTAMP] - -email - - [VARCHAR(512)] - NOT NULL - -fail_login_count - - [INTEGER] - -first_name - - [VARCHAR(256)] - NOT NULL - -last_login - - [TIMESTAMP] - -last_name - - [VARCHAR(256)] - NOT NULL - -login_count - - [INTEGER] - -password - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_user + +id + + [INTEGER] + NOT NULL + +active + + [BOOLEAN] + +changed_by_fk + + [INTEGER] + +changed_on + + [TIMESTAMP] + +created_by_fk + + [INTEGER] + +created_on + + [TIMESTAMP] + +email + + [VARCHAR(512)] + NOT NULL + +fail_login_count + + [INTEGER] + +first_name + + [VARCHAR(256)] + NOT NULL + +last_login + + [TIMESTAMP] + +last_name + + [VARCHAR(256)] + NOT NULL + +login_count + + [INTEGER] + +password + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user_role - -ab_user_role - -id - - [INTEGER] - NOT NULL - -role_id - - [INTEGER] - -user_id - - [INTEGER] + +ab_user_role + +id + + [INTEGER] + NOT NULL + +role_id + + [INTEGER] + +user_id + + [INTEGER] ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_register_user - -ab_register_user - -id - - [INTEGER] - NOT NULL - -email - - [VARCHAR(512)] - NOT NULL - -first_name - - [VARCHAR(256)] - NOT NULL - -last_name - - [VARCHAR(256)] - NOT NULL - -password - - [VARCHAR(256)] - -registration_date - - [TIMESTAMP] - -registration_hash - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_register_user + +id + + [INTEGER] + NOT NULL + +email + + [VARCHAR(512)] + NOT NULL + +first_name + + [VARCHAR(256)] + NOT NULL + +last_name + + [VARCHAR(256)] + NOT NULL + +password + + [VARCHAR(256)] + +registration_date + + [TIMESTAMP] + +registration_hash + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL ab_permission - -ab_permission - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL ab_permission_view - -ab_permission_view - -id - - [INTEGER] - NOT NULL - -permission_id - - [INTEGER] - -view_menu_id - - [INTEGER] + +ab_permission_view + +id + + [INTEGER] + NOT NULL + +permission_id + + [INTEGER] + +view_menu_id + + [INTEGER] ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_permission_view_role - -ab_permission_view_role - -id - - [INTEGER] - NOT NULL - -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL + +permission_view_id + + [INTEGER] + +role_id + + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_view_menu - -ab_view_menu - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(250)] + NOT NULL ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_role - -ab_role - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(64)] - NOT NULL + +ab_role + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(64)] + NOT NULL ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} alembic_version_fab - -alembic_version_fab - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version_fab + +version_num + + [VARCHAR(32)] + NOT NULL From afbf18180185ced4b31a7bec47fee18b8413eaf1 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Fri, 13 Dec 2024 22:48:40 -0700 Subject: [PATCH 4/8] Fix docs build --- airflow/config_templates/config.yml | 11 ++++++----- airflow/dag_processing/bundles/manager.py | 4 +++- docs/spelling_wordlist.txt | 2 ++ 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index b7146644cf565..62544f7183faa 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2661,9 +2661,10 @@ dag_bundles: Airflow will consume all options added to this section. Below you will see only the default, ``dags_folder``. The option name is the bundle name and the value is a json object with the following keys: - - classpath: The classpath of the bundle class - - kwargs: The keyword arguments to pass to the bundle class - - refresh_interval: The interval in seconds to refresh the bundle from its source. + + * classpath: The classpath of the bundle class + * kwargs: The keyword arguments to pass to the bundle class + * refresh_interval: The interval in seconds to refresh the bundle from its source. For example, to add a new bundle named ``hello`` to my Airflow instance, add the following to your airflow.cfg (this is just an example, the classpath and kwargs are not real): @@ -2682,5 +2683,5 @@ dag_bundles: version_added: ~ type: string example: ~ - default: | - {{"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", "kwargs": {{}}}} + default: '{{"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", + "kwargs": {{}}}}' diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index fac80386ecf7a..86e07572a0847 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -76,6 +76,7 @@ def get_all_dag_bundles(self) -> list[BaseDagBundle]: Get all DAG bundles. :param session: A database session. + :return: list of DAG bundles. """ return [self.get_bundle(name, version=None) for name in self.bundle_configs.keys()] @@ -85,7 +86,8 @@ def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle: Get a DAG bundle by name. :param name: The name of the DAG bundle. - :param version: The version of the DAG bundle you need (optional). If not provided, `tracking_ref` will be used instead. + :param version: The version of the DAG bundle you need (optional). If not provided, ``tracking_ref`` will be used instead. + :return: The DAG bundle. """ # TODO: proper validation of the bundle configuration so we have better error messages diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index cee69fcf032cd..019d6cdfa2d6a 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -356,6 +356,7 @@ dagbag dagbags DagCallbackRequest DagFileProcessorManager +dagfolder dagmodel DagParam Dagre @@ -367,6 +368,7 @@ dagruns DagRunState dagRunState DAGs +DagsFolderDagBundle Dask dask daskexecutor From c77d57e3b959fadc9257d8c8cc48a4c5133a02f5 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Sat, 14 Dec 2024 08:14:26 -0700 Subject: [PATCH 5/8] typo Co-authored-by: Ephraim Anierobi --- airflow/config_templates/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 62544f7183faa..b7526eff8372a 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2677,7 +2677,7 @@ dag_bundles: dags_folder: description: | This is the default DAG bundle that loads DAGs from the traditional ``[core] dags_folder``. - By default, ``refresh_interval`` it set to ``[scheduler] dag_dir_list_interval``, but that can be + By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can be overridden here if desired. Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string. version_added: ~ From 58dc4d17f45008c78230d6a939759865bbc38e08 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Mon, 16 Dec 2024 12:59:46 -0700 Subject: [PATCH 6/8] Add refresh_interval to docstring --- airflow/dag_processing/bundles/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/dag_processing/bundles/base.py b/airflow/dag_processing/bundles/base.py index 6cae0eca44a9b..a0061d97e8f21 100644 --- a/airflow/dag_processing/bundles/base.py +++ b/airflow/dag_processing/bundles/base.py @@ -40,6 +40,7 @@ class BaseDagBundle(ABC): multiple versions of the same bundle in use at the same time. The DAG processor will always use the latest version. :param name: String identifier for the DAG bundle + :param refresh_interval: How often the bundle should be refreshed from the source (in seconds) :param version: Version of the DAG bundle (Optional) """ From aa23b88b3bde78ade3523a2ee8dacaeec9d9dae4 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Tue, 17 Dec 2024 11:18:19 -0700 Subject: [PATCH 7/8] rename column to active --- airflow/dag_processing/bundles/manager.py | 4 ++-- .../versions/0050_3_0_0_add_dagbundlemodel.py | 2 +- airflow/models/dagbundle.py | 4 ++-- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 6 +++--- .../dag_processing/bundles/test_dag_bundle_manager.py | 10 +++++----- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 86e07572a0847..4f8b59b956e18 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -61,14 +61,14 @@ def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: for name in self.bundle_configs.keys(): if bundle := known_bundles.get(name): - bundle.enabled = True + bundle.active = True else: session.add(DagBundleModel(name=name)) self.log.info("Added new DAG bundle %s to the database", name) for name, bundle in known_bundles.items(): if name not in self.bundle_configs: - bundle.enabled = False + bundle.active = False self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name) def get_all_dag_bundles(self) -> list[BaseDagBundle]: diff --git a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py index fdd87547283e7..7e6cb756c5cb2 100644 --- a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py +++ b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py @@ -42,7 +42,7 @@ def upgrade(): op.create_table( "dag_bundle", sa.Column("name", sa.String(length=250), nullable=False), - sa.Column("enabled", sa.Boolean(), nullable=True), + sa.Column("active", sa.Boolean(), nullable=True), sa.Column("latest_version", sa.String(length=200), nullable=True), sa.Column("last_refreshed", UtcDateTime(timezone=True), nullable=True), sa.PrimaryKeyConstraint("name", name=op.f("dag_bundle_pkey")), diff --git a/airflow/models/dagbundle.py b/airflow/models/dagbundle.py index 3449d9995d6fa..08429db0b0bcb 100644 --- a/airflow/models/dagbundle.py +++ b/airflow/models/dagbundle.py @@ -28,14 +28,14 @@ class DagBundleModel(Base): We track the following information about each bundle, as it can be useful for informational purposes and for debugging: - - enabled: Is the bundle currently found in configuration? + - active: Is the bundle currently found in configuration? - latest_version: The latest version Airflow has seen for the bundle. - last_refreshed: When the bundle was last refreshed. """ __tablename__ = "dag_bundle" name = Column(StringID(), primary_key=True) - enabled = Column(Boolean, default=True) + active = Column(Boolean, default=True) latest_version = Column(String(200), nullable=True) last_refreshed = Column(UtcDateTime, nullable=True) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index acd5f7aeaaaee..b503e8dfaf91a 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -dc1ed8fb08456efddbcfcb0a1665b90091b5157432f11654fc4d0744baa90cdb \ No newline at end of file +ba10504bc54d15b2faca37ae9db172848a498e471bbf332e031715f728158ff8 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 5c7a834fcfaac..9f19f0f920a5c 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1406,9 +1406,9 @@ [VARCHAR(250)] NOT NULL -enabled - - [BOOLEAN] +active + + [BOOLEAN] last_refreshed diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index fb08cdc95a9fc..c79acfc2ed2a5 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -132,22 +132,22 @@ def clear_db(): def test_sync_bundles_to_db(clear_db): bundle_manager = DagBundlesManager() - def _get_bundle_names_and_enabled(): + def _get_bundle_names_and_active(): with create_session() as session: return ( - session.query(DagBundleModel.name, DagBundleModel.enabled).order_by(DagBundleModel.name).all() + session.query(DagBundleModel.name, DagBundleModel.active).order_by(DagBundleModel.name).all() ) # Initial add with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_enabled() == [("dags_folder", True), ("testbundle", True)] + assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", True)] # Disable ones that disappear from config bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_enabled() == [("dags_folder", True), ("testbundle", False)] + assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", False)] # Re-enable one that reappears in config with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_enabled() == [("dags_folder", True), ("testbundle", True)] + assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", True)] From b05cee5ac065831129b505c83cbe5057f0fa927a Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Tue, 17 Dec 2024 13:30:11 -0700 Subject: [PATCH 8/8] Fix compat tests --- tests_common/test_utils/db.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py index b70771aaa7bc2..c5e595240d36e 100644 --- a/tests_common/test_utils/db.py +++ b/tests_common/test_utils/db.py @@ -293,4 +293,5 @@ def clear_all(): clear_db_pools() clear_db_connections(add_default_connections_back=True) clear_dag_specific_permissions() - clear_db_dag_bundles() + if AIRFLOW_V_3_0_PLUS: + clear_db_dag_bundles()