Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
36a9905
chore: move standard examples to provider
bhavaniravi Oct 26, 2025
a1d8210
fix: failing testcases to load examples form dagbundle
bhavaniravi Oct 27, 2025
689f502
Resolve provider example DAGs via ProvidersManager
andreahlert Apr 30, 2026
4586cc5
Tests: relax test_get_all_bundle_names assertion
andreahlert Apr 30, 2026
db1057e
Address review feedback on example DAG loading
andreahlert Apr 30, 2026
88fe049
Tests: use SQLAlchemy 2.0 delete() in trigger_dagrun teardown
andreahlert Apr 30, 2026
06f05b5
Tests: update bundle_name for example_python_operator
andreahlert May 1, 2026
457e7cd
Tests: avoid double-loading example DAGs in parse_and_sync_to_db
andreahlert May 1, 2026
84380f0
Trigger CI rerun
andreahlert May 1, 2026
aea523b
Update devel-common/src/tests_common/test_utils/db.py
andreahlert May 2, 2026
4cf8023
Update airflow-core/src/airflow/dag_processing/bundles/manager.py
andreahlert May 2, 2026
3c57649
Address review feedback on provider example DAG bundle discovery
andreahlert May 3, 2026
0b18521
Remove include_examples parameter from DagBag and test helpers
andreahlert May 4, 2026
4909626
Fix mypy and test assertion after DagBag include_examples removal
andreahlert May 4, 2026
38b06a5
Merge remote-tracking branch 'upstream/main' into pr-66161
andreahlert May 4, 2026
9ef9809
Drop include_examples from new DagBag callers and gate dag_maker for …
andreahlert May 4, 2026
f7c43d6
Gate DagBag include_examples on Airflow 3.3+ for older compat runs
andreahlert May 5, 2026
30607c9
Trim 66161 newsfragment to user-visible facts
andreahlert May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions airflow-core/newsfragments/66161.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
Provider example DAGs are exposed as dedicated bundles

Example DAGs that ship with provider distributions are now discovered via
``ProvidersManager`` and registered as their own DAG bundles, one per
provider that ships an ``example_dags/`` folder. Bundle names follow the
shape ``apache-airflow-providers-<distribution>-example-dags`` (for
canonical Apache providers) or ``<distribution-name>-example-dags`` (for
third-party providers). The ``[core] load_examples`` configuration option
remains the single switch that controls whether any example bundles are
registered.

**What changed:**

- Example DAGs that previously came in under the implicit ``dags-folder``
bundle are now persisted in ``DagBundleModel`` rows and emitted in REST
API responses (``GET /api/v2/dags/{dag_id}/dag-versions`` and the
``bundle_name`` field on task-instance responses) under the new
per-provider bundle names.
- Nested providers such as ``apache-airflow-providers-common-sql`` are
discovered correctly (previously they were missed because discovery
walked ``airflow.providers.__path__`` directly).

**Behaviour changes:**

- Clients filtering or tracking bundles by ``"dags-folder"`` for
previously-shipped example DAGs (e.g. ``example_python_operator``) need
to update to the new per-provider bundle names. The DAG identifiers
themselves are unchanged.

**Removals:**

- The ``include_examples`` parameter has been removed from
``DagBag.__init__``, ``DagBag.collect_dags``, ``BundleDagBag.__init__``,
and ``tests_common.test_utils.db.parse_and_sync_to_db``. Example DAG
loading is now controlled exclusively by the ``[core] load_examples``
configuration option, which gates whether the per-provider example
bundles are registered. Callers that previously passed
``include_examples=True`` should set
``conf_vars({("core", "load_examples"): "true"})`` (or equivalent
configuration). Callers that previously passed
``include_examples=False`` can drop the argument: it matches the new
default behaviour where ``DagBag`` only walks the configured
``dag_folder`` and example DAGs come in via dedicated bundles.

* Types of change

* [ ] Dag changes
* [x] Config changes
* [x] API changes
* [ ] CLI changes
* [x] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [x] Code interface changes

* Migration rules needed

* Update clients that filter REST API responses by ``bundle_name`` to
match the new per-provider bundle names for example DAGs.
* Replace ``include_examples=True`` calls to ``DagBag`` /
``parse_and_sync_to_db`` with
``conf_vars({("core", "load_examples"): "true"})`` (or equivalent
configuration). The argument has been removed.
* Drop ``include_examples=False`` arguments from ``DagBag`` /
``parse_and_sync_to_db`` calls; the default behaviour is unchanged.
71 changes: 71 additions & 0 deletions airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# under the License.
from __future__ import annotations

import importlib
import logging
import os
import warnings
from typing import TYPE_CHECKING

Expand All @@ -29,6 +32,7 @@
from airflow.exceptions import AirflowConfigException
from airflow.models.dagbundle import DagBundleModel
from airflow.models.team import Team
from airflow.providers_manager import ProvidersManager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session

Expand All @@ -37,6 +41,8 @@

from sqlalchemy.orm import Session

log = logging.getLogger(__name__)

_example_dag_bundle_name = "example_dags"


Expand Down Expand Up @@ -106,6 +112,61 @@ def _add_example_dag_bundle(bundle_config_list: list[_ExternalBundleConfig]):
)


def _add_provider_example_dags_to_bundle(bundle_config_list: list[_ExternalBundleConfig]):
"""
Add an ``example_dags`` folder of every installed provider as a bundle.

Provider locations are resolved through ``ProvidersManager`` instead of
walking ``airflow.providers.__path__`` so that:

- nested providers (e.g. ``apache-airflow-providers-common-sql`` whose
module path is ``airflow.providers.common.sql``) are discovered;
- providers installed outside the ``airflow.providers`` namespace package
are discovered via their entry point.
"""
# Dedup on the resolved on-disk folder rather than the bundle name: distributions
# under ``airflow.providers.common.*`` use ``pkgutil.extend_path``, so when several
# ``common-*`` packages are installed ``airflow.providers.common.__path__`` has
# multiple entries and the inner loop iterates more than once. Path-based dedup
# only skips when the same folder is seen twice; distinct folders are preserved.
seen: set[str] = set()
Comment thread
andreahlert marked this conversation as resolved.

for package_name in ProvidersManager().providers:
# Heuristic: derive the import path from the canonical
# ``apache-airflow-providers-*`` distribution name. Tracked as a follow-up
# to record the provider module path on ``ProviderInfo`` (see
# https://github.com/apache/airflow/issues/66305).
if package_name.startswith("apache-airflow-providers-"):
suffix = package_name[len("apache-airflow-providers-") :]
module_name = "airflow.providers." + suffix.replace("-", ".")
Comment thread
andreahlert marked this conversation as resolved.
else:
module_name = package_name.replace("-", "_")
try:
module = importlib.import_module(module_name)
module_paths = list(getattr(module, "__path__", []))
except Exception:
log.exception("Could not load provider module %s for example DAG discovery", module_name)
continue

for module_path in module_paths:
example_dag_folder = os.path.join(module_path, "example_dags")
if not os.path.isdir(example_dag_folder):
continue
if example_dag_folder in seen:
continue
seen.add(example_dag_folder)
bundle_name = f"{package_name}-example-dags"
bundle_config_list.append(
_ExternalBundleConfig(
name=bundle_name,
classpath="airflow.dag_processing.bundles.local.LocalDagBundle",
kwargs={
"path": example_dag_folder,
},
)
)


def _is_safe_bundle_url(url: str) -> bool:
"""
Check if a bundle URL is safe to use.
Expand Down Expand Up @@ -191,6 +252,7 @@ def parse_config(self) -> None:
bundle_config_list = _parse_bundle_config(config_list)
if conf.getboolean("core", "LOAD_EXAMPLES"):
_add_example_dag_bundle(bundle_config_list)
_add_provider_example_dags_to_bundle(bundle_config_list)

for bundle_config in bundle_config_list:
if bundle_config.team_name and not conf.getboolean("core", "multi_team"):
Expand All @@ -210,6 +272,15 @@ def parse_config(self) -> None:

@provide_session
def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
"""
Persist the configured DAG bundles into ``DagBundleModel`` rows.

This only reconciles bundle metadata, not the DAGs contained in them.
Parsing each bundle's DAG files and writing the resulting
``DagModel`` / ``SerializedDagModel`` rows is the responsibility of
``DagBag`` plus ``sync_bag_to_db`` (or, in production, the DAG
processor); calling this method does not trigger that work.
"""
self.log.debug("Syncing DAG bundles to the database")

def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
Expand Down
26 changes: 0 additions & 26 deletions airflow-core/src/airflow/dag_processing/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ class DagBag(LoggingMixin):
that one system can run multiple, independent settings sets.

:param dag_folder: the folder to scan to find DAGs
:param include_examples: whether to include the examples that ship
with airflow or not
:param safe_mode: when ``False``, scans all python modules for dags.
When ``True`` uses heuristics (files containing ``DAG`` and ``airflow`` strings)
to filter python modules to scan for dags.
Expand All @@ -187,7 +185,6 @@ class DagBag(LoggingMixin):
def __init__(
self,
dag_folder: str | Path | None = None, # todo AIP-66: rename this to path
include_examples: bool | ArgNotSet = NOTSET,
safe_mode: bool | ArgNotSet = NOTSET,
load_op_links: bool = True,
collect_dags: bool = True,
Expand Down Expand Up @@ -218,11 +215,6 @@ def __init__(
if collect_dags:
self.collect_dags(
dag_folder=dag_folder,
include_examples=(
include_examples
if is_arg_set(include_examples)
else conf.getboolean("core", "LOAD_EXAMPLES")
),
safe_mode=(
safe_mode if is_arg_set(safe_mode) else conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE")
),
Expand Down Expand Up @@ -451,7 +443,6 @@ def collect_dags(
self,
dag_folder: str | Path | None = None,
only_if_updated: bool = True,
include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
):
"""
Expand All @@ -477,13 +468,6 @@ def collect_dags(
registry = get_importer_registry()
files_to_parse = registry.list_dag_files(dag_folder, safe_mode=safe_mode)

if include_examples:
from airflow import example_dags

example_dag_folder = next(iter(example_dags.__path__))

files_to_parse.extend(registry.list_dag_files(example_dag_folder, safe_mode=safe_mode))

for filepath in files_to_parse:
Comment thread
andreahlert marked this conversation as resolved.
try:
file_parse_start_dttm = timezone.utcnow()
Expand Down Expand Up @@ -554,17 +538,7 @@ def __init__(self, *args, bundle_path: Path | None = None, **kwargs):
if str(bundle_path) not in sys.path:
sys.path.append(str(bundle_path))

# Warn if user explicitly set include_examples=True, since bundles never contain examples
if kwargs.get("include_examples") is True:
warnings.warn(
"include_examples=True is ignored for BundleDagBag. "
"Bundles do not contain example DAGs, so include_examples is always False.",
UserWarning,
stacklevel=2,
)

kwargs["bundle_path"] = bundle_path
kwargs["include_examples"] = False
super().__init__(*args, **kwargs)


Expand Down
1 change: 0 additions & 1 deletion airflow-core/src/airflow/example_dags/standard

This file was deleted.

2 changes: 1 addition & 1 deletion airflow-core/tests/integration/otel/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def setup_class(cls):
def serialize_and_get_dags(cls) -> dict[str, SerializedDAG]:
log.info("Serializing Dags from directory %s", cls.dag_folder)
# Load DAGs from the dag directory.
dag_bag = DagBag(dag_folder=cls.dag_folder, include_examples=False)
dag_bag = DagBag(dag_folder=cls.dag_folder)

dag_ids = dag_bag.dag_ids
assert len(dag_ids) == 1
Expand Down
3 changes: 0 additions & 3 deletions airflow-core/tests/unit/always/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ def patch_get_dagbag_import_timeout():
def test_should_be_importable(example: str, patch_get_dagbag_import_timeout):
dagbag = DagBag(
dag_folder=example,
include_examples=False,
)
if len(dagbag.import_errors) == 1 and "AirflowOptionalProviderFeatureException" in str(
dagbag.import_errors
Expand All @@ -231,7 +230,6 @@ def test_should_not_do_database_queries(example: str, patch_get_dagbag_import_ti
with assert_queries_count(1, stacklevel_from_module=example.rsplit(os.sep, 1)[-1]):
DagBag(
dag_folder=example,
include_examples=False,
)


Expand All @@ -243,7 +241,6 @@ def test_should_not_run_hook_connections(example: str, patch_get_dagbag_import_t
mock_get_connection.return_value = Connection()
DagBag(
dag_folder=example,
include_examples=False,
)
assert mock_get_connection.call_count == 0, (
f"BaseHook.get_connection() should not be called during DAG parsing. "
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/tests/unit/api_fastapi/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ def make_dag_with_multiple_versions(dag_maker, configure_git_connection_for_dag_
def dagbag():
from airflow.models.dagbag import DBDagBag

parse_and_sync_to_db(os.devnull, include_examples=True)
with conf_vars({("core", "load_examples"): "True"}):
parse_and_sync_to_db(os.devnull)
return DBDagBag()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def make_dags():
with DAG(DAG3_ID, schedule=None) as dag3: # DAG start_date set to None
EmptyOperator(task_id=TASK_ID, start_date=datetime(2019, 6, 12))

dag_bag = DagBag(os.devnull, include_examples=False)
dag_bag = DagBag(os.devnull)
dag_bag.dags = {dag.dag_id: dag, dag2.dag_id: dag2, dag3.dag_id: dag3}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_201_and_400_requests(self, url_safe_serializer, session, test_client):
assert response.status_code == 201
parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert len(parsing_requests) == 1
assert parsing_requests[0].bundle_name == "dags-folder"
assert parsing_requests[0].bundle_name == "example_dags"
assert parsing_requests[0].relative_fileloc == test_dag.relative_fileloc
_check_last_log(session, dag_id=None, event="reparse_dag_file", logical_date=None)

Expand All @@ -65,7 +65,7 @@ def test_201_and_400_requests(self, url_safe_serializer, session, test_client):
assert response.status_code == 409
parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert len(parsing_requests) == 1
assert parsing_requests[0].bundle_name == "dags-folder"
assert parsing_requests[0].bundle_name == "example_dags"
assert parsing_requests[0].relative_fileloc == test_dag.relative_fileloc
_check_last_log(session, dag_id=None, event="reparse_dag_file", logical_date=None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

@pytest.fixture
def real_dag_bag():
return parse_and_sync_to_db(EXAMPLE_DAG_FILE, include_examples=False)
return parse_and_sync_to_db(EXAMPLE_DAG_FILE)


@pytest.fixture
Expand Down
Loading
Loading