Skip to content

Commit

Permalink
AIP-66: Add support for parsing DAG bundles (apache#45371)
Browse files Browse the repository at this point in the history
Let's start parsing DAG bundles! This moves us away from parsing a
single local directory to being able to parse many different bundles,
including optional support for versioning.

This is just the basics - it keeps the parsing loop largely untouched.
We still have a single list of "dag files" to parse, and queue of them.
However, instead of just a path, this list and queue now contain
`DagFilePath`s, each holding both a local path and the bundle it's from.

There are a number of things that are not fully functional at this
stage, like versioned callbacks. These will be refactored later: the
basics alone introduce enough churn (particularly given the number of
test changes involved).

Co-authored-by: Daniel Standish <[email protected]>
  • Loading branch information
2 people authored and karenbraganz committed Jan 13, 2025
1 parent b54269b commit 5bfc96f
Show file tree
Hide file tree
Showing 54 changed files with 1,043 additions and 682 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
job=Job(),
processor=DagFileProcessorManager(
processor_timeout=processor_timeout_seconds,
dag_directory=args.subdir,
max_runs=args.num_runs,
),
)
Expand Down
17 changes: 17 additions & 0 deletions airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ def parse_config(self) -> None:
"Bundle config is not a list. Check config value"
" for section `dag_bundles` and key `backends`."
)

# example dags!
if conf.getboolean("core", "LOAD_EXAMPLES"):
from airflow import example_dags

example_dag_folder = next(iter(example_dags.__path__))
backends.append(
{
"name": "example_dags",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {
"local_folder": example_dag_folder,
"refresh_interval": conf.getint("scheduler", "dag_dir_list_interval"),
},
}
)

seen = set()
for cfg in backends:
name = cfg["name"]
Expand Down
16 changes: 13 additions & 3 deletions airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@
log = logging.getLogger(__name__)


def _create_orm_dags(
    bundle_name: str, dags: Iterable[MaybeSerializedDAG], *, session: Session
) -> Iterator[DagModel]:
    """Create and register an ORM ``DagModel`` row for each DAG, yielding the new rows.

    Each model is tagged with the bundle it was parsed from and added to the
    session; flushing/committing is left to the caller.
    """
    for dag in dags:
        model = DagModel(dag_id=dag.dag_id)
        # Only override the paused flag when the DAG explicitly declares one.
        paused_on_creation = dag.is_paused_upon_creation
        if paused_on_creation is not None:
            model.is_paused = paused_on_creation
        model.bundle_name = bundle_name
        log.info("Creating ORM DAG for %s", dag.dag_id)
        session.add(model)
        yield model
Expand Down Expand Up @@ -270,6 +273,8 @@ def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str],


def update_dag_parsing_results_in_db(
bundle_name: str,
bundle_version: str | None,
dags: Collection[MaybeSerializedDAG],
import_errors: dict[str, str],
warnings: set[DagWarning],
Expand Down Expand Up @@ -307,7 +312,7 @@ def update_dag_parsing_results_in_db(
)
log.debug("Calling the DAG.bulk_sync_to_db method")
try:
DAG.bulk_write_to_db(dags, session=session)
DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session)
# Write Serialized DAGs to DB, capturing errors
# Write Serialized DAGs to DB, capturing errors
for dag in dags:
Expand Down Expand Up @@ -346,6 +351,8 @@ class DagModelOperation(NamedTuple):
"""Collect DAG objects and perform database operations for them."""

dags: dict[str, MaybeSerializedDAG]
bundle_name: str
bundle_version: str | None

def find_orm_dags(self, *, session: Session) -> dict[str, DagModel]:
"""Find existing DagModel objects from DAG objects."""
Expand All @@ -365,7 +372,8 @@ def add_dags(self, *, session: Session) -> dict[str, DagModel]:
orm_dags.update(
(model.dag_id, model)
for model in _create_orm_dags(
(dag for dag_id, dag in self.dags.items() if dag_id not in orm_dags),
bundle_name=self.bundle_name,
dags=(dag for dag_id, dag in self.dags.items() if dag_id not in orm_dags),
session=session,
)
)
Expand Down Expand Up @@ -430,6 +438,8 @@ def update_dags(
dm.timetable_summary = dag.timetable.summary
dm.timetable_description = dag.timetable.description
dm.asset_expression = dag.timetable.asset_condition.as_expression()
dm.bundle_name = self.bundle_name
dm.latest_bundle_version = self.bundle_version

last_automated_run: DagRun | None = run_info.latest_runs.get(dag.dag_id)
if last_automated_run is None:
Expand Down
Loading

0 comments on commit 5bfc96f

Please sign in to comment.