diff --git a/airflow-core/newsfragments/66161.significant.rst b/airflow-core/newsfragments/66161.significant.rst new file mode 100644 index 0000000000000..79b7313bb291d --- /dev/null +++ b/airflow-core/newsfragments/66161.significant.rst @@ -0,0 +1,12 @@ +Provider example DAGs are exposed as dedicated bundles + +Example DAGs shipped by provider distributions are now discovered via +``ProvidersManager`` and registered as their own DAG bundles, one per +provider, named ``apache-airflow-providers-<provider>-example-dags`` +(or ``<distribution-name>-example-dags`` for third-party providers). The +``[core] load_examples`` option still gates whether they are registered. + +REST API clients that filtered ``bundle_name`` by ``"dags-folder"`` for +provider-shipped example DAGs (e.g. ``example_python_operator``) must +update to the new per-provider bundle names. DAG identifiers are +unchanged. diff --git a/airflow-core/src/airflow/dag_processing/bundles/manager.py b/airflow-core/src/airflow/dag_processing/bundles/manager.py index f2f66cd7d2ad5..78c54266eda9f 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/manager.py +++ b/airflow-core/src/airflow/dag_processing/bundles/manager.py @@ -16,6 +16,9 @@ # under the License. 
from __future__ import annotations +import importlib +import logging +import os import warnings from typing import TYPE_CHECKING @@ -29,6 +32,7 @@ from airflow.exceptions import AirflowConfigException from airflow.models.dagbundle import DagBundleModel from airflow.models.team import Team +from airflow.providers_manager import ProvidersManager from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session @@ -37,6 +41,8 @@ from sqlalchemy.orm import Session +log = logging.getLogger(__name__) + _example_dag_bundle_name = "example_dags" @@ -106,6 +112,61 @@ def _add_example_dag_bundle(bundle_config_list: list[_ExternalBundleConfig]): ) +def _add_provider_example_dags_to_bundle(bundle_config_list: list[_ExternalBundleConfig]): + """ + Add an ``example_dags`` folder of every installed provider as a bundle. + + Provider locations are resolved through ``ProvidersManager`` instead of + walking ``airflow.providers.__path__`` so that: + + - nested providers (e.g. ``apache-airflow-providers-common-sql`` whose + module path is ``airflow.providers.common.sql``) are discovered; + - providers installed outside the ``airflow.providers`` namespace package + are discovered via their entry point. + """ + # Dedup on the resolved on-disk folder rather than the bundle name: distributions + # under ``airflow.providers.common.*`` use ``pkgutil.extend_path``, so when several + # ``common-*`` packages are installed ``airflow.providers.common.__path__`` has + # multiple entries and the inner loop iterates more than once. Path-based dedup + # only skips when the same folder is seen twice; distinct folders are preserved. + seen: set[str] = set() + + for package_name in ProvidersManager().providers: + # Heuristic: derive the import path from the canonical + # ``apache-airflow-providers-*`` distribution name. 
Tracked as a follow-up + # to record the provider module path on ``ProviderInfo`` (see + # https://github.com/apache/airflow/issues/66305). + if package_name.startswith("apache-airflow-providers-"): + suffix = package_name[len("apache-airflow-providers-") :] + module_name = "airflow.providers." + suffix.replace("-", ".") + else: + module_name = package_name.replace("-", "_") + try: + module = importlib.import_module(module_name) + module_paths = list(getattr(module, "__path__", [])) + except Exception: + log.exception("Could not load provider module %s for example DAG discovery", module_name) + continue + + for module_path in module_paths: + example_dag_folder = os.path.join(module_path, "example_dags") + if not os.path.isdir(example_dag_folder): + continue + if example_dag_folder in seen: + continue + seen.add(example_dag_folder) + bundle_name = f"{package_name}-example-dags" + bundle_config_list.append( + _ExternalBundleConfig( + name=bundle_name, + classpath="airflow.dag_processing.bundles.local.LocalDagBundle", + kwargs={ + "path": example_dag_folder, + }, + ) + ) + + def _is_safe_bundle_url(url: str) -> bool: """ Check if a bundle URL is safe to use. @@ -191,6 +252,7 @@ def parse_config(self) -> None: bundle_config_list = _parse_bundle_config(config_list) if conf.getboolean("core", "LOAD_EXAMPLES"): _add_example_dag_bundle(bundle_config_list) + _add_provider_example_dags_to_bundle(bundle_config_list) for bundle_config in bundle_config_list: if bundle_config.team_name and not conf.getboolean("core", "multi_team"): @@ -210,6 +272,15 @@ def parse_config(self) -> None: @provide_session def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: + """ + Persist the configured DAG bundles into ``DagBundleModel`` rows. + + This only reconciles bundle metadata, not the DAGs contained in them. 
+ Parsing each bundle's DAG files and writing the resulting + ``DagModel`` / ``SerializedDagModel`` rows is the responsibility of + ``DagBag`` plus ``sync_bag_to_db`` (or, in production, the DAG + processor); calling this method does not trigger that work. + """ self.log.debug("Syncing DAG bundles to the database") def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]: diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py b/airflow-core/src/airflow/dag_processing/dagbag.py index 5062a47bc7ede..c59ae47b9f86c 100644 --- a/airflow-core/src/airflow/dag_processing/dagbag.py +++ b/airflow-core/src/airflow/dag_processing/dagbag.py @@ -172,8 +172,6 @@ class DagBag(LoggingMixin): that one system can run multiple, independent settings sets. :param dag_folder: the folder to scan to find DAGs - :param include_examples: whether to include the examples that ship - with airflow or not :param safe_mode: when ``False``, scans all python modules for dags. When ``True`` uses heuristics (files containing ``DAG`` and ``airflow`` strings) to filter python modules to scan for dags. 
@@ -187,7 +185,6 @@ class DagBag(LoggingMixin): def __init__( self, dag_folder: str | Path | None = None, # todo AIP-66: rename this to path - include_examples: bool | ArgNotSet = NOTSET, safe_mode: bool | ArgNotSet = NOTSET, load_op_links: bool = True, collect_dags: bool = True, @@ -218,11 +215,6 @@ def __init__( if collect_dags: self.collect_dags( dag_folder=dag_folder, - include_examples=( - include_examples - if is_arg_set(include_examples) - else conf.getboolean("core", "LOAD_EXAMPLES") - ), safe_mode=( safe_mode if is_arg_set(safe_mode) else conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE") ), @@ -451,7 +443,6 @@ def collect_dags( self, dag_folder: str | Path | None = None, only_if_updated: bool = True, - include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"), safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"), ): """ @@ -477,13 +468,6 @@ def collect_dags( registry = get_importer_registry() files_to_parse = registry.list_dag_files(dag_folder, safe_mode=safe_mode) - if include_examples: - from airflow import example_dags - - example_dag_folder = next(iter(example_dags.__path__)) - - files_to_parse.extend(registry.list_dag_files(example_dag_folder, safe_mode=safe_mode)) - for filepath in files_to_parse: try: file_parse_start_dttm = timezone.utcnow() @@ -554,17 +538,7 @@ def __init__(self, *args, bundle_path: Path | None = None, **kwargs): if str(bundle_path) not in sys.path: sys.path.append(str(bundle_path)) - # Warn if user explicitly set include_examples=True, since bundles never contain examples - if kwargs.get("include_examples") is True: - warnings.warn( - "include_examples=True is ignored for BundleDagBag. 
" - "Bundles do not contain example DAGs, so include_examples is always False.", - UserWarning, - stacklevel=2, - ) - kwargs["bundle_path"] = bundle_path - kwargs["include_examples"] = False super().__init__(*args, **kwargs) diff --git a/airflow-core/src/airflow/example_dags/standard b/airflow-core/src/airflow/example_dags/standard deleted file mode 120000 index 3c2ef23d52c55..0000000000000 --- a/airflow-core/src/airflow/example_dags/standard +++ /dev/null @@ -1 +0,0 @@ -../../../../providers/standard/src/airflow/providers/standard/example_dags \ No newline at end of file diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py index 1b23fe0f7549a..5119738c2065f 100644 --- a/airflow-core/tests/integration/otel/test_otel.py +++ b/airflow-core/tests/integration/otel/test_otel.py @@ -252,7 +252,7 @@ def setup_class(cls): def serialize_and_get_dags(cls) -> dict[str, SerializedDAG]: log.info("Serializing Dags from directory %s", cls.dag_folder) # Load DAGs from the dag directory. 
- dag_bag = DagBag(dag_folder=cls.dag_folder, include_examples=False) + dag_bag = DagBag(dag_folder=cls.dag_folder) dag_ids = dag_bag.dag_ids assert len(dag_ids) == 1 diff --git a/airflow-core/tests/unit/always/test_example_dags.py b/airflow-core/tests/unit/always/test_example_dags.py index 0b84f6ae26ce5..6ef8a306b5774 100644 --- a/airflow-core/tests/unit/always/test_example_dags.py +++ b/airflow-core/tests/unit/always/test_example_dags.py @@ -212,7 +212,6 @@ def patch_get_dagbag_import_timeout(): def test_should_be_importable(example: str, patch_get_dagbag_import_timeout): dagbag = DagBag( dag_folder=example, - include_examples=False, ) if len(dagbag.import_errors) == 1 and "AirflowOptionalProviderFeatureException" in str( dagbag.import_errors @@ -231,7 +230,6 @@ def test_should_not_do_database_queries(example: str, patch_get_dagbag_import_ti with assert_queries_count(1, stacklevel_from_module=example.rsplit(os.sep, 1)[-1]): DagBag( dag_folder=example, - include_examples=False, ) @@ -243,7 +241,6 @@ def test_should_not_run_hook_connections(example: str, patch_get_dagbag_import_t mock_get_connection.return_value = Connection() DagBag( dag_folder=example, - include_examples=False, ) assert mock_get_connection.call_count == 0, ( f"BaseHook.get_connection() should not be called during DAG parsing. 
" diff --git a/airflow-core/tests/unit/api_fastapi/conftest.py b/airflow-core/tests/unit/api_fastapi/conftest.py index aace17f8a1479..03c43a178a090 100644 --- a/airflow-core/tests/unit/api_fastapi/conftest.py +++ b/airflow-core/tests/unit/api_fastapi/conftest.py @@ -187,7 +187,8 @@ def make_dag_with_multiple_versions(dag_maker, configure_git_connection_for_dag_ def dagbag(): from airflow.models.dagbag import DBDagBag - parse_and_sync_to_db(os.devnull, include_examples=True) + with conf_vars({("core", "load_examples"): "True"}): + parse_and_sync_to_db(os.devnull) return DBDagBag() diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py index 80495fc9a8d70..60882ff8a3900 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py @@ -86,7 +86,7 @@ def make_dags(): with DAG(DAG3_ID, schedule=None) as dag3: # DAG start_date set to None EmptyOperator(task_id=TASK_ID, start_date=datetime(2019, 6, 12)) - dag_bag = DagBag(os.devnull, include_examples=False) + dag_bag = DagBag(os.devnull) dag_bag.dags = {dag.dag_id: dag, dag2.dag_id: dag2, dag3.dag_id: dag3} diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_parsing.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_parsing.py index d2abb5e672a5a..763e5e88669e2 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_parsing.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_parsing.py @@ -56,7 +56,7 @@ def test_201_and_400_requests(self, url_safe_serializer, session, test_client): assert response.status_code == 201 parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all() assert len(parsing_requests) == 1 - assert parsing_requests[0].bundle_name == "dags-folder" + assert 
parsing_requests[0].bundle_name == "example_dags" assert parsing_requests[0].relative_fileloc == test_dag.relative_fileloc _check_last_log(session, dag_id=None, event="reparse_dag_file", logical_date=None) @@ -65,7 +65,7 @@ def test_201_and_400_requests(self, url_safe_serializer, session, test_client): assert response.status_code == 409 parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all() assert len(parsing_requests) == 1 - assert parsing_requests[0].bundle_name == "dags-folder" + assert parsing_requests[0].bundle_name == "example_dags" assert parsing_requests[0].relative_fileloc == test_dag.relative_fileloc _check_last_log(session, dag_id=None, event="reparse_dag_file", logical_date=None) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py index 59970da27ea6b..fe9d523c86ad8 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py @@ -47,7 +47,7 @@ @pytest.fixture def real_dag_bag(): - return parse_and_sync_to_db(EXAMPLE_DAG_FILE, include_examples=False) + return parse_and_sync_to_db(EXAMPLE_DAG_FILE) @pytest.fixture diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 505a390f7bf33..d6e924950c772 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -198,7 +198,7 @@ def test_should_respond_200(self, test_client, session): assert response_data == { "dag_id": "example_python_operator", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, 
"created_at": response_data["dag_version"]["created_at"], @@ -356,7 +356,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi assert response_data == { "dag_id": "example_python_operator", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["dag_version"]["created_at"], @@ -420,7 +420,7 @@ def test_should_respond_200_with_task_state_in_removed(self, test_client, sessio assert response_data == { "dag_id": "example_python_operator", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["dag_version"]["created_at"], @@ -476,7 +476,7 @@ def test_should_respond_200_task_instance_with_rendered(self, test_client, sessi assert response.json() == { "dag_id": "example_python_operator", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["dag_version"]["created_at"], @@ -596,7 +596,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, test_client, se assert response_data == { "dag_id": "example_python_operator", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["dag_version"]["created_at"], @@ -733,7 +733,7 @@ def create_dag_runs_with_mapped_tasks(self, dag_maker, session, dags=None): session.add(ti) DagBundlesManager().sync_bundles_to_db() - dagbag = DagBag(os.devnull, include_examples=False) + dagbag = DagBag(os.devnull) dagbag.dags = {dag_id: dag_maker.dag} sync_bag_to_db(dagbag, "dags-folder", None) session.flush() @@ -2469,7 +2469,7 @@ def test_should_respond_200(self, test_client, 
session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["dag_version"]["created_at"], @@ -2515,7 +2515,7 @@ def test_should_respond_200_with_different_try_numbers(self, test_client, try_nu "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["dag_version"]["created_at"], @@ -2592,7 +2592,7 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers( "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["dag_version"]["created_at"], @@ -2664,7 +2664,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["dag_version"]["created_at"], @@ -2711,7 +2711,7 @@ def test_should_respond_200_with_task_state_in_removed(self, test_client, sessio "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["dag_version"]["created_at"], @@ -3517,7 +3517,7 @@ def test_should_respond_200_with_dag_run_id( "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": 
"apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["task_instances"][0]["dag_version"]["created_at"], @@ -4009,7 +4009,7 @@ def test_should_respond_200(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["task_instances"][0]["dag_version"]["created_at"], @@ -4046,7 +4046,7 @@ def test_should_respond_200(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["task_instances"][1]["dag_version"]["created_at"], @@ -4117,7 +4117,7 @@ def test_ti_in_retry_state_not_returned(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["task_instances"][0]["dag_version"]["created_at"], @@ -4200,7 +4200,7 @@ def test_mapped_task_should_respond_200(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["task_instances"][0]["dag_version"]["created_at"], @@ -4237,7 +4237,7 @@ def test_mapped_task_should_respond_200(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, 
"created_at": response_data["task_instances"][1]["dag_version"]["created_at"], @@ -4439,7 +4439,7 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session): "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["task_instances"][0]["dag_version"]["created_at"], @@ -4715,7 +4715,7 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": mock.ANY, @@ -4853,7 +4853,7 @@ def test_update_mask_set_note_should_respond_200( "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["task_instances"][0]["dag_version"]["created_at"], @@ -4916,7 +4916,7 @@ def test_set_note_should_respond_200(self, test_client, session): "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["task_instances"][0]["dag_version"]["created_at"], @@ -4997,7 +4997,7 @@ def test_set_note_should_respond_200_mapped_task_with_rtif(self, test_client, se "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": 
response_data["task_instances"][0]["dag_version"]["created_at"], @@ -5080,7 +5080,7 @@ def test_set_note_should_respond_200_mapped_task_summary_with_rtif(self, test_cl "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_ti["dag_version"]["created_at"], @@ -5196,7 +5196,7 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session): "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": response_data["task_instances"][0]["dag_version"]["created_at"], @@ -5484,7 +5484,7 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", "dag_version": { - "bundle_name": "dags-folder", + "bundle_name": "apache-airflow-providers-standard-example-dags", "bundle_url": None, "bundle_version": None, "created_at": mock.ANY, diff --git a/airflow-core/tests/unit/cli/commands/test_asset_command.py b/airflow-core/tests/unit/cli/commands/test_asset_command.py index 7b17f2a5cea52..6efd8293534f3 100644 --- a/airflow-core/tests/unit/cli/commands/test_asset_command.py +++ b/airflow-core/tests/unit/cli/commands/test_asset_command.py @@ -28,6 +28,7 @@ from airflow.cli import cli_parser from airflow.cli.commands import asset_command +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs, parse_and_sync_to_db if typing.TYPE_CHECKING: @@ -38,7 +39,8 @@ @pytest.fixture(scope="module", autouse=True) def prepare_examples(): - parse_and_sync_to_db(os.devnull, include_examples=True) + with conf_vars({("core", "load_examples"): 
"True"}): + parse_and_sync_to_db(os.devnull) yield clear_db_runs() clear_db_dags() diff --git a/airflow-core/tests/unit/cli/commands/test_backfill_command.py b/airflow-core/tests/unit/cli/commands/test_backfill_command.py index 79d133f84c88a..8f956d331d2f5 100644 --- a/airflow-core/tests/unit/cli/commands/test_backfill_command.py +++ b/airflow-core/tests/unit/cli/commands/test_backfill_command.py @@ -30,6 +30,7 @@ from airflow.cli import cli_parser from airflow.models.backfill import ReprocessBehavior +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_backfills, clear_db_dags, clear_db_runs, parse_and_sync_to_db DEFAULT_DATE = timezone.make_aware(datetime(2015, 1, 1), timezone=timezone.utc) @@ -48,7 +49,8 @@ class TestCliBackfill: @classmethod def setup_class(cls): - parse_and_sync_to_db(os.devnull, include_examples=True) + with conf_vars({("core", "load_examples"): "True"}): + parse_and_sync_to_db(os.devnull) cls.parser = cli_parser.get_parser() @classmethod diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 0ec4bb931aa53..c2c3dae4b51f3 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -81,7 +81,8 @@ class TestCliDags: @classmethod def setup_class(cls): - parse_and_sync_to_db(os.devnull, include_examples=True) + with conf_vars({("core", "load_examples"): "True"}): + parse_and_sync_to_db(os.devnull) cls.parser = cli_parser.get_parser() @classmethod @@ -252,7 +253,7 @@ def test_next_execution(self, dag_id, delta, schedule, catchup, first, second, t print(file_content) with time_machine.travel(DEFAULT_DATE): clear_db_dags() - parse_and_sync_to_db(tmp_path, include_examples=False) + parse_and_sync_to_db(tmp_path) # Test num-executions = 1 (default) args = self.parser.parse_args(["dags", "next-execution", dag_id]) @@ -270,9 +271,8 @@ def 
test_next_execution(self, dag_id, delta, schedule, catchup, first, second, t # Rebuild Test DB for other tests clear_db_dags() - parse_and_sync_to_db(os.devnull, include_examples=True) + self.setup_class() - @conf_vars({("core", "load_examples"): "true"}) def test_cli_report(self, stdout_capture): args = self.parser.parse_args(["dags", "report", "--output", "json"]) with stdout_capture as temp_stdout: @@ -283,7 +283,6 @@ def test_cli_report(self, stdout_capture): assert any(item["file"].endswith("example_complex.py") for item in data) assert any("example_complex" in item["dags"] for item in data) - @conf_vars({("core", "load_examples"): "true"}) def test_cli_get_dag_details(self, stdout_capture): args = self.parser.parse_args(["dags", "details", "example_complex", "--output", "yaml"]) with stdout_capture as temp_stdout: @@ -300,7 +299,6 @@ def test_cli_get_dag_details(self, stdout_capture): for value in dag_details_values: assert value in out - @conf_vars({("core", "load_examples"): "true"}) def test_cli_list_dags(self, stdout_capture): args = self.parser.parse_args(["dags", "list", "--output", "json"]) with stdout_capture as temp_stdout: @@ -311,11 +309,12 @@ def test_cli_list_dags(self, stdout_capture): assert key in dag_list[0] assert any("airflow/example_dags/example_complex.py" in d["fileloc"] for d in dag_list) - @conf_vars({("core", "load_examples"): "true"}) def test_cli_list_local_dags(self, stdout_capture): # Clear the database clear_db_dags() - args = self.parser.parse_args(["dags", "list", "--output", "json", "--local"]) + args = self.parser.parse_args( + ["dags", "list", "--output", "json", "--local", "--bundle-name", "example_dags"] + ) with stdout_capture as temp_stdout: dag_command.dag_list_dags(args) out = temp_stdout.getvalue() @@ -324,7 +323,7 @@ def test_cli_list_local_dags(self, stdout_capture): assert key in dag_list[0] assert any("airflow/example_dags/example_complex.py" in d["fileloc"] for d in dag_list) # Rebuild Test DB for other tests - 
parse_and_sync_to_db(os.devnull, include_examples=True) + self.setup_class() @conf_vars({("core", "load_examples"): "false"}) def test_cli_list_local_dags_with_bundle_name(self, configure_testing_dag_bundle, stdout_capture): @@ -345,9 +344,8 @@ def test_cli_list_local_dags_with_bundle_name(self, configure_testing_dag_bundle str(TEST_DAGS_FOLDER / "test_example_bash_operator.py") in d["fileloc"] for d in dag_list ) # Rebuild Test DB for other tests - parse_and_sync_to_db(os.devnull, include_examples=True) + self.setup_class() - @conf_vars({("core", "load_examples"): "true"}) def test_cli_list_dags_custom_cols(self, stdout_capture): args = self.parser.parse_args( ["dags", "list", "--output", "json", "--columns", "dag_id,last_parsed_time"] @@ -361,7 +359,6 @@ def test_cli_list_dags_custom_cols(self, stdout_capture): for key in ["fileloc", "owners", "is_paused"]: assert key not in dag_list[0] - @conf_vars({("core", "load_examples"): "true"}) def test_cli_list_dags_invalid_cols(self, stderr_capture): args = self.parser.parse_args(["dags", "list", "--output", "json", "--columns", "dag_id,invalid_col"]) with stderr_capture as temp_stderr: @@ -405,9 +402,8 @@ def test_cli_list_dags_prints_local_import_errors( assert "Failed to load all files." 
in out # Rebuild Test DB for other tests - parse_and_sync_to_db(os.devnull, include_examples=True) + self.setup_class() - @conf_vars({("core", "load_examples"): "true"}) @mock.patch("airflow.models.DagModel.get_dagmodel") def test_list_dags_none_get_dagmodel(self, mock_get_dagmodel, stdout_capture): mock_get_dagmodel.return_value = None @@ -420,7 +416,6 @@ def test_list_dags_none_get_dagmodel(self, mock_get_dagmodel, stdout_capture): assert key in dag_list[0] assert any("airflow/example_dags/example_complex.py" in d["fileloc"] for d in dag_list) - @conf_vars({("core", "load_examples"): "true"}) def test_dagbag_dag_col(self, session): dagbag = DBDagBag() dag_details = dag_command._get_dagbag_dag_details( @@ -920,7 +915,7 @@ def test_dag_with_parsing_context( path_to_parse = TEST_DAGS_FOLDER / "test_dag_parsing_context.py" with configure_testing_dag_bundle(path_to_parse): - bag = DagBag(dag_folder=path_to_parse, include_examples=False) + bag = DagBag(dag_folder=path_to_parse) sync_bag_to_db(bag, "testing", None) cli_args = self.parser.parse_args( ["dags", "test", "test_dag_parsing_context", DEFAULT_DATE.isoformat()] @@ -1014,7 +1009,7 @@ def test_get_dag_excludes_examples_with_bundle(self, configure_testing_dag_bundl from airflow.utils.cli import get_dag as get_bagged_dag # type: ignore with configure_testing_dag_bundle(TEST_DAGS_FOLDER / "test_sensor.py"): - # example DAG should not be found since include_examples=False + # example DAG should not be found since the testing bundle only exposes test_sensor.py with pytest.raises(AirflowException, match="could not be found"): get_bagged_dag(bundle_names=["testing"], dag_id="example_simplest_dag") diff --git a/airflow-core/tests/unit/cli/commands/test_pool_command.py b/airflow-core/tests/unit/cli/commands/test_pool_command.py index 8fea33d7a7ffa..828497e9c2d38 100644 --- a/airflow-core/tests/unit/cli/commands/test_pool_command.py +++ b/airflow-core/tests/unit/cli/commands/test_pool_command.py @@ -35,7 +35,7 @@ class 
TestCliPools: @classmethod def setup_class(cls): - cls.dagbag = models.DagBag(include_examples=True) + cls.dagbag = models.DagBag() cls.parser = cli_parser.get_parser() settings.configure_orm() cls.session = Session diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py b/airflow-core/tests/unit/cli/commands/test_task_command.py index f39a8409bc243..038cbab811849 100644 --- a/airflow-core/tests/unit/cli/commands/test_task_command.py +++ b/airflow-core/tests/unit/cli/commands/test_task_command.py @@ -87,7 +87,8 @@ class TestCliTasks: @classmethod def setup_class(cls): - parse_and_sync_to_db(os.devnull, include_examples=True) + with conf_vars({("core", "load_examples"): "True"}): + parse_and_sync_to_db(os.devnull) cls.parser = cli_parser.get_parser() clear_db_runs() @@ -459,7 +460,20 @@ def test_task_state(self): ) def test_task_states_for_dag_run(self): - dag2 = DagBag().dags["example_python_operator"] + # Build a minimal DAG inline rather than importing one from the + # standard provider's example_dags. The test only asserts CLI + # behaviour around a known dag_id/task_id pair, so reproducing the + # name and a single task is enough and keeps this core test + # decoupled from the standard provider's example DAGs. 
+ from airflow.sdk import DAG + + with DAG( + dag_id="example_python_operator", + schedule=None, + start_date=timezone.datetime(2021, 1, 1), + ) as dag2: + BashOperator(task_id="print_the_context", bash_command="echo hello") + lazy_deserialized_dag2 = LazyDeserializedDAG.from_dag(dag2) SerializedDagModel.write_dag(lazy_deserialized_dag2, bundle_name="testing") diff --git a/airflow-core/tests/unit/cli/commands/test_team_command.py b/airflow-core/tests/unit/cli/commands/test_team_command.py index 55892489f8b77..a10837894a780 100644 --- a/airflow-core/tests/unit/cli/commands/test_team_command.py +++ b/airflow-core/tests/unit/cli/commands/test_team_command.py @@ -52,7 +52,7 @@ def _cleanup(cls): @classmethod def setup_class(cls): - cls.dagbag = models.DagBag(include_examples=True) + cls.dagbag = models.DagBag() cls.parser = cli_parser.get_parser() settings.configure_orm() cls.session = Session diff --git a/airflow-core/tests/unit/cli/commands/test_variable_command.py b/airflow-core/tests/unit/cli/commands/test_variable_command.py index 21d2fb66822b5..e9f4b94f30840 100644 --- a/airflow-core/tests/unit/cli/commands/test_variable_command.py +++ b/airflow-core/tests/unit/cli/commands/test_variable_command.py @@ -120,7 +120,7 @@ def _create(data, format="yaml", filename=None): class TestCliVariables: @classmethod def setup_class(cls): - cls.dagbag = models.DagBag(include_examples=True) + cls.dagbag = models.DagBag() cls.parser = cli_parser.get_parser() def setup_method(self): diff --git a/airflow-core/tests/unit/cli/conftest.py b/airflow-core/tests/unit/cli/conftest.py index 2967e48cd6c1b..b9be97bfc9f42 100644 --- a/airflow-core/tests/unit/cli/conftest.py +++ b/airflow-core/tests/unit/cli/conftest.py @@ -56,7 +56,7 @@ def load_examples(): @pytest.fixture(scope="session") def dagbag(): - return DagBag(include_examples=True) + return DagBag() @pytest.fixture(scope="session") diff --git a/airflow-core/tests/unit/core/test_impersonation_tests.py 
b/airflow-core/tests/unit/core/test_impersonation_tests.py index 8165d1f6d73f6..7325cb3ef5ba2 100644 --- a/airflow-core/tests/unit/core/test_impersonation_tests.py +++ b/airflow-core/tests/unit/core/test_impersonation_tests.py @@ -167,7 +167,7 @@ def setup_impersonation_tests(self, create_airflow_home): @staticmethod def get_dagbag(dag_folder): """Get DagBag and print statistic into the log.""" - dagbag = DagBag(dag_folder=dag_folder, include_examples=False) + dagbag = DagBag(dag_folder=dag_folder) logger.info("Loaded DAGs:") logger.info(dagbag.dagbag_report()) return dagbag diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py index 2c00e8aa25907..0d17069c831dd 100644 --- a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py +++ b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py @@ -466,4 +466,13 @@ def test_multiple_bundles_one_fails(clear_db, session): def test_get_all_bundle_names(): - assert DagBundlesManager().get_all_bundle_names() == ["dags-folder", "example_dags"] + bundle_names = DagBundlesManager().get_all_bundle_names() + # Built-in bundles are always present. + assert "dags-folder" in bundle_names + assert "example_dags" in bundle_names + # Any other bundle exposed here comes from a provider's example_dags + # folder discovered via ProvidersManager. Their presence depends on + # which providers are installed in the environment, so we only check + # the naming suffix instead of pinning an exact list. 
+ extra = [n for n in bundle_names if n not in {"dags-folder", "example_dags"}] + assert all(n.endswith("-example-dags") for n in extra) diff --git a/airflow-core/tests/unit/dag_processing/test_dagbag.py b/airflow-core/tests/unit/dag_processing/test_dagbag.py index a673f9f1b0d2a..99abd92a59f73 100644 --- a/airflow-core/tests/unit/dag_processing/test_dagbag.py +++ b/airflow-core/tests/unit/dag_processing/test_dagbag.py @@ -28,6 +28,7 @@ import zipfile from copy import deepcopy from datetime import datetime, timedelta, timezone +from pathlib import Path from unittest import mock from unittest.mock import patch @@ -56,7 +57,25 @@ pytestmark = pytest.mark.db_test -example_dags_folder = AIRFLOW_ROOT_PATH / "airflow-core" / "src" / "airflow" / "example_dags" / "standard" + +def _standard_example_dags_folder() -> Path: + """ + Return the filesystem path of the standard provider's ``example_dags``. + + Importing the provider lazily keeps the test module collectable in + environments where the standard provider is not yet installed. The + tests that actually need the folder will fail explicitly when the + provider is missing, instead of breaking pytest collection. 
+ """ + from airflow.providers.standard import example_dags + + return Path(example_dags.__file__).parent + + +@pytest.fixture +def standard_example_dags_folder() -> Path: + return _standard_example_dags_folder() + PY311 = sys.version_info >= (3, 11) PY313 = sys.version_info >= (3, 13) @@ -334,20 +353,20 @@ def teardown_class(self): def test_dagbag_with_bundle_name(self, tmp_path): """Test that DagBag constructor accepts and stores bundle_name parameter.""" - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False, bundle_name="test_bundle") + dagbag = DagBag(dag_folder=os.fspath(tmp_path), bundle_name="test_bundle") assert dagbag.bundle_name == "test_bundle" # Test with None (default) - dagbag2 = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag2 = DagBag(dag_folder=os.fspath(tmp_path)) assert dagbag2.bundle_name is None - def test_get_existing_dag(self, tmp_path): + def test_get_existing_dag(self, tmp_path, standard_example_dags_folder): """ Test that we're able to parse some example DAGs and retrieve them """ - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=True, bundle_name="test_bundle") + dagbag = DagBag(dag_folder=standard_example_dags_folder, bundle_name="test_bundle") - some_expected_dag_ids = ["example_bash_operator", "example_branch_operator"] + some_expected_dag_ids = ["example_bash_operator", "example_python_operator"] for dag_id in some_expected_dag_ids: dag = dagbag.get_dag(dag_id) @@ -361,7 +380,7 @@ def test_get_non_existing_dag(self, tmp_path): """ test that retrieving a non existing dag id returns None without crashing """ - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) non_existing_dag_id = "non_existing_dag_id" assert dagbag.get_dag(non_existing_dag_id) is None @@ -377,7 +396,7 @@ def test_dont_load_example(self, tmp_path): """ test that the example are not loaded """ - dagbag = 
DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) assert dagbag.size() == 0 @@ -390,7 +409,7 @@ def test_safe_mode_heuristic_match(self, tmp_path): path.write_text("# airflow\n# DAG") with conf_vars({("core", "dags_folder"): os.fspath(path.parent)}): - dagbag = DagBag(include_examples=False, safe_mode=True) + dagbag = DagBag(safe_mode=True) assert len(dagbag.dagbag_stats) == 1 assert dagbag.dagbag_stats[0].file == path.name @@ -403,7 +422,7 @@ def test_safe_mode_heuristic_mismatch(self, tmp_path): path = tmp_path / "testfile.py" path.write_text("") with conf_vars({("core", "dags_folder"): os.fspath(path.parent)}): - dagbag = DagBag(include_examples=False, safe_mode=True) + dagbag = DagBag(safe_mode=True) assert len(dagbag.dagbag_stats) == 0 def test_safe_mode_disabled(self, tmp_path): @@ -411,7 +430,7 @@ def test_safe_mode_disabled(self, tmp_path): path = tmp_path / "testfile.py" path.write_text("") with conf_vars({("core", "dags_folder"): os.fspath(path.parent)}): - dagbag = DagBag(include_examples=False, safe_mode=False) + dagbag = DagBag(safe_mode=False) assert len(dagbag.dagbag_stats) == 1 assert dagbag.dagbag_stats[0].file == path.name @@ -433,7 +452,7 @@ def test_dagbag_stats_file_is_relative_path_with_mixed_separators(self, tmp_path # but the filesystem returns paths with backslashes dags_folder_with_forward_slashes = path.parent.as_posix() with conf_vars({("core", "dags_folder"): dags_folder_with_forward_slashes}): - dagbag = DagBag(include_examples=False, safe_mode=True) + dagbag = DagBag(safe_mode=True) assert len(dagbag.dagbag_stats) == 1 assert dagbag.dagbag_stats[0].file == path.name @@ -449,7 +468,6 @@ def test_dagbag_stats_includes_bundle_info(self, tmp_path): with conf_vars({("core", "dags_folder"): os.fspath(path.parent)}): dagbag = DagBag( - include_examples=False, safe_mode=True, bundle_path=bundle_path, bundle_name=bundle_name, @@ -466,7 +484,7 @@ def 
test_dagbag_stats_bundle_info_none_when_not_provided(self, tmp_path): path.write_text("# airflow\n# DAG") with conf_vars({("core", "dags_folder"): os.fspath(path.parent)}): - dagbag = DagBag(include_examples=False, safe_mode=True) + dagbag = DagBag(safe_mode=True) assert len(dagbag.dagbag_stats) == 1 stat = dagbag.dagbag_stats[0] @@ -480,12 +498,12 @@ def test_process_file_that_contains_multi_bytes_char(self, tmp_path): path = tmp_path / "testfile.py" path.write_text("\u3042") # write multi-byte char (hiragana) - dagbag = DagBag(dag_folder=os.fspath(path.parent), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(path.parent)) assert dagbag.process_file(os.fspath(path)) == [] def test_process_file_duplicated_dag_id(self, tmp_path): """Loading a DAG with ID that already existed in a DAG bag should result in an import error.""" - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) def create_dag(): from airflow.sdk import dag @@ -526,7 +544,6 @@ def test_import_errors_use_relative_path_with_bundle(self, tmp_path): dagbag = DagBag( dag_folder=os.fspath(dag_path), - include_examples=False, bundle_path=bundle_path, bundle_name="test_bundle", ) @@ -559,7 +576,6 @@ def my_flow(): dagbag = DagBag( dag_folder=os.fspath(bundle_path), - include_examples=False, bundle_path=bundle_path, bundle_name="test_bundle", ) @@ -585,7 +601,7 @@ def test_zip_skip_log(self, caplog, test_zip_path): it doesn't have "airflow" and "DAG" """ caplog.set_level(logging.INFO) - dagbag = DagBag(dag_folder=test_zip_path, include_examples=False) + dagbag = DagBag(dag_folder=test_zip_path) assert dagbag.has_logged assert ( @@ -598,7 +614,7 @@ def test_zip(self, tmp_path, test_zip_path): test the loading of a DAG within a zip file that includes dependencies """ syspath_before = deepcopy(sys.path) - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) 
dagbag.process_file(test_zip_path) assert dagbag.get_dag("test_zip_dag") assert sys.path == syspath_before # sys.path doesn't change @@ -614,12 +630,12 @@ def test_process_dag_file_without_timeout( """ mocked_get_dagbag_import_timeout.return_value = 0 - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_sensor.py")) mocked_timeout.assert_not_called() mocked_get_dagbag_import_timeout.return_value = -1 - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_sensor.py")) mocked_timeout.assert_not_called() @@ -637,7 +653,7 @@ def test_process_dag_file_with_non_default_timeout( # ensure the test value is not equal to the default value assert timeout_value != settings.conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT") - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_sensor.py")) mocked_timeout.assert_called_once_with(timeout_value, error_message=mock.ANY) @@ -651,7 +667,7 @@ def test_check_value_type_from_get_dagbag_import_timeout( """ mocked_get_dagbag_import_timeout.return_value = "1" - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) with pytest.raises( TypeError, match=r"Value \(1\) from get_dagbag_import_timeout must be int or float" ): @@ -673,7 +689,7 @@ def test_process_file_cron_validity_check( self, request: pytest.FixtureRequest, invalid_dag_name: str, tmp_path ): """Test if an invalid cron expression as schedule interval can be identified""" - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) assert len(dagbag.import_errors) == 0 
dagbag.process_file(request.getfixturevalue(invalid_dag_name)) assert len(dagbag.import_errors) == 1 @@ -689,7 +705,7 @@ def test_process_file_invalid_param_check(self, tmp_path): "test_invalid_param3.py", "test_invalid_param4.py", ] - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) assert len(dagbag.import_errors) == 0 for file in invalid_dag_files: @@ -705,7 +721,7 @@ def test_process_file_valid_param_check(self, tmp_path): "test_valid_param.py", "test_valid_param2.py", ] - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) assert len(dagbag.import_errors) == 0 for file in valid_dag_files: @@ -714,7 +730,7 @@ def test_process_file_valid_param_check(self, tmp_path): assert len(dagbag.dags) == len(valid_dag_files) @patch.object(DagModel, "get_current") - def test_get_dag_without_refresh(self, mock_dagmodel): + def test_get_dag_without_refresh(self, mock_dagmodel, standard_example_dags_folder): """ Test that, once a DAG is loaded, it doesn't get refreshed again if it hasn't been expired. @@ -733,7 +749,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): _TestDagBag.process_file_calls += 1 super().process_file(filepath, only_if_updated, safe_mode) - dagbag = _TestDagBag(include_examples=True) + dagbag = _TestDagBag(dag_folder=standard_example_dags_folder) dagbag.process_file_calls # Should not call process_file again, since it's already loaded during init. 
@@ -742,25 +758,24 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): assert dagbag.process_file_calls == 1 @pytest.mark.parametrize( - ("file_to_load", "expected"), + ("file_name", "expected_dag_id"), ( pytest.param( - pathlib.Path(example_dags_folder) / "example_bash_operator.py", - { - "example_bash_operator": f"{example_dags_folder.relative_to(AIRFLOW_ROOT_PATH) / 'example_bash_operator.py'}" - }, + "example_bash_operator.py", + "example_bash_operator", id="example_bash_operator", ), ), ) - def test_get_dag_registration(self, file_to_load, expected): + def test_get_dag_registration(self, file_name, expected_dag_id, standard_example_dags_folder): pytest.importorskip("system.standard") - dagbag = DagBag(dag_folder=os.devnull, include_examples=False) + file_to_load = standard_example_dags_folder / file_name + expected_path = standard_example_dags_folder.relative_to(AIRFLOW_ROOT_PATH) / file_name + dagbag = DagBag(dag_folder=os.devnull) dagbag.process_file(os.fspath(file_to_load)) - for dag_id, path in expected.items(): - dag = dagbag.get_dag(dag_id) - assert dag, f"{dag_id} was bagged" - assert dag.fileloc.endswith(path) + dag = dagbag.get_dag(expected_dag_id) + assert dag, f"{expected_dag_id} was bagged" + assert dag.fileloc.endswith(str(expected_path)) @pytest.mark.parametrize( ("expected"), @@ -775,7 +790,7 @@ def test_get_dag_registration(self, file_to_load, expected): ), ) def test_get_zip_dag_registration(self, test_zip_path, expected): - dagbag = DagBag(dag_folder=os.devnull, include_examples=False) + dagbag = DagBag(dag_folder=os.devnull) dagbag.process_file(test_zip_path) for dag_id, path in expected.items(): dag = dagbag.get_dag(dag_id) @@ -783,7 +798,7 @@ def test_get_zip_dag_registration(self, test_zip_path, expected): assert dag.fileloc.endswith(f"{pathlib.Path(test_zip_path).parent}/{path}") def test_dag_registration_with_failure(self): - dagbag = DagBag(dag_folder=os.devnull, include_examples=False) + dagbag = 
DagBag(dag_folder=os.devnull) found = dagbag.process_file(str(TEST_DAGS_FOLDER / "test_invalid_dup_task.py")) assert found == [] @@ -798,18 +813,18 @@ def zip_with_valid_dag_and_dup_tasks(self, tmp_path: pathlib.Path) -> str: return os.fspath(zipped) def test_dag_registration_with_failure_zipped(self, zip_with_valid_dag_and_dup_tasks): - dagbag = DagBag(dag_folder=os.devnull, include_examples=False) + dagbag = DagBag(dag_folder=os.devnull) found = dagbag.process_file(zip_with_valid_dag_and_dup_tasks) assert len(found) == 1 assert [dag.dag_id for dag in found] == ["test_example_bash_operator"] @patch.object(DagModel, "get_current") - def test_refresh_py_dag(self, mock_dagmodel, tmp_path): + def test_refresh_py_dag(self, mock_dagmodel, tmp_path, standard_example_dags_folder): """ Test that we can refresh an ordinary .py DAG """ dag_id = "example_bash_operator" - fileloc = str(example_dags_folder / "example_bash_operator.py") + fileloc = str(standard_example_dags_folder / "example_bash_operator.py") mock_dagmodel.return_value = DagModel() mock_dagmodel.return_value.last_expired = datetime.max.replace(tzinfo=timezone.utc) @@ -823,7 +838,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): _TestDagBag.process_file_calls += 1 return super().process_file(filepath, only_if_updated, safe_mode) - dagbag = _TestDagBag(dag_folder=os.fspath(tmp_path), include_examples=True) + dagbag = _TestDagBag(dag_folder=standard_example_dags_folder) assert dagbag.process_file_calls == 1 dag = dagbag.get_dag(dag_id) @@ -851,7 +866,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): _TestDagBag.process_file_calls += 1 return super().process_file(filepath, only_if_updated, safe_mode) - dagbag = _TestDagBag(dag_folder=os.path.realpath(test_zip_path), include_examples=False) + dagbag = _TestDagBag(dag_folder=os.path.realpath(test_zip_path)) assert dagbag.process_file_calls == 1 dag = dagbag.get_dag(dag_id) @@ -868,7 +883,7 @@ def process_dag(self, 
create_dag, tmp_path): path = tmp_path / "testfile.py" path.write_text(source) - dagbag = DagBag(dag_folder=os.fspath(path.parent), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(path.parent)) found_dags = dagbag.process_file(os.fspath(path)) return dagbag, found_dags, os.fspath(path) @@ -923,7 +938,7 @@ def test_process_file_with_none(self, tmp_path): """ test that process_file can handle Nones """ - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) assert dagbag.process_file(None) == [] @@ -949,7 +964,7 @@ def test_timeout_dag_errors_are_import_errors(self, tmp_path, caplog): """) with conf_vars({("core", "DAGBAG_IMPORT_TIMEOUT"): "0.01"}): - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) assert dag_file.as_posix() in dagbag.import_errors assert "DagBag import timeout for" in caplog.text @@ -979,7 +994,7 @@ def test_import_error_tracebacks(self, tmp_path, depth): with contextlib.ExitStack() as cm: if depth is not None: cm.enter_context(conf_vars({("core", "dagbag_import_error_traceback_depth"): str(depth)})) - dagbag = DagBag(dag_folder=unparseable_filename, include_examples=False) + dagbag = DagBag(dag_folder=unparseable_filename) import_errors = dagbag.import_errors assert unparseable_filename in import_errors @@ -995,7 +1010,7 @@ def test_import_error_tracebacks_zip(self, tmp_path, depth): with contextlib.ExitStack() as cm: if depth is not None: cm.enter_context(conf_vars({("core", "dagbag_import_error_traceback_depth"): str(depth)})) - dagbag = DagBag(dag_folder=invalid_zip_filename, include_examples=False) + dagbag = DagBag(dag_folder=invalid_zip_filename) import_errors = dagbag.import_errors assert invalid_dag_filename in import_errors assert import_errors[invalid_dag_filename] == self._make_test_traceback(invalid_dag_filename, depth) @@ -1010,7 +1025,7 @@ def 
test_task_cluster_policy_violation(self): dag_id = "test_missing_owner" err_cls_name = "AirflowClusterPolicyViolation" - dagbag = DagBag(dag_folder=dag_file, include_examples=False) + dagbag = DagBag(dag_folder=dag_file) assert set() == set(dagbag.dag_ids) expected_import_errors = { dag_file: ( @@ -1032,7 +1047,7 @@ def test_task_cluster_policy_nonstring_owner(self): dag_id = "test_nonstring_owner" err_cls_name = "AirflowClusterPolicyViolation" - dagbag = DagBag(dag_folder=dag_file, include_examples=False) + dagbag = DagBag(dag_folder=dag_file) assert set() == set(dagbag.dag_ids) expected_import_errors = { dag_file: ( @@ -1051,7 +1066,7 @@ def test_task_cluster_policy_obeyed(self): """ dag_file = os.path.join(TEST_DAGS_FOLDER, "test_with_non_default_owner.py") - dagbag = DagBag(dag_folder=dag_file, include_examples=False) + dagbag = DagBag(dag_folder=dag_file) assert {"test_with_non_default_owner"} == set(dagbag.dag_ids) assert dagbag.import_errors == {} @@ -1060,14 +1075,13 @@ def test_task_cluster_policy_obeyed(self): def test_dag_cluster_policy_obeyed(self): dag_file = os.path.join(TEST_DAGS_FOLDER, "test_dag_with_no_tags.py") - dagbag = DagBag(dag_folder=dag_file, include_examples=False) + dagbag = DagBag(dag_folder=dag_file) assert len(dagbag.dag_ids) == 0 assert "has no tags" in dagbag.import_errors[dag_file] def test_dagbag_dag_collection(self): dagbag = DagBag( dag_folder=TEST_DAGS_FOLDER, - include_examples=False, collect_dags=False, bundle_name="test_collection", ) @@ -1078,15 +1092,15 @@ def test_dagbag_dag_collection(self): assert dagbag.dags # test that dagbag.dags is not empty if collect_dags is True - dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False, bundle_name="test_collection") + dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, bundle_name="test_collection") assert dagbag.dags def test_dabgag_captured_warnings(self): dag_file = os.path.join(TEST_DAGS_FOLDER, "test_dag_warnings.py") - dagbag = DagBag(dag_folder=dag_file, 
include_examples=False, collect_dags=False) + dagbag = DagBag(dag_folder=dag_file, collect_dags=False) assert dag_file not in dagbag.captured_warnings - dagbag.collect_dags(dag_folder=dagbag.dag_folder, include_examples=False, only_if_updated=False) + dagbag.collect_dags(dag_folder=dagbag.dag_folder, only_if_updated=False) assert dagbag.dagbag_stats[0].warning_num == 2 assert dagbag.captured_warnings == { dag_file: ( @@ -1098,14 +1112,14 @@ def test_dabgag_captured_warnings(self): with warnings.catch_warnings(): # Disable capture DeprecationWarning, and it should be reflected in captured warnings warnings.simplefilter("ignore", DeprecationWarning) - dagbag.collect_dags(dag_folder=dagbag.dag_folder, include_examples=False, only_if_updated=False) + dagbag.collect_dags(dag_folder=dagbag.dag_folder, only_if_updated=False) assert dag_file in dagbag.captured_warnings assert len(dagbag.captured_warnings[dag_file]) == 1 assert dagbag.dagbag_stats[0].warning_num == 1 # Disable all warnings, no captured warnings expected warnings.simplefilter("ignore") - dagbag.collect_dags(dag_folder=dagbag.dag_folder, include_examples=False, only_if_updated=False) + dagbag.collect_dags(dag_folder=dagbag.dag_folder, only_if_updated=False) assert dag_file not in dagbag.captured_warnings assert dagbag.dagbag_stats[0].warning_num == 0 @@ -1119,7 +1133,7 @@ def warning_zipped_dag_path(self, tmp_path: pathlib.Path) -> str: def test_dabgag_captured_warnings_zip(self, warning_zipped_dag_path: str): in_zip_dag_file = f"{warning_zipped_dag_path}/test_dag_warnings.py" - dagbag = DagBag(dag_folder=warning_zipped_dag_path, include_examples=False) + dagbag = DagBag(dag_folder=warning_zipped_dag_path) assert dagbag.dagbag_stats[0].warning_num == 2 assert dagbag.captured_warnings == { warning_zipped_dag_path: ( @@ -1155,7 +1169,7 @@ def test_dag_warnings_invalid_pool(self, known_pools, expected): BaseOperator(task_id="1") BaseOperator(task_id="2", pool="pool1") - dagbag = DagBag(dag_folder="", 
include_examples=False, collect_dags=False, known_pools=known_pools) + dagbag = DagBag(dag_folder="", collect_dags=False, known_pools=known_pools) dagbag.bag_dag(dag) assert dagbag.dag_warnings == expected @@ -1184,7 +1198,7 @@ def mytask(): ) ) - dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(tmp_path)) assert "Received SIGSEGV signal while processing" in caplog.text assert dag_file.as_posix() in dagbag.import_errors @@ -1209,7 +1223,7 @@ def mytask(): ) with mock.patch("airflow.dag_processing.importers.python_importer.signal.signal") as mock_signal: mock_signal.side_effect = ValueError("Invalid signal setting") - DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + DagBag(dag_folder=os.fspath(tmp_path)) assert "SIGSEGV signal handler registration failed. Not in the main thread" in caplog.text @@ -1355,7 +1369,7 @@ def test_dagbag_no_bundle_path_no_syspath_modification(self, tmp_path): ) ) syspath_before = deepcopy(sys.path) - dagbag = DagBag(dag_folder=str(dag_file), include_examples=False) + dagbag = DagBag(dag_folder=str(dag_file)) dag = dagbag.get_dag("simple_dag") assert str(tmp_path) not in dag.description diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index c10e73473bfef..6850887e02146 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -754,7 +754,6 @@ def test_scan_stale_dags(self, session): ) dagbag = DagBag( test_dag_path.absolute_path, - include_examples=False, bundle_path=test_dag_path.bundle_path, ) @@ -1125,7 +1124,7 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags( self, tmp_path, session, configure_testing_dag_bundle, test_zip_path ): """Test DagFileProcessorManager._refresh_dag_dir method""" - dagbag = DagBag(dag_folder=tmp_path, include_examples=False) + dagbag = DagBag(dag_folder=tmp_path) 
dagbag.process_file(test_zip_path) dag = dagbag.get_dag("test_zip_dag") sync_dag_to_db(dag) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 16c42e8acd029..2982a725f59c7 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -4103,7 +4103,7 @@ def test_dagrun_root_after_dagrun_unfinished(self, mock_executor, testing_dag_bu Noted: the DagRun state could be still in running state during CI. """ - dagbag = DagBag(TEST_DAG_FOLDER, include_examples=False) + dagbag = DagBag(TEST_DAG_FOLDER) sync_bag_to_db(dagbag, "testing", None) dag_id = "test_dagrun_states_root_future" @@ -4121,7 +4121,7 @@ def test_scheduler_start_date(self, testing_dag_bundle): """ Test that the scheduler respects start_dates, even when DAGs have run """ - dagbag = DagBag(TEST_DAG_FOLDER, include_examples=False) + dagbag = DagBag(TEST_DAG_FOLDER) with create_session() as session: dag_id = "test_start_date_scheduling" dag = dagbag.get_dag(dag_id) @@ -4178,7 +4178,6 @@ def test_scheduler_task_start_date_catchup_true(self, testing_dag_bundle): """ dagbag = DagBag( dag_folder=os.path.join(settings.DAGS_FOLDER, "test_scheduler_dags.py"), - include_examples=False, ) dag_id = "test_task_start_date_scheduling" dag = dagbag.get_dag(dag_id) @@ -4219,7 +4218,6 @@ def test_scheduler_task_start_date_catchup_false(self, testing_dag_bundle): """ dagbag = DagBag( dag_folder=os.path.join(settings.DAGS_FOLDER, "test_scheduler_dags.py"), - include_examples=False, ) dag_id = "test_task_start_date_scheduling" dag = dagbag.get_dag(dag_id) @@ -4263,7 +4261,7 @@ def test_scheduler_multiprocessing(self): """ Test that the scheduler can successfully queue multiple dags in parallel """ - dagbag = DagBag(TEST_DAG_FOLDER, include_examples=False) + dagbag = DagBag(TEST_DAG_FOLDER) dag_ids = [ "test_start_date_scheduling", "test_task_start_date_scheduling", @@ -7412,7 +7410,7 @@ def 
watch_heartbeat(*args, **kwargs): def test_mapped_dag(self, dag_id, session, testing_dag_bundle): """End-to-end test of a simple mapped dag""" - dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False) + dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER) sync_bag_to_db(dagbag, "testing", None) dagbag.process_file(str(TEST_DAGS_FOLDER / f"{dag_id}.py")) dag = dagbag.get_dag(dag_id) @@ -7445,7 +7443,7 @@ def test_should_mark_empty_task_as_success(self, testing_dag_bundle): dag_file = Path(__file__).parents[1] / "dags/test_only_empty_tasks.py" # Write DAGs to dag and serialized_dag table - dagbag = DagBag(dag_folder=dag_file, include_examples=False) + dagbag = DagBag(dag_folder=dag_file) sync_bag_to_db(dagbag, "testing", None) scheduler_job = Job() @@ -9052,7 +9050,7 @@ def test_execute_queries_count_with_harvested_dags( ), ): dagruns = [] - dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False) + dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE) sync_bag_to_db(dagbag, "testing", None) for i, dag in enumerate(dagbag.dags.values()): @@ -9144,7 +9142,7 @@ def test_process_dags_queries_count( } ), ): - dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False) + dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE) sync_bag_to_db(dagbag, "testing", None) scheduler_job = Job(job_type=SchedulerJobRunner.job_type) diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index b34ab12dd4aef..e16534e894e12 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -207,7 +207,7 @@ def test_dag_test_auto_parses_when_not_serialized(self, test_dags_bundle, sessio dag_id = "test_example_bash_operator" - dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER)) dag = dagbag.dags.get(dag_id) # Ensure not serialized yet @@ -235,7 +235,7 @@ def test_dag_test_syncs_sibling_for_trigger_dagrun(self, 
test_dags_bundle, sessi parent_id = "test_dag_test_trigger_parent" target_id = "test_dag_test_trigger_target" - dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER)) parent = dagbag.dags.get(parent_id) assert parent is not None @@ -268,7 +268,7 @@ def test_dag_test_syncs_sibling_for_dynamic_trigger_dagrun(self, test_dags_bundl parent_id = "test_dag_test_dynamic_trigger_parent" target_id = "test_dag_test_dynamic_trigger_target" - dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER)) parent = dagbag.dags.get(parent_id) assert parent is not None @@ -295,7 +295,7 @@ def test_dag_test_falls_back_when_recorded_bundle_no_longer_configured( parent_id = "test_dag_test_trigger_parent" target_id = "test_dag_test_trigger_target" - dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER)) parent = dagbag.dags.get(parent_id) assert parent is not None @@ -323,7 +323,7 @@ def test_dag_test_only_syncs_owning_bundle_when_parent_already_serialized( """ parent_id = "test_dag_test_trigger_parent" - dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER), include_examples=False) + dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER)) parent = dagbag.dags.get(parent_id) assert parent is not None diff --git a/airflow-core/tests/unit/models/test_dagcode.py b/airflow-core/tests/unit/models/test_dagcode.py index 6bb6e412442b0..5fdade754ee5c 100644 --- a/airflow-core/tests/unit/models/test_dagcode.py +++ b/airflow-core/tests/unit/models/test_dagcode.py @@ -77,10 +77,10 @@ def teardown_method(self): def _write_two_example_dags(self, session): example_dags = make_example_dags(example_dags_module) - bash_dag = example_dags["example_bash_operator"] - sync_dag_to_db(bash_dag, session=session) - dag_version = 
DagVersion.get_latest_version("example_bash_operator") - x = DagCode(dag_version, bash_dag.fileloc) + xcomargs_dag = example_dags["example_xcom_args"] + sync_dag_to_db(xcomargs_dag, session=session) + dag_version = DagVersion.get_latest_version("example_xcom_args") + x = DagCode(dag_version, xcomargs_dag.fileloc) session.add(x) session.commit() xcom_dag = example_dags["example_xcom"] @@ -89,7 +89,7 @@ def _write_two_example_dags(self, session): x = DagCode(dag_version, xcom_dag.fileloc) session.add(x) session.commit() - return [bash_dag, xcom_dag] + return [xcomargs_dag, xcom_dag] def _write_example_dags(self): example_dags = make_example_dags(example_dags_module) @@ -133,7 +133,9 @@ def test_code_can_be_read_when_no_access_to_file(self, testing_dag_bundle): Test that code can be retrieved from DB when you do not have access to Code file. Source Code should at least exist in one of DB or File. """ - example_dag = make_example_dags(example_dags_module).get("example_bash_operator") + from airflow.providers.standard import example_dags + + example_dag = make_example_dags(example_dags).get("example_bash_operator") sync_dag_to_db(example_dag) # Mock that there is no access to the Dag File @@ -146,7 +148,9 @@ def test_code_can_be_read_when_no_access_to_file(self, testing_dag_bundle): def test_db_code_created_on_serdag_change(self, session, testing_dag_bundle): """Test new DagCode is created in DB when ser dag is changed""" - example_dag = make_example_dags(example_dags_module).get("example_bash_operator") + from airflow.providers.standard import example_dags + + example_dag = make_example_dags(example_dags).get("example_bash_operator") sync_dag_to_db(example_dag, session=session).create_dagrun( run_id="test1", run_after=pendulum.datetime(2025, 1, 1, tz="UTC"), diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 34e8acbaacf60..b19d3724d450e 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ 
b/airflow-core/tests/unit/models/test_dagrun.py @@ -96,7 +96,7 @@ async def empty_callback_for_deadline(): def dagbag(): from airflow.dag_processing.dagbag import DagBag - return DagBag(include_examples=True) + return DagBag() @pytest.fixture diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index 54438f8e82fc9..0f42d8efbdee8 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -158,14 +158,14 @@ def my_callable2(): def test_serialized_dag_is_updated_if_dag_is_changed(self, testing_dag_bundle): """Test Serialized DAG is updated if DAG is changed""" example_dags = make_example_dags(example_dags_module) - example_bash_op_dag = example_dags.get("example_bash_operator") + example_params_trigger_ui = example_dags.get("example_params_trigger_ui") dag_updated = SDM.write_dag( - dag=LazyDeserializedDAG.from_dag(example_bash_op_dag), + dag=LazyDeserializedDAG.from_dag(example_params_trigger_ui), bundle_name="testing", ) assert dag_updated is True - s_dag = SDM.get(example_bash_op_dag.dag_id) + s_dag = SDM.get(example_params_trigger_ui.dag_id) s_dag.dag.create_dagrun( run_id="test1", run_after=pendulum.datetime(2025, 1, 1, tz="UTC"), @@ -177,28 +177,28 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self, testing_dag_bundle): # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated # column is not updated dag_updated = SDM.write_dag( - dag=LazyDeserializedDAG.from_dag(example_bash_op_dag), + dag=LazyDeserializedDAG.from_dag(example_params_trigger_ui), bundle_name="testing", ) - s_dag_1 = SDM.get(example_bash_op_dag.dag_id) + s_dag_1 = SDM.get(example_params_trigger_ui.dag_id) assert s_dag_1.dag_hash == s_dag.dag_hash assert s_dag.created_at == s_dag_1.created_at assert dag_updated is False # Update DAG - example_bash_op_dag.tags.add("new_tag") - assert example_bash_op_dag.tags == {"example", 
"example2", "new_tag"} + example_params_trigger_ui.tags.add("new_tag") + assert example_params_trigger_ui.tags == {"example", "new_tag", "params"} dag_updated = SDM.write_dag( - dag=LazyDeserializedDAG.from_dag(example_bash_op_dag), + dag=LazyDeserializedDAG.from_dag(example_params_trigger_ui), bundle_name="testing", ) - s_dag_2 = SDM.get(example_bash_op_dag.dag_id) + s_dag_2 = SDM.get(example_params_trigger_ui.dag_id) assert s_dag.created_at != s_dag_2.created_at assert s_dag.dag_hash != s_dag_2.dag_hash - assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"] + assert s_dag_2.data["dag"]["tags"] == ["example", "new_tag", "params"] assert dag_updated is True def test_read_dags(self): @@ -217,7 +217,7 @@ def test_read_all_dags_only_picks_the_latest_serdags(self, session): serialized_dags = SDM.read_all_dags() assert len(example_dags) == len(serialized_dags) - dag = example_dags.get("example_bash_operator") + dag = example_dags.get("example_params_trigger_ui") create_scheduler_dag(dag=dag).create_dagrun( run_id="test1", run_after=pendulum.datetime(2025, 1, 1, tz="UTC"), diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 33db3d8d906d4..39d9549827e55 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -474,7 +474,7 @@ def collect_dags(dag_folder=None): for directory in glob(f"{AIRFLOW_REPO_ROOT_PATH}/{pattern}"): if any([directory.startswith(excluded_pattern) for excluded_pattern in excluded_patterns]): continue - dagbag = DagBag(directory, include_examples=False) + dagbag = DagBag(directory) dags.update(dagbag.dags) import_errors.update(dagbag.import_errors) return dags, import_errors @@ -1824,9 +1824,7 @@ def mytask(): @pytest.mark.db_test def test_basic_mapped_dag(self, dag_maker): - dagbag = DagBag( - 
"airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py", include_examples=False - ) + dagbag = DagBag("airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py") assert not dagbag.import_errors dag = dagbag.dags["example_dynamic_task_mapping"] ser_dag = DagSerialization.to_dict(dag) diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py index 98a871ebb12d2..788afa9533319 100644 --- a/devel-common/src/tests_common/pytest_plugin.py +++ b/devel-common/src/tests_common/pytest_plugin.py @@ -910,6 +910,7 @@ def dag_maker(request) -> Generator[DagMaker, None, None]: AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, NOTSET, ) @@ -932,7 +933,10 @@ def __init__(self): from airflow.models import DagBag # Keep all the serialized dags we've created in this test - self.dagbag = DagBag(os.devnull, include_examples=False) + if AIRFLOW_V_3_3_PLUS: + self.dagbag = DagBag(os.devnull) + else: + self.dagbag = DagBag(os.devnull, include_examples=False) # type: ignore[call-arg] def __enter__(self): self.serialized_model = None @@ -1732,7 +1736,11 @@ def _get(dag_id: str): from airflow import settings from airflow.models.serialized_dag import SerializedDagModel - from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS + from tests_common.test_utils.version_compat import ( + AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, + ) if AIRFLOW_V_3_2_PLUS: from airflow.dag_processing.dagbag import DagBag @@ -1740,7 +1748,10 @@ def _get(dag_id: str): from airflow.models.dagbag import DagBag # type: ignore[no-redef, attribute-defined] dag_file = AIRFLOW_CORE_TESTS_PATH / "unit" / "dags" / f"{dag_id}.py" - dagbag = DagBag(dag_folder=dag_file, include_examples=False) + if AIRFLOW_V_3_3_PLUS: + dagbag = DagBag(dag_folder=dag_file) + else: + dagbag = DagBag(dag_folder=dag_file, include_examples=False) # type: ignore[call-arg] dag = 
dagbag.get_dag(dag_id) diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py index cbfb0b377ae71..b4d2bc1740cdb 100644 --- a/devel-common/src/tests_common/test_utils/db.py +++ b/devel-common/src/tests_common/test_utils/db.py @@ -64,7 +64,12 @@ ParseImportError, TaskOutletAssetReference, ) -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS +from tests_common.test_utils.version_compat import ( + AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_1_PLUS, + AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, +) log = logging.getLogger(__name__) @@ -185,7 +190,17 @@ def initial_db_init(): _bootstrap_dagbag() -def parse_and_sync_to_db(folder: Path | str, include_examples: bool = False): +def parse_and_sync_to_db(folder: Path | str): + """ + Parse DAGs in ``folder`` and sync them to the metadata database. + + On Airflow 3.3+, example DAGs are exposed as dedicated bundles + (``example_dags`` for core, ``apache-airflow-providers-*-example-dags`` + for each provider that ships an ``example_dags`` folder), and whether + they are loaded is controlled by the ``[core] load_examples`` + configuration option. Tests that need example DAGs should set + ``conf_vars({("core", "load_examples"): "true"})``. + """ if AIRFLOW_V_3_2_PLUS: from airflow.dag_processing.dagbag import DagBag else: @@ -199,17 +214,35 @@ def parse_and_sync_to_db(folder: Path | str, include_examples: bool = False): DagBundlesManager().sync_bundles_to_db(session=session) session.flush() - dagbag = DagBag(dag_folder=folder, include_examples=include_examples) - if AIRFLOW_V_3_1_PLUS: + if AIRFLOW_V_3_3_PLUS: + from airflow.dag_processing.dagbag import sync_bag_to_db + + # On 3.3+, example DAGs are exposed as their own bundles + # (``example_dags`` for core, ``apache-airflow-providers-*-example-dags`` + # for each provider that ships an ``example_dags`` folder). 
The + # bundle loop below already syncs every one of them, so the + # ``dags-folder`` DagBag must NOT pull example DAGs in too, + # otherwise the same DAG gets registered under two bundles and + # ``dag_schedule_asset_reference`` rows collide on the unique + # ``(asset_id, dag_id)`` constraint. + dagbag = DagBag(dag_folder=folder) + sync_bag_to_db(dagbag, "dags-folder", None, session=session) + for bundle in DagBundlesManager().get_all_dag_bundles(): + bundle_dagbag = DagBag(dag_folder=bundle.path) + sync_bag_to_db(bundle_dagbag, bundle.name, None, session=session) + + elif AIRFLOW_V_3_1_PLUS: try: from airflow.dag_processing.dagbag import sync_bag_to_db except ImportError: from airflow.models.dagbag import sync_bag_to_db # type: ignore[no-redef, attribute-defined] - + dagbag = DagBag(dag_folder=folder, include_examples=False) # type: ignore[call-arg] sync_bag_to_db(dagbag, "dags-folder", None, session=session) elif AIRFLOW_V_3_0_PLUS: + dagbag = DagBag(dag_folder=folder, include_examples=False) # type: ignore[call-arg] dagbag.sync_to_db("dags-folder", None, session) # type: ignore[attr-defined] else: + dagbag = DagBag(dag_folder=folder, include_examples=False) # type: ignore[call-arg] dagbag.sync_to_db(session=session) # type: ignore[attr-defined] return dagbag diff --git a/providers/fab/tests/unit/fab/www/views/conftest.py b/providers/fab/tests/unit/fab/www/views/conftest.py index 3c6e047deb994..96c423947e11b 100644 --- a/providers/fab/tests/unit/fab/www/views/conftest.py +++ b/providers/fab/tests/unit/fab/www/views/conftest.py @@ -43,7 +43,8 @@ def session(): @pytest.fixture(autouse=True, scope="module") def examples_dag_bag(session): - dag_bag = parse_and_sync_to_db(os.devnull, include_examples=True) + with conf_vars({("core", "load_examples"): "True"}): + dag_bag = parse_and_sync_to_db(os.devnull) session.commit() return dag_bag diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py 
b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py index 164580b6c7fbd..d04875431b497 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py +++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py @@ -460,7 +460,7 @@ def assert_warning(msg: str, warnings): class DataprocTestBase: @classmethod def setup_class(cls): - cls.dagbag = DagBag(dag_folder="/dev/null", include_examples=False) + cls.dagbag = DagBag(dag_folder="/dev/null") cls.dag = DAG( dag_id=TEST_DAG_ID, schedule=None, diff --git a/providers/google/tests/unit/google/cloud/operators/test_looker.py b/providers/google/tests/unit/google/cloud/operators/test_looker.py index 91f2387a64aa4..2cc2c5ac0513d 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_looker.py +++ b/providers/google/tests/unit/google/cloud/operators/test_looker.py @@ -43,7 +43,7 @@ class LookerTestBase: @classmethod def setUpClass(cls): - cls.dagbag = DagBag(dag_folder="/dev/null", include_examples=False) + cls.dagbag = DagBag(dag_folder="/dev/null") cls.dag = DAG(TEST_DAG_ID, default_args={"owner": "airflow", "start_date": DEFAULT_DATE}) def setup_method(self): diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py index ffb32b6ff1d37..256e873f55e47 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py @@ -86,7 +86,6 @@ def setup_job(self, task_name, run_id, listener_manager): dagbag = DagBag( dag_folder=TEST_DAG_FOLDER, - include_examples=False, ) dag = dagbag.dags.get("test_openlineage_execution") task = dag.get_task(task_name) @@ -189,7 +188,6 @@ def test_success_overtime_kills_tasks(self, listener_manager): dagbag = DagBag( dag_folder=TEST_DAG_FOLDER, - include_examples=False, ) dag = dagbag.dags.get("test_openlineage_execution") task = 
dag.get_task("execute_long_stall") diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index bcc5ad6153c6e..e99acd0575d79 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -101,7 +101,7 @@ def teardown_method(self): if AIRFLOW_V_3_0_PLUS: from airflow.models.dagbundle import DagBundleModel - session.execute(delete(DagBundleModel)) + session.execute(delete(DagBundleModel).where(DagBundleModel.name == "test_bundle")) session.commit() @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3") diff --git a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py index fde38e4e2367a..fd5d7ea0fee11 100644 --- a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py +++ b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py @@ -62,7 +62,25 @@ from tests_common.test_utils.dag import create_scheduler_dag, sync_dag_to_db, sync_dags_to_db from tests_common.test_utils.db import clear_db_runs from tests_common.test_utils.mock_operators import MockOperator -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS +from tests_common.test_utils.version_compat import ( + AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_1_PLUS, + AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, +) + + +def _make_dagbag(dag_folder): + """DagBag with examples disabled on Airflow <3.3. + + In 3.3+, ``include_examples`` was removed and example DAGs come from + provider example bundles instead. On older versions the default is True, + which loads example DAGs that can fail tests with their required Params. 
+ """ + if AIRFLOW_V_3_3_PLUS: + return DagBag(dag_folder=dag_folder) + return DagBag(dag_folder=dag_folder, include_examples=False) # type: ignore[call-arg] + if AIRFLOW_V_3_0_PLUS: from airflow.models.dag_version import DagVersion @@ -1721,7 +1739,7 @@ def dag_bag_ext(): """ clear_db_runs() - dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False) + dag_bag = _make_dagbag(DEV_NULL) dag_0 = DAG("dag_0", start_date=DEFAULT_DATE, schedule=None) task_a_0 = EmptyOperator(task_id="task_a_0", dag=dag_0) @@ -1785,7 +1803,7 @@ def dag_bag_parent_child(): """ clear_db_runs() - dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False) + dag_bag = _make_dagbag(DEV_NULL) day_1 = DEFAULT_DATE @@ -2020,7 +2038,7 @@ def dag_bag_cyclic(): """ def _factory(depth: int) -> DagBag: - dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False) + dag_bag = _make_dagbag(DEV_NULL) dags = [] @@ -2118,7 +2136,7 @@ def dag_bag_multiple(session): """ Create a DagBag containing two DAGs, linked by multiple ExternalTaskMarker. 
""" - dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False) + dag_bag = _make_dagbag(DEV_NULL) daily_dag = DAG("daily_dag", start_date=DEFAULT_DATE, schedule="@daily") agg_dag = DAG("agg_dag", start_date=DEFAULT_DATE, schedule="@daily") if AIRFLOW_V_3_0_PLUS: @@ -2164,7 +2182,7 @@ def dag_bag_head_tail(session): | tail/| | tail/| / | tail | +------+ +------+ +------+ """ - dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False) + dag_bag = _make_dagbag(DEV_NULL) with DAG("head_tail", start_date=DEFAULT_DATE, schedule="@daily") as dag: head = ExternalTaskSensor( @@ -2209,7 +2227,7 @@ def dag_bag_head_tail_mapped_tasks(session): | tail/| | tail/| / | tail | +------+ +------+ +------+ """ - dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False) + dag_bag = _make_dagbag(DEV_NULL) with DAG("head_tail", start_date=DEFAULT_DATE, schedule="@daily") as dag: diff --git a/providers/standard/tests/unit/standard/sensors/test_time_delta.py b/providers/standard/tests/unit/standard/sensors/test_time_delta.py index b5c931e3dafbb..5c4eeef2a8eb9 100644 --- a/providers/standard/tests/unit/standard/sensors/test_time_delta.py +++ b/providers/standard/tests/unit/standard/sensors/test_time_delta.py @@ -36,7 +36,12 @@ from airflow.utils.types import DagRunType from tests_common.test_utils import db -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS, timezone +from tests_common.test_utils.version_compat import ( + AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, + timezone, +) if AIRFLOW_V_3_2_PLUS: from airflow.dag_processing.dagbag import DagBag @@ -63,7 +68,10 @@ def clear_db(): class TestTimedeltaSensor: def setup_method(self): - self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=False) + if AIRFLOW_V_3_3_PLUS: + self.dagbag = DagBag(dag_folder=DEV_NULL) + else: + self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=False) # type: ignore[call-arg] self.dag = DAG(TEST_DAG_ID, 
schedule=timedelta(days=1), start_date=DEFAULT_DATE) def test_timedelta_sensor(self, mocker): @@ -161,7 +169,10 @@ def test_timedelta_sensor_deferrable_run_after_vs_interval(run_after, interval_e class TestTimeDeltaSensorAsync: def setup_method(self): - self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True) + if AIRFLOW_V_3_3_PLUS: + self.dagbag = DagBag(dag_folder=DEV_NULL) + else: + self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True) self.args = {"owner": "airflow", "start_date": DEFAULT_DATE} self.dag = DAG(TEST_DAG_ID, schedule=timedelta(days=1), default_args=self.args) diff --git a/providers/standard/tests/unit/standard/sensors/test_weekday.py b/providers/standard/tests/unit/standard/sensors/test_weekday.py index 4f9bac530785d..69f2d82bf7bc8 100644 --- a/providers/standard/tests/unit/standard/sensors/test_weekday.py +++ b/providers/standard/tests/unit/standard/sensors/test_weekday.py @@ -27,7 +27,12 @@ from airflow.providers.standard.utils.weekday import WeekDay from tests_common.test_utils import db -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS, timezone +from tests_common.test_utils.version_compat import ( + AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, + timezone, +) if AIRFLOW_V_3_2_PLUS: from airflow.dag_processing.dagbag import DagBag @@ -66,7 +71,10 @@ def clean_db(): def setup_method(self): self.clean_db() - self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True) + if AIRFLOW_V_3_3_PLUS: + self.dagbag = DagBag(dag_folder=DEV_NULL) + else: + self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True) self.args = {"owner": "airflow", "start_date": DEFAULT_DATE} dag = DAG(TEST_DAG_ID, schedule=timedelta(days=1), default_args=self.args) self.dag = dag