Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
36a9905
chore: move standard examples to provider
bhavaniravi Oct 26, 2025
a1d8210
fix: failing testcases to load examples form dagbundle
bhavaniravi Oct 27, 2025
689f502
Resolve provider example DAGs via ProvidersManager
andreahlert Apr 30, 2026
4586cc5
Tests: relax test_get_all_bundle_names assertion
andreahlert Apr 30, 2026
db1057e
Address review feedback on example DAG loading
andreahlert Apr 30, 2026
88fe049
Tests: use SQLAlchemy 2.0 delete() in trigger_dagrun teardown
andreahlert Apr 30, 2026
06f05b5
Tests: update bundle_name for example_python_operator
andreahlert May 1, 2026
457e7cd
Tests: avoid double-loading example DAGs in parse_and_sync_to_db
andreahlert May 1, 2026
84380f0
Trigger CI rerun
andreahlert May 1, 2026
aea523b
Update devel-common/src/tests_common/test_utils/db.py
andreahlert May 2, 2026
4cf8023
Update airflow-core/src/airflow/dag_processing/bundles/manager.py
andreahlert May 2, 2026
3c57649
Address review feedback on provider example DAG bundle discovery
andreahlert May 3, 2026
0b18521
Remove include_examples parameter from DagBag and test helpers
andreahlert May 4, 2026
4909626
Fix mypy and test assertion after DagBag include_examples removal
andreahlert May 4, 2026
38b06a5
Merge remote-tracking branch 'upstream/main' into pr-66161
andreahlert May 4, 2026
9ef9809
Drop include_examples from new DagBag callers and gate dag_maker for …
andreahlert May 4, 2026
f7c43d6
Gate DagBag include_examples on Airflow 3.3+ for older compat runs
andreahlert May 5, 2026
30607c9
Trim 66161 newsfragment to user-visible facts
andreahlert May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import os
import warnings
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -106,6 +107,58 @@ def _add_example_dag_bundle(bundle_config_list: list[_ExternalBundleConfig]):
)


def _add_provider_example_dags_to_bundle(bundle_config_list: list[_ExternalBundleConfig]):
"""
Add an ``example_dags`` folder of every installed provider as a bundle.

Provider locations are resolved through ``ProvidersManager`` instead of
walking ``airflow.providers.__path__`` so that:

- nested providers (e.g. ``apache-airflow-providers-common-sql`` whose
module path is ``airflow.providers.common.sql``) are discovered;
- providers installed outside the ``airflow.providers`` namespace package
are discovered via their entry point.
"""
import importlib
Comment thread
andreahlert marked this conversation as resolved.
Outdated
import logging

from airflow.providers_manager import ProvidersManager

log = logging.getLogger(__name__)
seen: set[str] = set()
Comment thread
andreahlert marked this conversation as resolved.

for package_name in ProvidersManager().providers:
# apache-airflow-providers-foo-bar -> airflow.providers.foo.bar
if not package_name.startswith("apache-airflow-providers-"):
module_name = package_name.replace("-", "_")
else:
suffix = package_name[len("apache-airflow-providers-") :]
module_name = "airflow.providers." + suffix.replace("-", ".")
Comment thread
andreahlert marked this conversation as resolved.
Outdated
Comment thread
andreahlert marked this conversation as resolved.
try:
module = importlib.import_module(module_name)
except ImportError:
Comment thread
andreahlert marked this conversation as resolved.
Outdated
log.warning("Could not import provider module %s for example DAG discovery", module_name)
continue

for module_path in getattr(module, "__path__", []):
example_dag_folder = os.path.join(module_path, "example_dags")
if not os.path.isdir(example_dag_folder):
continue
bundle_name = f"airflow-provider-{package_name}-example-dags"
Comment thread
andreahlert marked this conversation as resolved.
Outdated
if bundle_name in seen:
continue
seen.add(bundle_name)
bundle_config_list.append(
_ExternalBundleConfig(
name=bundle_name,
classpath="airflow.dag_processing.bundles.local.LocalDagBundle",
kwargs={
"path": example_dag_folder,
},
)
)


def _is_safe_bundle_url(url: str) -> bool:
"""
Check if a bundle URL is safe to use.
Expand Down Expand Up @@ -191,6 +244,7 @@ def parse_config(self) -> None:
bundle_config_list = _parse_bundle_config(config_list)
if conf.getboolean("core", "LOAD_EXAMPLES"):
_add_example_dag_bundle(bundle_config_list)
_add_provider_example_dags_to_bundle(bundle_config_list)

for bundle_config in bundle_config_list:
if bundle_config.team_name and not conf.getboolean("core", "multi_team"):
Expand All @@ -210,6 +264,15 @@ def parse_config(self) -> None:

@provide_session
def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
"""
Persist the configured DAG bundles into ``DagBundleModel`` rows.

This only reconciles bundle metadata, not the DAGs contained in them.
Parsing each bundle's DAG files and writing the resulting
``DagModel`` / ``SerializedDagModel`` rows is the responsibility of
``DagBag`` plus ``sync_bag_to_db`` (or, in production, the DAG
processor); calling this method does not trigger that work.
"""
self.log.debug("Syncing DAG bundles to the database")

def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
Expand Down
7 changes: 0 additions & 7 deletions airflow-core/src/airflow/dag_processing/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,6 @@ def collect_dags(
registry = get_importer_registry()
files_to_parse = registry.list_dag_files(dag_folder, safe_mode=safe_mode)

if include_examples:
from airflow import example_dags

example_dag_folder = next(iter(example_dags.__path__))

files_to_parse.extend(registry.list_dag_files(example_dag_folder, safe_mode=safe_mode))

for filepath in files_to_parse:
Comment thread
andreahlert marked this conversation as resolved.
try:
file_parse_start_dttm = timezone.utcnow()
Expand Down
1 change: 0 additions & 1 deletion airflow-core/src/airflow/example_dags/standard

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_201_and_400_requests(self, url_safe_serializer, session, test_client):
assert response.status_code == 201
parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert len(parsing_requests) == 1
assert parsing_requests[0].bundle_name == "dags-folder"
assert parsing_requests[0].bundle_name == "example_dags"
assert parsing_requests[0].relative_fileloc == test_dag.relative_fileloc
_check_last_log(session, dag_id=None, event="reparse_dag_file", logical_date=None)

Expand All @@ -65,7 +65,7 @@ def test_201_and_400_requests(self, url_safe_serializer, session, test_client):
assert response.status_code == 409
parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert len(parsing_requests) == 1
assert parsing_requests[0].bundle_name == "dags-folder"
assert parsing_requests[0].bundle_name == "example_dags"
assert parsing_requests[0].relative_fileloc == test_dag.relative_fileloc
_check_last_log(session, dag_id=None, event="reparse_dag_file", logical_date=None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def test_should_respond_200(self, test_client, session):
assert response_data == {
"dag_id": "example_python_operator",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -356,7 +356,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi
assert response_data == {
"dag_id": "example_python_operator",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -420,7 +420,7 @@ def test_should_respond_200_with_task_state_in_removed(self, test_client, sessio
assert response_data == {
"dag_id": "example_python_operator",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -476,7 +476,7 @@ def test_should_respond_200_task_instance_with_rendered(self, test_client, sessi
assert response.json() == {
"dag_id": "example_python_operator",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -596,7 +596,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, test_client, se
assert response_data == {
"dag_id": "example_python_operator",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -2469,7 +2469,7 @@ def test_should_respond_200(self, test_client, session):
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -2515,7 +2515,7 @@ def test_should_respond_200_with_different_try_numbers(self, test_client, try_nu
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -2592,7 +2592,7 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers(
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -2664,7 +2664,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -2711,7 +2711,7 @@ def test_should_respond_200_with_task_state_in_removed(self, test_client, sessio
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["dag_version"]["created_at"],
Expand Down Expand Up @@ -3517,7 +3517,7 @@ def test_should_respond_200_with_dag_run_id(
"dag_id": "example_python_operator",
"dag_display_name": "example_python_operator",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][0]["dag_version"]["created_at"],
Expand Down Expand Up @@ -4009,7 +4009,7 @@ def test_should_respond_200(self, test_client, session):
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][0]["dag_version"]["created_at"],
Expand Down Expand Up @@ -4046,7 +4046,7 @@ def test_should_respond_200(self, test_client, session):
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][1]["dag_version"]["created_at"],
Expand Down Expand Up @@ -4117,7 +4117,7 @@ def test_ti_in_retry_state_not_returned(self, test_client, session):
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][0]["dag_version"]["created_at"],
Expand Down Expand Up @@ -4200,7 +4200,7 @@ def test_mapped_task_should_respond_200(self, test_client, session):
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][0]["dag_version"]["created_at"],
Expand Down Expand Up @@ -4237,7 +4237,7 @@ def test_mapped_task_should_respond_200(self, test_client, session):
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][1]["dag_version"]["created_at"],
Expand Down Expand Up @@ -4439,7 +4439,7 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session):
"dag_id": self.DAG_ID,
"dag_display_name": self.DAG_DISPLAY_NAME,
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][0]["dag_version"]["created_at"],
Expand Down Expand Up @@ -4715,7 +4715,7 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte
"dag_id": "example_python_operator",
"dag_display_name": "example_python_operator",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": mock.ANY,
Expand Down Expand Up @@ -4853,7 +4853,7 @@ def test_update_mask_set_note_should_respond_200(
"dag_id": self.DAG_ID,
"dag_display_name": self.DAG_DISPLAY_NAME,
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][0]["dag_version"]["created_at"],
Expand Down Expand Up @@ -4916,7 +4916,7 @@ def test_set_note_should_respond_200(self, test_client, session):
"dag_id": self.DAG_ID,
"dag_display_name": self.DAG_DISPLAY_NAME,
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][0]["dag_version"]["created_at"],
Expand Down Expand Up @@ -4997,7 +4997,7 @@ def test_set_note_should_respond_200_mapped_task_with_rtif(self, test_client, se
"dag_id": self.DAG_ID,
"dag_display_name": self.DAG_DISPLAY_NAME,
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][0]["dag_version"]["created_at"],
Expand Down Expand Up @@ -5080,7 +5080,7 @@ def test_set_note_should_respond_200_mapped_task_summary_with_rtif(self, test_cl
"dag_id": self.DAG_ID,
"dag_display_name": self.DAG_DISPLAY_NAME,
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_ti["dag_version"]["created_at"],
Expand Down Expand Up @@ -5196,7 +5196,7 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session):
"dag_id": self.DAG_ID,
"dag_display_name": self.DAG_DISPLAY_NAME,
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": response_data["task_instances"][0]["dag_version"]["created_at"],
Expand Down Expand Up @@ -5484,7 +5484,7 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte
"dag_id": "example_python_operator",
"dag_display_name": "example_python_operator",
"dag_version": {
"bundle_name": "dags-folder",
"bundle_name": "airflow-provider-apache-airflow-providers-standard-example-dags",
"bundle_url": None,
"bundle_version": None,
"created_at": mock.ANY,
Expand Down
Loading
Loading