Skip to content

feat: introduce JoinSetTracer trait for tracing context propagation in spawned tasks #14547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 20, 2025

Conversation

geoffreyclaude
Copy link
Contributor

@geoffreyclaude geoffreyclaude commented Feb 7, 2025

Which issue does this PR close?

This PR lays the groundwork for optional instrumentation of async tasks in DataFusion.

Rationale for this change

This PR introduces a general mechanism enabling DataFusion to propagate user-defined context (such as tracing spans, logging, or metrics) across thread boundaries without depending on any specific instrumentation library.

Previously, tasks spawned on new threads—such as those performing repartitioning or Parquet file reads—would lose thread-local context, making instrumentation challenging for users. The introduced approach addresses this gap by allowing users to inject custom instrumentation via the new JoinSetTracer trait. This ensures context is preserved seamlessly, keeping DataFusion lightweight by not adding any direct instrumentation dependencies.

What changes are included in this PR?

  • New JoinSetTracer trait: Defines how to instrument futures or blocking closures when tasks are spawned on threads.
  • Global tracer registration: Adds a set_join_set_tracer function for registering a custom tracer at startup. If no tracer is set, a no-op implementation is used by default.
  • Refactored JoinSet: Introduces a wrapper around Tokio's JoinSet that leverages the registered tracer (if available) to instrument spawned tasks transparently.
  • Integration Example: Provides an illustrative example in datafusion-examples/examples/tracing.rs, demonstrating how users can integrate their tracing implementations. This example does not impose any direct tracing dependency on DataFusion users.

Are these changes tested?

Yes. There are no dedicated unit tests specifically for the tracer injection, but the example in datafusion-examples/examples/tracing.rs shows a working end-to-end setup using tracing. By running that example, you can confirm that tasks spawned on multiple threads inherit whichever span is active at the moment they are spawned—if a tracer is registered.

Are there any user-facing changes?

  • Users who do not register a tracer see no differences (and incur no overhead). Everything works as before.
  • Users who do want instrumentation can implement JoinSetTracer and call set_join_set_tracer(...). This approach is fully optional.

The upshot is that DataFusion now provides a pluggable way to connect with tracing or other instrumentation without pulling those dependencies into DataFusion by default.

@github-actions github-actions bot added documentation Improvements or additions to documentation physical-expr Changes to the physical-expr crates core Core DataFusion crate common Related to common crate labels Feb 7, 2025
@geoffreyclaude geoffreyclaude force-pushed the feat/trace-span-propagation branch 2 times, most recently from f7001cb to 615dc39 Compare February 8, 2025 16:16
@geoffreyclaude geoffreyclaude force-pushed the feat/trace-span-propagation branch 2 times, most recently from 234deb9 to bea0836 Compare February 13, 2025 17:13
README.md Outdated
@@ -126,6 +126,7 @@ Optional features:
- `backtrace`: include backtrace information in error messages
- `pyarrow`: conversions between PyArrow and DataFusion types
- `serde`: enable arrow-schema's `serde` feature
- `tracing`: propagates the current span across thread boundaries
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Is the new tracing feature required? Or should we just make it the default?

}
impl<T: 'static> JoinSet<T> {
/// [JoinSet::spawn](tokio::task::JoinSet::spawn) - Spawn a new task.
pub fn spawn<F>(&mut self, task: F) -> AbortHandle
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: All public stable methods of the tokio::task::JoinSet are wrapped. Should only the methods used in DataFusion be wrapped for conciseness?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping all methods makes sense to me. The developer would expect using this wrapper to be the same as the original one.

@NGA-TRAN
Copy link
Contributor

NGA-TRAN commented Feb 19, 2025

@alamb and @erratic-pattern
What do you think about this proposal PR as the first step to support tracing spans? DataDog uses traces a lot and we are planning to contribute to fully support this feature

@alamb
Copy link
Contributor

alamb commented Feb 23, 2025

Hi @geoffreyclaude @NGA-TRAN -- I will plan to review this PR shortly. Sorry for the delay.

@geoffreyclaude geoffreyclaude force-pushed the feat/trace-span-propagation branch 3 times, most recently from ccf9a9c to 703aa32 Compare March 2, 2025 17:25
@alamb alamb mentioned this pull request Mar 3, 2025
12 tasks
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for working on this @geoffreyclaude -- I am sorry for the delay in responding.

Primarily my concern about this PR is that it adds new more dependencies (albiet optional ones) to DataFusion. We already spend a non trivial amount of maintenance time keeping up with dependencies / fixing related issues, etc .

I also worry that not everyone uses the tracing crate.

New features also increase the compile time, such as

the "TracingExec", but it's a pretty wonky workaround. It also fails when the source nodes of the plan spawn new tasks, as in the ParquetSink, which prevent the >underlying object_store from linking its spans to the main trace.

FWIW the way we have handled this is to pass a tracing span along in PartitionedFile::extensions, but that is non ideal

https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.PartitionedFile.html#structfield.extensions

Questions / Suggestions

Is there some way to avoid adding a direct dependency on tracing but still keep the API flexibility you need?

For example, perhaps we could create a different JoinSet that instead of hard coding tracing could provide the callbacks for spawning tasks 🤔

@geoffreyclaude
Copy link
Contributor Author

geoffreyclaude commented Mar 3, 2025

Thank you for working on this @geoffreyclaude -- I am sorry for the delay in responding.

Primarily my concern about this PR is that it adds new more dependencies (albiet optional ones) to DataFusion. We already spend a non trivial amount of maintenance time keeping up with dependencies / fixing related issues, etc .

I also worry that not everyone uses the tracing crate.

New features also increase the compile time, such as

the "TracingExec", but it's a pretty wonky workaround. It also fails when the source nodes of the plan spawn new tasks, as in the ParquetSink, which prevent the >underlying object_store from linking its spans to the main trace.

FWIW the way we have handled this is to pass a tracing span along in PartitionedFile::extensions, but that is non ideal

https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.PartitionedFile.html#structfield.extensions

Questions / Suggestions

Is there some way to avoid adding a direct dependency on tracing but still keep the API flexibility you need?

For example, perhaps we could create a different JoinSet that instead of hard coding tracing could provide the callbacks for spawning tasks 🤔

And thank you @alamb for the detailed response! I'll try to address your points one by one.

  • I also worry that not everyone uses the tracing crate.

the tracing crate has pretty much become the standard crate for instrumentation, same as how tokio is the standard crate for async runtimes. Almost everyone who's interested in instrumenting their DataFusion execution will be using the tracing crate. Moreover, if we push this further and deliver a full execution plan instrumentation solution that relies on tracing (as a separate contrib crate,) it will probably become the de-facto DataFusion standard.

  • New features also increase the compile time

This is why I added a feature guard, as tracing definitely adds some compilation overhead. With the feature disabled, there should be no impact at all.

  • pass a tracing span along in PartitionedFile::extensions

This works if you want to link the object store calls to the root span, but I believe it won't allow you to link them to the "real" parent span, the DataSinkExec that wraps the ParquetSink. It also doesn't allow you to instrument native and custom execution nodes past a RepartitionExec that spawns tasks on separate threads.

  • Is there some way to avoid adding a direct dependency on tracing but still keep the API flexibility you need?

I really like that idea, as it doesn't need to pull in additional dependencies, and lets users chose their instrumentation solution.
Unfortunately, if we want to dynamically "inject" the new tracing behavior at runtime, it's particularly tricky, in particular because:

  1. it should be a static injection, so each new JoinSet created with JoinSet<T>::new() picks it up
  2. it needs to work for all the generic types of JoinSet, from JoinSet<()> to JoinSet<std::result::Result<usize, DataFusionError>>. (So we probably need complex macros, or a hard-coded list of all the possible types.)
  3. It needs to be done once and once only, globally, without relying on unsafe.
    It's of course possible, but the additional complexity just doesn't seem worth it. ... Or maybe there's a super obvious way to do it which I completely missed!

To me, I don't see a simpler way to open up full instrumentation of execution plans than what's done in this PR. It's super unfortunate that the tracing crate has a thread-local only storage for its context, but it doesn't seem likely to move away from this implementation decision.

Since the minimal change is pretty small, we can keep our fork of the change internally. But it also seemed like a good reason to push it upstream and contribute the full execution plan instrumentation code on top of it!

@github-actions github-actions bot added datasource Changes to the datasource crate and removed documentation Improvements or additions to documentation physical-expr Changes to the physical-expr crates labels Mar 4, 2025
@geoffreyclaude geoffreyclaude force-pushed the feat/trace-span-propagation branch from 61805b1 to 3cd150c Compare March 4, 2025 10:36
@geoffreyclaude
Copy link
Contributor Author

geoffreyclaude commented Mar 4, 2025

@alamb: I've gone ahead and refactored to allow "injecting" the tracing behavior at runtime, as a separate new commit. As predicted, the code is a bit thorny, especially due to the Boxing/Unboxing dance needed to get static Sized closures.

But it works (see datafusion-examples/examples/tracing.rs) without needing the tracing crate (optionally or not) inside datafusion-common-runtime!

Note that I've removed the tracing feature completely. We could easily add it back without importing the tracing crate, and use it only to call the new JoinSetTracer methods as before if set, for even less user impact.

@geoffreyclaude geoffreyclaude force-pushed the feat/trace-span-propagation branch 2 times, most recently from 7cd9209 to 69a5c78 Compare March 4, 2025 11:12
@geoffreyclaude geoffreyclaude force-pushed the feat/trace-span-propagation branch 9 times, most recently from 157a6c6 to 933c88b Compare March 4, 2025 14:01
@alamb
Copy link
Contributor

alamb commented Mar 17, 2025

Thanks @geoffreyclaude -- I will try and look at this.

I think @goldmedal was also interested in this capability (using tracing via the otel crate -- I will try and find time to look at this shortly. I am really sorry for the delay

@goldmedal
Copy link
Contributor

I'll take a look this PR in a few day.

@geoffreyclaude geoffreyclaude force-pushed the feat/trace-span-propagation branch from 933c88b to be86f76 Compare March 18, 2025 10:58
Copy link
Contributor

@goldmedal goldmedal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @geoffreyclaude. It makes sense to me. The way to inject the join set tracer is awesome.

}
impl<T: 'static> JoinSet<T> {
/// [JoinSet::spawn](tokio::task::JoinSet::spawn) - Spawn a new task.
pub fn spawn<F>(&mut self, task: F) -> AbortHandle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping all methods makes sense to me. The developer would expect using this wrapper to be the same as the original one.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @geoffreyclaude and @goldmedal -- this looks really nice. The example is also 👨‍🍳 👌 and I think will permit other people to use this functionality more easily

Relates to #9415. Does not fully close the issue, but moves forward with a pre-requisite.

From my perspective this PR now closes that issue's stated request:

I would like to make it easy for people to add DataFusion ExecutionPlan level tracing to their systems as well.

Is it ok if we close it as well after this is merged?

@alamb
Copy link
Contributor

alamb commented Mar 19, 2025

(this is a very nice first contribution @geoffreyclaude -- thank you for sticking with it 🙏 )

@geoffreyclaude
Copy link
Contributor Author

geoffreyclaude commented Mar 20, 2025

@alamb and @goldmedal thanks for the review!

From my perspective this PR now closes that issue's stated request:

I would like to make it easy for people to add DataFusion ExecutionPlan level tracing to their systems as well.

Is it ok if we close it as well after this is merged?

Ok for me at least! There'll probably be some minor adjustments to make once we start playing with it (maybe some other futures to wrap, for instance in ListingTable::list_files_for_scan the future::try_join_all and StreamExt::buffer_unordered might cause tokio to schedule on separate threads), but I think the core problem is solved for ExecutionPath tracing.

I've also updated the PR title and description to match the latest changes, otherwise it could be confusing the readers that didn't follow the PR review.

@geoffreyclaude geoffreyclaude changed the title feat: instrument spawned tasks with current tracing span when tracing feature is enabled feat: introduce JoinSetTracer trait for context propagation in spawned tasks Mar 20, 2025
@geoffreyclaude geoffreyclaude changed the title feat: introduce JoinSetTracer trait for context propagation in spawned tasks feat: introduce JoinSetTracer trait for tracing context propagation in spawned tasks Mar 20, 2025
@alamb
Copy link
Contributor

alamb commented Mar 20, 2025

I also added this feature to the list of things we should document with the next release

Thanks again @geoffreyclaude and @goldmedal

@alamb alamb merged commit dd9c3a8 into apache:main Mar 20, 2025
32 of 33 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate datasource Changes to the datasource crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support "Tracing" / Spans
4 participants