Skip to content

Commit 933c88b

Browse files
feat: move the join_set tracing utils to a dedicated file
1 parent b3ec4e8 commit 933c88b

File tree

5 files changed

+197
-158
lines changed

5 files changed

+197
-158
lines changed

Diff for: datafusion-examples/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ test-utils = { path = "../test-utils" }
7474
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
7575
tonic = "0.12.1"
7676
tracing = { version = "0.1" }
77-
tracing-subscriber = { version = "0.3" }
77+
tracing-subscriber = { version = "0.3" }
7878
url = { workspace = true }
7979
uuid = "1.15"
8080

Diff for: datafusion-examples/examples/tracing.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl JoinSetTracer for SpanTracer {
111111
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
112112
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
113113
let span = Span::current();
114-
Box::new(move || span.in_scope(|| f()))
114+
Box::new(move || span.in_scope(f))
115115
}
116116
}
117117

Diff for: datafusion/common-runtime/src/join_set.rs

+5-154
Original file line numberDiff line numberDiff line change
@@ -15,160 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use futures::FutureExt;
19-
use std::any::Any;
18+
use crate::trace_utils::{trace_block, trace_future};
2019
use std::future::Future;
2120
use std::task::{Context, Poll};
2221
use tokio::runtime::Handle;
2322
use tokio::task::{AbortHandle, Id, JoinError, LocalSet};
2423

25-
pub mod trace_utils {
26-
use super::*;
27-
use futures::future::BoxFuture;
28-
use tokio::sync::OnceCell;
29-
30-
/// A trait for injecting instrumentation into either asynchronous futures or
31-
/// blocking closures at runtime.
32-
pub trait JoinSetTracer: Send + Sync + 'static {
33-
/// Function pointer type for tracing a future.
34-
///
35-
/// This function takes a boxed future (with its output type erased)
36-
/// and returns a boxed future (with its output still erased). The
37-
/// tracer must apply instrumentation without altering the output.
38-
fn trace_future(
39-
&self,
40-
fut: BoxFuture<'static, Box<dyn Any + Send>>,
41-
) -> BoxFuture<'static, Box<dyn Any + Send>>;
42-
43-
/// Function pointer type for tracing a blocking closure.
44-
///
45-
/// This function takes a boxed closure (with its return type erased)
46-
/// and returns a boxed closure (with its return type still erased). The
47-
/// tracer must apply instrumentation without changing the return value.
48-
fn trace_block(
49-
&self,
50-
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
51-
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>;
52-
}
53-
54-
/// A no-op tracer that does not modify or instrument any futures or closures.
55-
/// This is used as a fallback if no custom tracer is set.
56-
struct NoopTracer;
57-
58-
impl JoinSetTracer for NoopTracer {
59-
fn trace_future(
60-
&self,
61-
fut: BoxFuture<'static, Box<dyn Any + Send>>,
62-
) -> BoxFuture<'static, Box<dyn Any + Send>> {
63-
fut
64-
}
65-
66-
fn trace_block(
67-
&self,
68-
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
69-
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
70-
f
71-
}
72-
}
73-
74-
/// Global storage for an injected tracer. If no tracer is injected, a no-op
75-
/// tracer is used instead. This ensures that calls to [`trace_future`] or
76-
/// [`trace_block`] never panic due to missing instrumentation.
77-
static GLOBAL_TRACER: OnceCell<&'static dyn JoinSetTracer> = OnceCell::const_new();
78-
79-
/// A no-op tracer singleton that is returned by [`get_tracer`] if no custom
80-
/// tracer has been registered.
81-
static NOOP_TRACER: NoopTracer = NoopTracer;
82-
83-
/// Return the currently registered tracer, or the no-op tracer if none was
84-
/// registered.
85-
#[inline]
86-
fn get_tracer() -> &'static dyn JoinSetTracer {
87-
GLOBAL_TRACER.get().copied().unwrap_or(&NOOP_TRACER)
88-
}
89-
90-
/// Set the custom tracer for both futures and blocking closures.
91-
///
92-
/// This should be called once at startup. If called more than once, an
93-
/// `Err(())` is returned. If not called at all, a no-op tracer that does nothing
94-
/// is used.
95-
pub fn set_join_set_tracer(tracer: &'static dyn JoinSetTracer) -> Result<(), ()> {
96-
GLOBAL_TRACER.set(tracer).map_err(|_| ())
97-
}
98-
99-
/// Optionally instruments a future with custom tracing.
100-
///
101-
/// If a tracer has been injected via `set_tracer`, the future's output is
102-
/// boxed (erasing its type), passed to the tracer, and then downcast back
103-
/// to the expected type. If no tracer is set, the original future is returned.
104-
///
105-
/// # Type Parameters
106-
/// * `T` - The concrete output type of the future.
107-
/// * `F` - The future type.
108-
///
109-
/// # Parameters
110-
/// * `future` - The future to potentially instrument.
111-
pub fn trace_future<T, F>(future: F) -> BoxFuture<'static, T>
112-
where
113-
F: Future<Output = T> + Send + 'static,
114-
T: Send + 'static,
115-
{
116-
// Erase the future’s output type first:
117-
let erased_future = async move {
118-
let result = future.await;
119-
Box::new(result) as Box<dyn Any + Send>
120-
}
121-
.boxed();
122-
123-
// Forward through the global tracer:
124-
get_tracer()
125-
.trace_future(erased_future)
126-
// Downcast from `Box<dyn Any + Send>` back to `T`:
127-
.map(|any_box| {
128-
*any_box
129-
.downcast::<T>()
130-
.expect("Tracer must preserve the future’s output type!")
131-
})
132-
.boxed()
133-
}
134-
135-
/// Optionally instruments a blocking closure with custom tracing.
136-
///
137-
/// If a tracer has been injected via `set_tracer`, the closure is wrapped so that
138-
/// its return value is boxed (erasing its type), passed to the tracer, and then the
139-
/// result is downcast back to the original type. If no tracer is set, the closure is
140-
/// returned unmodified (except for being boxed).
141-
///
142-
/// # Type Parameters
143-
/// * `T` - The concrete return type of the closure.
144-
/// * `F` - The closure type.
145-
///
146-
/// # Parameters
147-
/// * `f` - The blocking closure to potentially instrument.
148-
pub fn trace_block<T, F>(f: F) -> Box<dyn FnOnce() -> T + Send>
149-
where
150-
F: FnOnce() -> T + Send + 'static,
151-
T: Send + 'static,
152-
{
153-
// Erase the closure’s return type first:
154-
let erased_closure = Box::new(|| {
155-
let result = f();
156-
Box::new(result) as Box<dyn Any + Send>
157-
});
158-
159-
// Forward through the global tracer:
160-
let traced_closure = get_tracer().trace_block(erased_closure);
161-
162-
// Downcast from `Box<dyn Any + Send>` back to `T`:
163-
Box::new(move || {
164-
let any_box = traced_closure();
165-
*any_box
166-
.downcast::<T>()
167-
.expect("Tracer must preserve the closure’s return type!")
168-
})
169-
}
170-
}
171-
17224
/// A wrapper around Tokio's JoinSet that forwards all API calls while optionally
17325
/// instrumenting spawned tasks and blocking closures with custom tracing behavior.
17426
/// If no tracer is injected via `trace_utils::set_tracer`, tasks and closures are executed
@@ -211,7 +63,7 @@ impl<T: 'static> JoinSet<T> {
21163
F: Send + 'static,
21264
T: Send,
21365
{
214-
self.inner.spawn(trace_utils::trace_future(task))
66+
self.inner.spawn(trace_future(task))
21567
}
21668

21769
/// [JoinSet::spawn_on](tokio::task::JoinSet::spawn_on) - Spawn a task on a provided runtime.
@@ -221,7 +73,7 @@ impl<T: 'static> JoinSet<T> {
22173
F: Send + 'static,
22274
T: Send,
22375
{
224-
self.inner.spawn_on(trace_utils::trace_future(task), handle)
76+
self.inner.spawn_on(trace_future(task), handle)
22577
}
22678

22779
/// [JoinSet::spawn_local](tokio::task::JoinSet::spawn_local) - Spawn a local task.
@@ -249,7 +101,7 @@ impl<T: 'static> JoinSet<T> {
249101
F: Send + 'static,
250102
T: Send,
251103
{
252-
self.inner.spawn_blocking(trace_utils::trace_block(f))
104+
self.inner.spawn_blocking(trace_block(f))
253105
}
254106

255107
/// [JoinSet::spawn_blocking_on](tokio::task::JoinSet::spawn_blocking_on) - Spawn a blocking task on a provided runtime.
@@ -259,8 +111,7 @@ impl<T: 'static> JoinSet<T> {
259111
F: Send + 'static,
260112
T: Send,
261113
{
262-
self.inner
263-
.spawn_blocking_on(trace_utils::trace_block(f), handle)
114+
self.inner.spawn_blocking_on(trace_block(f), handle)
264115
}
265116

266117
/// [JoinSet::join_next](tokio::task::JoinSet::join_next) - Await the next completed task.

Diff for: datafusion/common-runtime/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525

2626
pub mod common;
2727
mod join_set;
28+
mod trace_utils;
2829

2930
pub use common::SpawnedTask;
30-
pub use join_set::trace_utils::set_join_set_tracer;
31-
pub use join_set::trace_utils::JoinSetTracer;
3231
pub use join_set::JoinSet;
32+
pub use trace_utils::{set_join_set_tracer, JoinSetTracer};

0 commit comments

Comments
 (0)