Skip to content

Commit 20384d9

Browse files
committed
Revert "AIP-66: Add support for parsing DAG bundles (#45371)"
This reverts commit 72ab1d5.
1 parent 40fd78f commit 20384d9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+682
-1043
lines changed

airflow/cli/commands/local_commands/dag_processor_command.py

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
3939
job=Job(),
4040
processor=DagFileProcessorManager(
4141
processor_timeout=processor_timeout_seconds,
42+
dag_directory=args.subdir,
4243
max_runs=args.num_runs,
4344
),
4445
)

airflow/dag_processing/bundles/manager.py

-17
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,6 @@ def parse_config(self) -> None:
6464
"Bundle config is not a list. Check config value"
6565
" for section `dag_bundles` and key `backends`."
6666
)
67-
68-
# example dags!
69-
if conf.getboolean("core", "LOAD_EXAMPLES"):
70-
from airflow import example_dags
71-
72-
example_dag_folder = next(iter(example_dags.__path__))
73-
backends.append(
74-
{
75-
"name": "example_dags",
76-
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
77-
"kwargs": {
78-
"local_folder": example_dag_folder,
79-
"refresh_interval": conf.getint("scheduler", "dag_dir_list_interval"),
80-
},
81-
}
82-
)
83-
8467
seen = set()
8568
for cfg in backends:
8669
name = cfg["name"]

airflow/dag_processing/collection.py

+3-13
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,11 @@
7474
log = logging.getLogger(__name__)
7575

7676

77-
def _create_orm_dags(
78-
bundle_name: str, dags: Iterable[MaybeSerializedDAG], *, session: Session
79-
) -> Iterator[DagModel]:
77+
def _create_orm_dags(dags: Iterable[MaybeSerializedDAG], *, session: Session) -> Iterator[DagModel]:
8078
for dag in dags:
8179
orm_dag = DagModel(dag_id=dag.dag_id)
8280
if dag.is_paused_upon_creation is not None:
8381
orm_dag.is_paused = dag.is_paused_upon_creation
84-
orm_dag.bundle_name = bundle_name
8582
log.info("Creating ORM DAG for %s", dag.dag_id)
8683
session.add(orm_dag)
8784
yield orm_dag
@@ -273,8 +270,6 @@ def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str],
273270

274271

275272
def update_dag_parsing_results_in_db(
276-
bundle_name: str,
277-
bundle_version: str | None,
278273
dags: Collection[MaybeSerializedDAG],
279274
import_errors: dict[str, str],
280275
warnings: set[DagWarning],
@@ -312,7 +307,7 @@ def update_dag_parsing_results_in_db(
312307
)
313308
log.debug("Calling the DAG.bulk_sync_to_db method")
314309
try:
315-
DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session)
310+
DAG.bulk_write_to_db(dags, session=session)
316311
# Write Serialized DAGs to DB, capturing errors
317312
# Write Serialized DAGs to DB, capturing errors
318313
for dag in dags:
@@ -351,8 +346,6 @@ class DagModelOperation(NamedTuple):
351346
"""Collect DAG objects and perform database operations for them."""
352347

353348
dags: dict[str, MaybeSerializedDAG]
354-
bundle_name: str
355-
bundle_version: str | None
356349

357350
def find_orm_dags(self, *, session: Session) -> dict[str, DagModel]:
358351
"""Find existing DagModel objects from DAG objects."""
@@ -372,8 +365,7 @@ def add_dags(self, *, session: Session) -> dict[str, DagModel]:
372365
orm_dags.update(
373366
(model.dag_id, model)
374367
for model in _create_orm_dags(
375-
bundle_name=self.bundle_name,
376-
dags=(dag for dag_id, dag in self.dags.items() if dag_id not in orm_dags),
368+
(dag for dag_id, dag in self.dags.items() if dag_id not in orm_dags),
377369
session=session,
378370
)
379371
)
@@ -438,8 +430,6 @@ def update_dags(
438430
dm.timetable_summary = dag.timetable.summary
439431
dm.timetable_description = dag.timetable.description
440432
dm.asset_expression = dag.timetable.asset_condition.as_expression()
441-
dm.bundle_name = self.bundle_name
442-
dm.latest_bundle_version = self.bundle_version
443433

444434
last_automated_run: DagRun | None = run_info.latest_runs.get(dag.dag_id)
445435
if last_automated_run is None:

0 commit comments

Comments
 (0)