Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: automatically inject OL transport info into spark jobs #45326

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

kacpermuda
Copy link
Contributor

Similar to #44477 , this PR introduces a new feature to OpenLineage integration. It will NOT impact users that are not using OpenLineage or have not explicitly enabled this feature (False by default).

TLDR;

When explicitly enabled by the user for supported operators, we will automatically inject transport information into the Spark job properties. For example, when submitting a Spark job using the DataprocSubmitJobOperator, we will configure Spark/OpenLineage integration to use the same transport configuration that Airflow integration uses.

Why ?

Currently, this process requires manual configuration by the user, as described here. E.g.:

DataprocSubmitJobOperator(
    task_id="my_task", 
    # ... 
    job={ 
        # ...
        "spark.openlineage.transport.type": "http",
        "spark.openlineage.transport.url": openlineage_url,
        "spark.openlineage.transport.compression": "gzip",
        "spark.openlineage.transport.auth.apiKey": api_key,
        "spark.openlineage.transport.auth.type": "apiKey",
    } 
)

Understanding how various Airflow operators configure Spark allows us to automatically inject transport information.

Controlling the Behavior

We provide users with a flexible control mechanism to manage this injection, combining per-operator enablement with a global fallback configuration. This design is inspired by the deferrable argument in Airflow.

ol_inject_transport_info: bool = conf.getboolean(
    "openlineage", "spark_inject_transport_info", fallback=False
)

Each supported operator will include an argument like ol_inject_transport_info, which defaults to the global configuration value of openlineage.spark_inject_transport_info. This approach allows users to:

  1. Control behavior on a per-job basis by explicitly setting the argument.
  2. Rely on a consistent default configuration for all jobs if the argument is not set.

This design ensures both flexibility and ease of use, enabling users to fine-tune their workflows while minimizing repetitive configuration. I am aware that adding an OpenLineage-related argument to the operator will affect all users, even those not using OpenLineage, but since it defaults to False and can be ignored, I hope this will not pose any issues.

How?

The implementation is divided into three parts for better organization and clarity:

  1. Operator's Code (including the execute method):
    Contains minimal logic to avoid overwhelming users who are not actively working with OpenLineage.

  2. Google's Provider OpenLineage Utils File:
    Handles the logic for accessing Spark properties specific to a given operator or job.

  3. OpenLineage Provider's Utils:
    Responsible for creating / extracting all necessary information in a format compatible with the OpenLineage Spark integration. We are also performing modifications to the Spark properties here.

For some operators parts 1 and 2 may be in the operator's code. In general, the specific operator / provider will know how to get the spark properties and the OL will know what to inject and do the injection itself.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@kacpermuda kacpermuda force-pushed the feat-ol-inject-transport-dataproc-job branch 4 times, most recently from d9a52b6 to 78b57e4 Compare January 2, 2025 10:06
@kacpermuda kacpermuda marked this pull request as ready for review January 2, 2025 10:11
@kacpermuda kacpermuda force-pushed the feat-ol-inject-transport-dataproc-job branch from 78b57e4 to 9f45650 Compare January 2, 2025 12:13
@potiuk potiuk force-pushed the feat-ol-inject-transport-dataproc-job branch from 9f45650 to 6a85faa Compare January 2, 2025 12:15
@potiuk
Copy link
Member

potiuk commented Jan 2, 2025

Hey @kacpermuda -> I rebased your PR -> we found and issue with @jscheffl with the new caching scheme - fixed in #45347 that would run "main" version of the tests-> so I am rebasing all PRs affected :)

@kacpermuda kacpermuda force-pushed the feat-ol-inject-transport-dataproc-job branch 5 times, most recently from 179acb9 to 1523cd9 Compare January 3, 2025 17:36
@kacpermuda kacpermuda force-pushed the feat-ol-inject-transport-dataproc-job branch from 1523cd9 to 4f46713 Compare January 6, 2025 11:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants