Skip to content

Commit

Permalink
feat: automatically inject OL info into spark job in DataprocInstanti…
Browse files Browse the repository at this point in the history
…ateInlineWorkflowTemplateOperator (#44697)

Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Jan 2, 2025
1 parent 0d31fac commit 48a5a0a
Show file tree
Hide file tree
Showing 5 changed files with 438 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/exts/templates/openlineage.rst.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ apache-airflow-providers-google
- Parent Job Information
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`
- Parent Job Information
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`
- Parent Job Information


:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
Expand Down
64 changes: 64 additions & 0 deletions providers/src/airflow/providers/google/cloud/openlineage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,3 +622,67 @@ def inject_openlineage_properties_into_dataproc_batch(

batch_with_ol_config = _replace_dataproc_batch_properties(batch=batch, new_properties=properties)
return batch_with_ol_config


def inject_openlineage_properties_into_dataproc_workflow_template(
    template: dict, context: Context, inject_parent_job_info: bool
) -> dict:
    """
    Inject OpenLineage properties into Spark jobs in Workflow Template.

    Function is not removing any configuration or modifying the jobs in any other way,
    apart from adding desired OpenLineage properties to Dataproc job definition if not already present.

    The top-level ``template`` mapping passed by the caller is not modified: when injection is
    attempted, a shallow copy of it carrying a rebuilt ``jobs`` list is returned. When injection is
    skipped entirely (disabled or provider not accessible), the very same object is returned.

    Note:
        Any modification to job will be skipped if:
            - OpenLineage provider is not accessible.
            - The job type is not supported.
            - Automatic parent job information injection is disabled.
            - Any OpenLineage properties with parent job information are already present
              in the Spark job definition.

    Args:
        template: The original Dataproc Workflow Template definition. Must contain a ``jobs`` list
            whose items each carry a ``step_id`` key.
        context: The Airflow context in which the job is running.
        inject_parent_job_info: Flag indicating whether to inject parent job information.

    Returns:
        The Workflow Template definition with OpenLineage properties injected, if applicable.
    """
    if not inject_parent_job_info:
        log.debug("Automatic injection of OpenLineage information is disabled.")
        return template

    if not _is_openlineage_provider_accessible():
        log.warning(
            "Could not access OpenLineage provider for automatic OpenLineage "
            "properties injection. No action will be performed."
        )
        return template

    final_jobs = []
    for single_job_definition in template["jobs"]:
        step_id = single_job_definition["step_id"]
        log.debug("Injecting OpenLineage properties into Workflow step: `%s`", step_id)

        if (job_type := _extract_supported_job_type_from_dataproc_job(single_job_definition)) is None:
            log.debug(
                "Could not find a supported Dataproc job type for automatic OpenLineage "
                "properties injection. No action will be performed.",
            )
            # Unsupported steps are passed through as-is so the workflow keeps every job.
            final_jobs.append(single_job_definition)
            continue

        properties = single_job_definition[job_type].get("properties", {})

        properties = inject_parent_job_information_into_spark_properties(
            properties=properties, context=context
        )

        job_with_ol_config = _replace_dataproc_job_properties(
            job=single_job_definition, job_type=job_type, new_properties=properties
        )
        final_jobs.append(job_with_ol_config)

    # Build a new top-level mapping instead of assigning into the caller's dict, so a template
    # object that is reused elsewhere is not silently altered by this call.
    return {**template, "jobs": final_jobs}
13 changes: 13 additions & 0 deletions providers/src/airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from airflow.providers.google.cloud.openlineage.utils import (
inject_openlineage_properties_into_dataproc_batch,
inject_openlineage_properties_into_dataproc_job,
inject_openlineage_properties_into_dataproc_workflow_template,
)
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.cloud.triggers.dataproc import (
Expand Down Expand Up @@ -1825,6 +1826,9 @@ def __init__(
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
polling_interval_seconds: int = 10,
cancel_on_kill: bool = True,
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -1844,11 +1848,20 @@ def __init__(
self.polling_interval_seconds = polling_interval_seconds
self.cancel_on_kill = cancel_on_kill
self.operation_name: str | None = None
self.openlineage_inject_parent_job_info = openlineage_inject_parent_job_info

def execute(self, context: Context):
self.log.info("Instantiating Inline Template")
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
project_id = self.project_id or hook.project_id
if self.openlineage_inject_parent_job_info:
self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
self.template = inject_openlineage_properties_into_dataproc_workflow_template(
template=self.template,
context=context,
inject_parent_job_info=self.openlineage_inject_parent_job_info,
)

operation = hook.instantiate_inline_workflow_template(
template=self.template,
project_id=project_id,
Expand Down
123 changes: 123 additions & 0 deletions providers/tests/google/cloud/openlineage/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
get_identity_column_lineage_facet,
inject_openlineage_properties_into_dataproc_batch,
inject_openlineage_properties_into_dataproc_job,
inject_openlineage_properties_into_dataproc_workflow_template,
merge_column_lineage_facets,
)

Expand Down Expand Up @@ -829,3 +830,125 @@ def test_inject_openlineage_properties_into_dataproc_batch(mock_is_ol_accessible
}
result = inject_openlineage_properties_into_dataproc_batch(batch, context, True)
assert result == expected_batch


@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_workflow_template_provider_not_accessible(
    mock_is_accessible,
):
    """The template is returned unmodified when the OpenLineage provider cannot be accessed."""
    mock_is_accessible.return_value = False
    template = {"workflow": "template"}  # It does not matter what the dict is, we should return it unmodified
    result = inject_openlineage_properties_into_dataproc_workflow_template(template, None, True)
    # Compare against an independent literal: comparing with `template` itself could never fail
    # if the function mutated that same dict in place and returned it.
    assert result == {"workflow": "template"}


@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@patch("airflow.providers.google.cloud.openlineage.utils._extract_supported_job_type_from_dataproc_job")
def test_inject_openlineage_properties_into_dataproc_workflow_template_no_inject_parent_job_info(
    mock_extract_job_type, mock_is_accessible
):
    """The template is returned unmodified when parent job info injection is disabled."""
    mock_is_accessible.return_value = True
    mock_extract_job_type.return_value = "sparkJob"
    inject_parent_job_info = False
    template = {"workflow": "template"}  # It does not matter what the dict is, we should return it unmodified
    result = inject_openlineage_properties_into_dataproc_workflow_template(
        template, None, inject_parent_job_info
    )
    # Compare against an independent literal: comparing with `template` itself could never fail
    # if the function mutated that same dict in place and returned it.
    assert result == {"workflow": "template"}
    # With injection disabled no job should even be inspected.
    mock_extract_job_type.assert_not_called()


@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_workflow_template(mock_is_ol_accessible):
    """Check a three-step workflow: one step gets injected, two steps must stay as they are."""
    mock_is_ol_accessible.return_value = True

    def build_workflow(first_step_properties):
        # Every call creates brand new nested dicts, so the input template and the
        # expectation never share any mutable state.
        return {
            "id": "test-workflow",
            "placement": {
                "cluster_selector": {
                    "zone": "europe-central2-c",
                    "cluster_labels": {"key": "value"},
                }
            },
            "jobs": [
                {
                    "step_id": "job_1",
                    "pyspark_job": {
                        "main_python_file_uri": "gs://bucket1/spark_job.py",
                        "properties": first_step_properties,
                    },
                },
                {
                    # Already carries an OpenLineage parent property, so it is expected untouched.
                    "step_id": "job_2",
                    "pyspark_job": {
                        "main_python_file_uri": "gs://bucket2/spark_job.py",
                        "properties": {
                            "spark.sql.shuffle.partitions": "1",
                            "spark.openlineage.parentJobNamespace": "test",
                        },
                    },
                },
                {
                    # Unsupported job type, so it is expected untouched.
                    "step_id": "job_3",
                    "hive_job": {
                        "main_python_file_uri": "gs://bucket3/hive_job.py",
                        "properties": {
                            "spark.sql.shuffle.partitions": "1",
                        },
                    },
                },
            ],
        }

    task_instance = MagicMock(
        dag_id="dag_id",
        task_id="task_id",
        try_number=1,
        map_index=1,
        logical_date=dt.datetime(2024, 11, 11),
    )
    context = {"ti": task_instance}

    template = build_workflow({"spark.sql.shuffle.partitions": "1"})
    expected_template = build_workflow(
        {
            "spark.sql.shuffle.partitions": "1",
            # Injected properties
            "spark.openlineage.parentJobName": "dag_id.task_id",
            "spark.openlineage.parentJobNamespace": "default",
            "spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
        }
    )

    result = inject_openlineage_properties_into_dataproc_workflow_template(template, context, True)
    assert result == expected_template
Loading

0 comments on commit 48a5a0a

Please sign in to comment.