Skip to content

Commit

Permalink
OpenLineage: Include AirflowDagRunFacet in complete/failed events (a…
Browse files Browse the repository at this point in the history
…pache#45615)

Signed-off-by: Maciej Obuchowski <[email protected]>
  • Loading branch information
mobuchowski authored and karenbraganz committed Jan 13, 2025
1 parent e855d1d commit e7e68d2
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ def dag_success(
clear_number: int,
dag_run_state: DagRunState,
task_ids: list[str],
run_facets: dict[str, RunFacet],
):
try:
event = RunEvent(
Expand All @@ -390,6 +391,7 @@ def dag_success(
facets={
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
**run_facets,
},
),
inputs=[],
Expand All @@ -413,6 +415,7 @@ def dag_failed(
dag_run_state: DagRunState,
task_ids: list[str],
msg: str,
run_facets: dict[str, RunFacet],
):
try:
event = RunEvent(
Expand All @@ -431,6 +434,7 @@ def dag_failed(
),
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
**run_facets,
},
),
inputs=[],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
task_ids = DagRun._get_partial_task_ids(dag_run.dag)
else:
task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None

self.submit_callable(
self.adapter.dag_success,
dag_id=dag_run.dag_id,
Expand All @@ -509,6 +510,7 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
clear_number=dag_run.clear_number,
task_ids=task_ids,
dag_run_state=dag_run.get_state(),
run_facets={**get_airflow_dag_run_facet(dag_run)},
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method on_dag_run_success", exc_info=e)
Expand Down Expand Up @@ -543,6 +545,7 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
dag_run_state=dag_run.get_state(),
task_ids=task_ids,
msg=msg,
run_facets={**get_airflow_dag_run_facet(dag_run)},
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method on_dag_run_failed", exc_info=e)
Expand Down
4 changes: 4 additions & 0 deletions providers/tests/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ def test_emit_dag_complete_event(
clear_number=0,
dag_run_state=DagRunState.SUCCESS,
task_ids=["task_0", "task_1", "task_2.test"],
run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run)},
)

client.emit.assert_called_once_with(
Expand All @@ -731,6 +732,7 @@ def test_emit_dag_complete_event(
},
),
"debug": AirflowDebugRunFacet(packages=ANY),
"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run),
},
),
job=Job(
Expand Down Expand Up @@ -804,6 +806,7 @@ def test_emit_dag_failed_event(
dag_run_state=DagRunState.FAILED,
task_ids=["task_0", "task_1", "task_2.test"],
msg="error msg",
run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run)},
)

client.emit.assert_called_once_with(
Expand All @@ -825,6 +828,7 @@ def test_emit_dag_failed_event(
},
),
"debug": AirflowDebugRunFacet(packages=ANY),
"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run),
},
),
job=Job(
Expand Down

0 comments on commit e7e68d2

Please sign in to comment.