Provide an alternative OpenTelemetry implementation for traces that follows standard otel practices #43941

Open · xBis7 wants to merge 54 commits into main
Conversation

@xBis7 commented Nov 12, 2024

This is my first time contributing to Airflow and I'm not sure whether such changes need an AIP or a mailing list discussion. I'd appreciate any feedback.

Issue description

related: #40802

There are some OpenTelemetry standard practices that help keep usage consistent across projects. According to those practices

  • when a root span starts, the trace id and the span id are generated randomly
  • while the span is active, its context is captured
    • it is injected into a carrier object, which is a map (in Python, a dictionary)
  • the carrier with the captured context is propagated across services and used to create sub-spans
  • a new sub-span extracts the parent context from the carrier and uses it to set its parent
    • the trace id is taken from the carrier and the span id is generated randomly
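
As a minimal sketch of that pattern with the OpenTelemetry Python SDK (the tracer setup and span names here are illustrative, and a configured TracerProvider is assumed):

    from opentelemetry import trace
    from opentelemetry.propagate import inject, extract

    tracer = trace.get_tracer(__name__)

    # Root span: the SDK generates the trace id and span id randomly.
    with tracer.start_as_current_span("root_span"):
        # Capture the active context into a carrier (a plain dict).
        carrier: dict[str, str] = {}
        inject(carrier)
        # The carrier is what gets propagated to another service/process.

    # Receiving side: extract the parent context from the carrier and use it
    # when starting the sub-span. The sub-span keeps the same trace id and
    # gets a new, randomly generated span id.
    parent_ctx = extract(carrier)
    with tracer.start_as_current_span("sub_span", context=parent_ctx):
        pass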

To explain further, this is what the flow of operations should be

  1. Dag run starts - start the dagrun span
    i. Get the dagrun span context
  2. Task starts - with the dagrun context, start the ti span
    i. Get the ti span context
  3. With the ti span context, create a task sub-span
  4. Task finishes - the ti span ends
  5. Dag run finishes - the dagrun span ends

Airflow follows a different approach

  • trace and span ids are generated deterministically from various properties of the dag_run and the task_instance
  • the spans for a dag and its tasks are generated after the run has finished
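
For contrast, deterministic derivation looks roughly like the sketch below (the hashing scheme, field values, and helper name are illustrative, not the exact Airflow code):

    import hashlib

    def deterministic_id(*parts: str, num_bytes: int) -> int:
        # Derive a fixed-size id from dag_run / task_instance properties,
        # so any process can recompute the same trace/span id later.
        digest = hashlib.sha256("|".join(parts).encode()).digest()
        return int.from_bytes(digest[:num_bytes], "big")

    # Illustrative only: the real implementation derives ids from other fields.
    trace_id = deterministic_id("my_dag", "manual__2024-11-12T00:00:00", num_bytes=16)
    span_id = deterministic_id("my_dag", "manual__2024-11-12T00:00:00", "task1", num_bytes=8)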

This is the flow of the current implementation

  1. Dag run starts
  2. Tasks start
  3. Tasks finish
  4. Create spans for the tasks
  5. Dag run finishes
  6. Create span for the dag run

The current approach makes it impossible to create spans from within tasks using the existing airflow code. To do that, you need to use https://github.com/howardyoo/airflow_otel_provider, which has to generate the same trace id and span id as airflow, otherwise the spans won't be associated correctly. This isn't easily maintainable, and it also makes it hard for people who are familiar with otel but new to airflow to start using the feature.

These are some references to the OpenTelemetry docs

https://opentelemetry.io/docs/concepts/context-propagation/
https://opentelemetry.io/docs/languages/python/propagation/
https://www.w3.org/TR/trace-context/

Implementation description

For the dagrun and ti spans, I've reused the attributes from the original implementation. In my testing I found that the span timings were occasionally off. That was because the conversion to nanoseconds didn't account for datetimes that lack timezone information.
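
A sketch of the kind of guard that conversion needs (the helper name is hypothetical):

    from __future__ import annotations

    from datetime import datetime, timezone

    def datetime_to_nano(dt: datetime | None) -> int | None:
        # OTel span start/end times are nanoseconds since the epoch.
        if dt is None:
            return None
        if dt.tzinfo is None:
            # A naive datetime would otherwise be interpreted in the local
            # timezone by timestamp(); assume UTC instead.
            dt = dt.replace(tzinfo=timezone.utc)
        return int(dt.timestamp() * 1_000_000_000)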

To propagate the context of a span and use it to create a sub-span, the span must be active.

For example, for a dag run, the span can't be created at the end; instead it

  • it needs to start with the run
  • stay active so that the context can be captured and propagated
  • end once the run also ends

The same goes for a task and any sub-spans.
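
In sketch form, using plain OTel SDK calls (names are illustrative; the point is that the span stays open for the whole run and its context is captured into a carrier while it is active):

    from opentelemetry import trace
    from opentelemetry.propagate import inject

    tracer = trace.get_tracer(__name__)

    # Dag run starts: open the span, but don't end it yet.
    dag_run_span = tracer.start_span("dag_run")

    # While the span is active, capture its context into a carrier so that
    # task spans (possibly in other processes) can attach to it later.
    carrier: dict[str, str] = {}
    with trace.use_span(dag_run_span, end_on_exit=False):
        inject(carrier)

    # ... the run executes, tasks use `carrier` to create child spans ...

    # Dag run finishes: the same process that started the span ends it.
    dag_run_span.end()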

With this approach, we can use the new otel methods to create spans directly from within a task, without needing the airflow_otel_provider. These spans will be children of the task span.

Check test_otel_dag.py for an example of usage.
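
In spirit, creating sub-spans from a task looks roughly like this (plain OTel API; the carrier argument is a hypothetical stand-in for however the ti context reaches the task):

    from opentelemetry import trace
    from opentelemetry.propagate import extract

    tracer = trace.get_tracer(__name__)

    def task_callable(context_carrier: dict[str, str]) -> None:
        # Attach to the ti span through the propagated carrier; the new span
        # becomes a child of the task instance span.
        parent_ctx = extract(context_carrier)
        with tracer.start_as_current_span("task1_sub1_span", context=parent_ctx):
            with tracer.start_as_current_span("task1_sub1_sub_span"):
                ...  # actual task work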

In scheduler HA, multiple schedulers might process a dag that runs for a long time. The OpenTelemetry philosophy is that spans can't be shared between processes. The process that starts a span should also be the only one that can end it.

To work around that while staying consistent with the otel spec, each scheduler will keep track of the spans that it starts and will be solely responsible for ending them.
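
A sketch of what such per-scheduler bookkeeping can look like in memory (class and method names are illustrative, not the exact ones used in the PR):

    from opentelemetry.trace import Span

    class ActiveSpans:
        """Spans started by this scheduler, keyed by a run/ti identifier."""

        def __init__(self) -> None:
            self._spans: dict[str, Span] = {}

        def set(self, key: str, span: Span) -> None:
            self._spans[key] = span

        def end(self, key: str) -> None:
            # Only the scheduler that started a span may end it.
            span = self._spans.pop(key, None)
            if span is not None:
                span.end()

        def end_all(self) -> None:
            # Used on graceful shutdown: end every span this scheduler owns.
            for key in list(self._spans):
                self.end(key)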

Let's assume that we are expecting these spans to be created and exported

dag_span
    |_ task1_span
          |_ task1_sub1_span
                |_ task1_sub1_sub_span
          |_ task1_sub2_span
    |_ task2_span

With the existing airflow implementation, the spans will be

dag_span
    |_ task1_span
    |_ task2_span

Here are some possible scenarios and how the dag spans will look, assuming that the dag creates the same spans as above

  1. Scheduler1 processes a dag from start to finish.

    dag_span
        |_ task1_span
              |_ task1_sub1_span
                    |_ task1_sub1_sub_span
              |_ task1_sub2_span
        |_ task2_span
    
  2. Scheduler1 starts a dag and sometime during the run, Scheduler2 picks up the processing and finishes the dag.

    Through a db update, Scheduler2 will notify whichever scheduler started the spans to end them.

    dag_span
        |_ task1_span
              |_ task1_sub1_span
                    |_ task1_sub1_sub_span
              |_ task1_sub2_span
        |_ task2_span
    
  3. Scheduler1 starts a dag and while the dag is running, exits gracefully. Scheduler2 picks up the processing and finishes the dag while the initial scheduler is no longer running.

    While exiting, Scheduler1 will end all active spans that it has a record of and update the db. Scheduler2 will create continued spans (see the sketch after this list). This results in two or more spans in place of one long span, but the total duration stays the same. The continued span becomes the new parent.

    dag_span
        |_ task1_span
        |_ scheduler_exits_span
        |_ new_scheduler_span
        |_ dag_span_continued
              |_ task1_span_continued
                    |_ task1_sub1_span
                          |_ task1_sub1_sub_span
                    |_ task1_sub2_span
              |_ task2_span
    
  4. Scheduler1 starts a dag and while the dag is running, exits forcefully. Scheduler2 picks up the processing and finishes the dag while the initial scheduler is no longer running.

    Scheduler1 didn't have time to end any spans. Once the new scheduler detects that the scheduler that started the spans is unhealthy, it will recreate the lost spans.

    dag_span_recreated
        |_ task1_span_recreated
              |_ task1_sub1_span
                    |_ task1_sub1_sub_span
              |_ task1_sub2_span
        |_ task2_span
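
A continued span (scenario 3) can be modeled by starting a new span whose start time equals the end time of the prematurely ended one, so the two together cover the original duration. A sketch, with the exact timing handling in the PR possibly differing:

    import time

    from opentelemetry import trace

    tracer = trace.get_tracer(__name__)

    # Scheduler1, exiting gracefully: end the span it owns and record when.
    original_span = tracer.start_span("dag_span")
    end_time_ns = time.time_ns()
    original_span.end(end_time=end_time_ns)

    # Scheduler2, taking over: start the continued span where the original
    # left off, so original + continued cover the full run duration.
    continued_span = tracer.start_span("dag_span_continued", start_time=end_time_ns)
    # ... remaining task spans use the continued span as their parent ...
    continued_span.end()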
    

Testing

  • Added a new unit test for the otel_tracer methods and updated the existing ones
  • Added an integration test for each possible scenario
  • Updated the existing tests to handle the changes
  • Tested the changes manually with a PythonVirtualenvOperator without issues

The integration test can be used with otel and jaeger as well. To do that, follow the steps in this comment.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:CLI area:Executors-core LocalExecutor & SequentialExecutor area:Scheduler including HA (high availability) scheduler area:serialization kind:documentation labels Nov 12, 2024
boring-cyborg bot commented Nov 12, 2024

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

@ashb (Member) commented Nov 12, 2024

I'm confirming, but otel traces might still be experimental; if that's the case we are free to change them for a good reason, and your description certainly sounds like one!

@ferruzzi (Contributor) commented Nov 12, 2024

If I may, can you add some indication to the title that this is related to OTel Traces specifically? We also have OTel Metrics implemented and it would be nice to minimize confusion.

@xBis7 xBis7 changed the title Provide an alternative OpenTelemetry implementation that follows standard otel practices Provide an alternative OpenTelemetry implementation for traces that follows standard otel practices Nov 13, 2024
@xBis7 (Author) commented Nov 13, 2024

@ferruzzi I adjusted the title. The only change in this patch related to metrics is https://github.com/apache/airflow/pull/43941/files#diff-1cca954ec0be1aaf2c212e718c004cb0902a96ac60043bf0c97a782dee52cc32R85-R86

If you think that it's out of scope, then I can remove it.

@xBis7 (Author) commented Dec 2, 2024

My initial approach didn't consider scheduler HA. I've updated the patch accordingly.

There have been two main challenges

  • OpenTelemetry spans are designed so that only the process that starts them can end them
    • The span objects can't be shared or stored in a db
  • The airflow philosophy for scheduler HA is that the only shared state between multiple schedulers is the db
    • It is very common for one scheduler to start a dag (and also start the span) and for another scheduler to finish the dag (and need to end the span)

To avoid breaking things, each scheduler will have a list of the spans that it started and will be solely responsible for ending them. We will save two new attributes on the dagrun table and the ti table.

The new columns will be

  • context_carrier
    • used for propagating the context and creating sub-spans
  • span_status
    • tracks the span status and tells each scheduler how to handle the span
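
A sketch of the two columns and a span status enum (the column types and any status values other than SHOULD_END are assumptions for illustration):

    import enum

    from sqlalchemy import JSON, Column, String

    class SpanStatus(str, enum.Enum):
        ACTIVE = "active"          # illustrative value
        SHOULD_END = "should_end"  # set by the scheduler that wants the span ended
        ENDED = "ended"            # illustrative value

    # On the dag_run and task_instance models (sketch):
    context_carrier = Column(JSON)  # the carrier dict captured via inject()
    span_status = Column(String(250), default=SpanStatus.ACTIVE.value)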

Possible scenarios with 2 schedulers (this can easily work with more)

  1. Scheduler1 starts a dag and finishes processing it
    • This is straightforward
      dag span
        |_ task1 span
              |_ task1 sub span
        |_ task2 span
      
  2. Scheduler1 starts a dag while another scheduler finishes it
    • The visualized result will be the same as scenario 1
      dag span
        |_ task1 span
              |_ task1 sub span
        |_ task2 span
      
    • scheduler2 will set the span status to SHOULD_END and scheduler1 will end the spans during the next loop iteration.
  3. Scheduler1 starts a dag, exits gracefully and another scheduler picks up the dag and finishes it
    • Since scheduler1 exits gracefully, e.g. with a SIGTERM or SIGINT signal, we can end the spans and update the status
    • scheduler2 will create a continued span for each prematurely ended span
      original       |----|
      continued           |-------|
      
      dag span
       |_ task1 span
       |_ scheduler exited span
       |_ new scheduler span
       |_ dag span (continued suffix)
             |_ task1 span (continued suffix)
                   |_ task1 sub span
             |_ task2 span
      
  4. Scheduler1 starts a dag, exits forcefully and another scheduler picks up the dag and finishes it
    • In this case scheduler1 exited with active spans. Airflow has a standard way of declaring a scheduler unhealthy and also stores the id of the scheduler job that started a dagrun or a ti
    • If the scheduler that started the dagrun or the ti is unhealthy, we can recreate the lost spans
    • If a task is active and running, and the part of it that hasn't executed yet is supposed to create some sub-spans, then those will use the recreated span as a parent
      dag span (recreated suffix)
        |_ task1 span (recreated suffix)
              |_ task1 sub span
        |_ task2 span
      
    • If a task has finished and created some sub-spans, those sub-spans reference a parent span that was lost along with the unhealthy scheduler. In that case, we recreate the span of the task itself, but we can't recreate the sub-spans without rerunning the task
      dag span (recreated suffix)
        |_ task1 span (recreated suffix)
        |_ task2 span
      

Note that with the current airflow otel implementation, you can't create sub-spans from tasks. All dagrun spans are visualized like this

dag span
  |_ task1 span
  |_ task2 span

I'll do some testing to make sure nothing has been broken from merging with main and I'll move the PR out of draft.

@xBis7 (Author) commented Dec 18, 2024

Hi @ashb,

  • I've addressed all your comments
  • I created a new migration file
  • I added some unit tests for the scheduler changes
  • I fixed 2 bugs that I came across while making the changes

Please take a look when you get a chance and let me know how it looks.

I'll push some commits for moving the dagrun/scheduler span changes to new methods, to make the code more readable.

> I think this will work, though the main question I have is: if we can re-create the span for an unhealthy scheduler, couldn't we do that for healthy schedulers too? That way we wouldn't need to store active_spans in memory at all, and nor would we need to store span status in the DB?

That would take us back to the old implementation, where we can't create spans from within tasks.

If a task has already run and created spans, those spans will be lost when we recreate the task span. This happens for a few reasons

  • the already exported spans reference the old span as their parent
  • we don't set the trace id and span id deterministically but let the otel sdk handle that
  • the task sub-spans get their parent through context propagation

The only way to get the spans back is to rerun the task.

@ashb ashb added the pinned Protect from Stalebot auto closing label Jan 15, 2025
@ashb (Member) commented Jan 15, 2025

I'm aware this is waiting on me for a review. I will look at this when I can, but I've got some critical bits of AIP-72 (Task Execution Interface) to land first.
