diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index a4e99e06a9dd2..b5429a484e9ea 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -407,7 +407,7 @@ def run(self): DagBundlesManager().sync_bundles_to_db() self.log.info("Getting all DAG bundles") - self._dag_bundles = DagBundlesManager().get_all_dag_bundles() + self._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) return self._run_parsing_loop() diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 90f88ddd030a8..b6609c162390d 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -91,7 +91,7 @@ def _create_dag(self, dag_id): dag = DAG(dag_id=dag_id, schedule=None, params={"validated_number": Param(1, minimum=1, maximum=10)}) DagBundlesManager().sync_bundles_to_db() self.app.dag_bag.bag_dag(dag) - self.app.dag_bag.sync_to_db("dags_folder", None) + self.app.dag_bag.sync_to_db("dags-folder", None) return dag_instance def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commit=True, idx_start=1): diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py index de58eaa243fec..e4aa7895ac662 100644 --- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py +++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py @@ -77,7 +77,7 @@ def setup_attrs(self, configured_app, session) -> None: DagBundlesManager().sync_bundles_to_db() self.app.dag_bag = DagBag(os.devnull, include_examples=False) self.app.dag_bag.dags = {self.dag.dag_id: self.dag} - self.app.dag_bag.sync_to_db("dags_folder", None) + self.app.dag_bag.sync_to_db("dags-folder", None) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} self.dag.create_dagrun( diff --git a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py index 1141cad8788c7..7a4d802f6cc15 100644 --- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py @@ -126,7 +126,7 @@ def create_dag_runs_with_mapped_tasks(self, dag_maker, session, dags=None): DagBundlesManager().sync_bundles_to_db() self.app.dag_bag = DagBag(os.devnull, include_examples=False) self.app.dag_bag.dags = {dag_id: dag_maker.dag} - self.app.dag_bag.sync_to_db("dags_folder", None) + self.app.dag_bag.sync_to_db("dags-folder", None) session.flush() mapped.expand_mapped_task(dr.run_id, session=session) diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py index 5926b6424e8e8..7e593731d539d 100644 --- a/tests/api_connexion/endpoints/test_task_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_endpoint.py @@ -89,7 +89,7 @@ def setup_dag(self, configured_app): mapped_dag.dag_id: mapped_dag, unscheduled_dag.dag_id: unscheduled_dag, } - dag_bag.sync_to_db("dags_folder", None) + dag_bag.sync_to_db("dags-folder", None) configured_app.dag_bag = dag_bag # type:ignore @staticmethod diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index b06742da8aa5c..b5079c47aa17e 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -1224,7 +1224,7 @@ def test_should_respond_200(self, main_dag, task_instances, request_dag, payload task_instances=task_instances, update_extras=False, ) - self.app.dag_bag.sync_to_db("dags_folder", None) + self.app.dag_bag.sync_to_db("dags-folder", None) response = self.client.post( f"/api/v1/dags/{request_dag}/clearTaskInstances", environ_overrides={"REMOTE_USER": "test"}, @@ -1246,7 +1246,7 @@ def test_clear_taskinstance_is_called_with_queued_dr_state(self, mock_clearti, s self.create_task_instances(session) dag_id = "example_python_operator" payload = {"reset_dag_runs": True, "dry_run": False} - self.app.dag_bag.sync_to_db("dags_folder", None) + self.app.dag_bag.sync_to_db("dags-folder", None) response = self.client.post( f"/api/v1/dags/{dag_id}/clearTaskInstances", environ_overrides={"REMOTE_USER": "test"}, @@ -1266,7 +1266,7 @@ def test_clear_taskinstance_is_called_with_invalid_task_ids(self, session): assert dagrun.state == "running" payload = {"dry_run": False, "reset_dag_runs": True, "task_ids": [""]} - self.app.dag_bag.sync_to_db("dags_folder", None) + self.app.dag_bag.sync_to_db("dags-folder", None) response = self.client.post( f"/api/v1/dags/{dag_id}/clearTaskInstances", environ_overrides={"REMOTE_USER": "test"}, @@ -1693,7 +1693,7 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, payload, expected, se task_instances=task_instances, update_extras=False, ) - self.app.dag_bag.sync_to_db("dags_folder", None) + self.app.dag_bag.sync_to_db("dags-folder", None) response = self.client.post( "/api/v1/dags/example_python_operator/clearTaskInstances", environ_overrides={"REMOTE_USER": "test"}, diff --git a/tests/api_fastapi/core_api/routes/public/test_extra_links.py b/tests/api_fastapi/core_api/routes/public/test_extra_links.py index 0be767538eed3..89aef555bbe20 100644 --- a/tests/api_fastapi/core_api/routes/public/test_extra_links.py +++ b/tests/api_fastapi/core_api/routes/public/test_extra_links.py @@ -72,7 +72,7 @@ def setup(self, test_client, session=None) -> None: dag_bag = DagBag(os.devnull, include_examples=False) dag_bag.dags = {self.dag.dag_id: self.dag} test_client.app.state.dag_bag = dag_bag - dag_bag.sync_to_db("dags_folder", None) + dag_bag.sync_to_db("dags-folder", None) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 9be7901b9d197..f0c3c888807ea 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -526,7 +526,7 @@ def create_dag_runs_with_mapped_tasks(self, dag_maker, session, dags=None): DagBundlesManager().sync_bundles_to_db() dagbag = DagBag(os.devnull, include_examples=False) dagbag.dags = {dag_id: dag_maker.dag} - dagbag.sync_to_db("dags_folder", None) + dagbag.sync_to_db("dags-folder", None) session.flush() mapped.expand_mapped_task(dr.run_id, session=session) @@ -1859,7 +1859,7 @@ def test_should_respond_200( task_instances=task_instances, update_extras=False, ) - self.dagbag.sync_to_db("dags_folder", None) + self.dagbag.sync_to_db("dags-folder", None) response = test_client.post( f"/public/dags/{request_dag}/clearTaskInstances", json=payload, @@ -1873,7 +1873,7 @@ def test_clear_taskinstance_is_called_with_queued_dr_state(self, mock_clearti, t self.create_task_instances(session) dag_id = "example_python_operator" payload = {"reset_dag_runs": True, "dry_run": False} - self.dagbag.sync_to_db("dags_folder", None) + self.dagbag.sync_to_db("dags-folder", None) response = test_client.post( f"/public/dags/{dag_id}/clearTaskInstances", json=payload, @@ -1893,7 +1893,7 @@ def test_clear_taskinstance_is_called_with_invalid_task_ids(self, test_client, s assert dagrun.state == "running" payload = {"dry_run": False, "reset_dag_runs": True, "task_ids": [""]} - self.dagbag.sync_to_db("dags_folder", None) + self.dagbag.sync_to_db("dags-folder", None) response = test_client.post( f"/public/dags/{dag_id}/clearTaskInstances", json=payload, @@ -1943,7 +1943,7 @@ def test_should_respond_200_with_reset_dag_run(self, test_client, session): update_extras=False, dag_run_state=DagRunState.FAILED, ) - self.dagbag.sync_to_db("dags_folder", None) + self.dagbag.sync_to_db("dags-folder", None) response = test_client.post( f"/public/dags/{dag_id}/clearTaskInstances", json=payload, @@ -2028,7 +2028,7 @@ def test_should_respond_200_with_dag_run_id(self, test_client, session): update_extras=False, dag_run_state=State.FAILED, ) - self.dagbag.sync_to_db("dags_folder", None) + self.dagbag.sync_to_db("dags-folder", None) response = test_client.post( f"/public/dags/{dag_id}/clearTaskInstances", json=payload, @@ -2084,7 +2084,7 @@ def test_should_respond_200_with_include_past(self, test_client, session): update_extras=False, dag_run_state=State.FAILED, ) - self.dagbag.sync_to_db("dags_folder", None) + self.dagbag.sync_to_db("dags-folder", None) response = test_client.post( f"/public/dags/{dag_id}/clearTaskInstances", json=payload, @@ -2166,7 +2166,7 @@ def test_should_respond_200_with_include_future(self, test_client, session): update_extras=False, dag_run_state=State.FAILED, ) - self.dagbag.sync_to_db("dags_folder", None) + self.dagbag.sync_to_db("dags-folder", None) response = test_client.post( f"/public/dags/{dag_id}/clearTaskInstances", json=payload, @@ -2314,7 +2314,7 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, test_client, session, task_instances=task_instances, update_extras=False, ) - self.dagbag.sync_to_db("dags_folder", None) + self.dagbag.sync_to_db("dags-folder", None) response = test_client.post( "/public/dags/example_python_operator/clearTaskInstances", json=payload, diff --git a/tests/cli/commands/remote_commands/test_task_command.py b/tests/cli/commands/remote_commands/test_task_command.py index 05e90dddcd3a1..9a4c606caa469 100644 --- a/tests/cli/commands/remote_commands/test_task_command.py +++ b/tests/cli/commands/remote_commands/test_task_command.py @@ -164,7 +164,7 @@ def test_cli_test_different_path(self, session, tmp_path): with conf_vars({("core", "dags_folder"): orig_dags_folder.as_posix()}): dagbag = DagBag(include_examples=False) dag = dagbag.get_dag("test_dags_folder") - dagbag.sync_to_db("dags_folder", None, session=session) + dagbag.sync_to_db("dags-folder", None, session=session) logical_date = pendulum.now("UTC") data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) diff --git a/tests/conftest.py b/tests/conftest.py index 7194e8f307eae..8806dc1e8c1ad 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -104,13 +104,14 @@ def configure_testing_dag_bundle(): @contextmanager def _config_bundle(path_to_parse: Path | str): - bundle_config = { - "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", - "kwargs": {"local_folder": str(path_to_parse), "refresh_interval": 30}, - } - with conf_vars( - {("dag_bundles", "testing"): json.dumps(bundle_config), ("dag_bundles", "dags_folder"): ""} - ): + bundle_config = [ + { + "name": "testing", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"local_folder": str(path_to_parse), "refresh_interval": 30}, + } + ] + with conf_vars({("dag_bundles", "backends"): json.dumps(bundle_config)}): yield return _config_bundle diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py index d8abb8cda1a38..58ad46372d711 100644 --- a/tests_common/test_utils/db.py +++ b/tests_common/test_utils/db.py @@ -72,7 +72,7 @@ def _bootstrap_dagbag(): dagbag = DagBag() # Save DAGs in the ORM if AIRFLOW_V_3_0_PLUS: - dagbag.sync_to_db(bundle_name="dags_folder", bundle_version=None, session=session) + dagbag.sync_to_db(bundle_name="dags-folder", bundle_version=None, session=session) else: dagbag.sync_to_db(session=session) @@ -120,7 +120,7 @@ def parse_and_sync_to_db(folder: Path | str, include_examples: bool = False): dagbag = DagBag(dag_folder=folder, include_examples=include_examples) if AIRFLOW_V_3_0_PLUS: - dagbag.sync_to_db("dags_folder", None, session) + dagbag.sync_to_db("dags-folder", None, session) else: dagbag.sync_to_db(session=session) # type: ignore[call-arg] session.commit()