Skip to content

Commit 61805b1

Browse files
feat: allow injecting custom join_set tracer to avoid dependency on tracing crate
1 parent 703aa32 commit 61805b1

File tree

9 files changed

+212
-175
lines changed

9 files changed

+212
-175
lines changed

Diff for: Cargo.lock

+1-12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: README.md

-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ Optional features:
129129
- `backtrace`: include backtrace information in error messages
130130
- `pyarrow`: conversions between PyArrow and DataFusion types
131131
- `serde`: enable arrow-schema's `serde` feature
132-
- `tracing`: propagates the current span across thread boundaries
133132

134133
[apache avro]: https://avro.apache.org/
135134
[apache parquet]: https://parquet.apache.org/

Diff for: datafusion-examples/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async-trait = { workspace = true }
6161
bytes = { workspace = true }
6262
dashmap = { workspace = true }
6363
# note only use main datafusion crate for examples
64-
datafusion = { workspace = true, default-features = true, features = ["avro", "tracing"] }
64+
datafusion = { workspace = true, default-features = true, features = ["avro"] }
6565
datafusion-proto = { workspace = true }
6666
env_logger = { workspace = true }
6767
futures = { workspace = true }

Diff for: datafusion-examples/examples/tracing.rs

+48-78
Original file line numberDiff line numberDiff line change
@@ -15,113 +15,83 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! This example demonstrates the trace feature in DataFusion’s runtime.
19-
//! When the `tracing` feature is enabled, spawned tasks in DataFusion (such as those
20-
//! created during repartitioning or when reading Parquet files) are instrumented
21-
//! with the current tracing span, allowing to propagate any existing tracing context.
22-
//!
23-
//! In this example we create a session configured to use multiple partitions,
24-
//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file),
25-
//! and run a query that should trigger parallel execution on multiple threads.
26-
//! We wrap the entire query execution within a custom span and log messages.
27-
//! By inspecting the tracing output, we should see that the tasks spawned
28-
//! internally inherit the span context.
18+
//! This example demonstrates the tracing injection feature for the DataFusion runtime.
19+
//! Tasks spawned on new threads behave differently depending on whether a tracer is
20+
//! injected. The log output clearly distinguishes the two cases.
2921
30-
use arrow::util::pretty::pretty_format_batches;
31-
use datafusion::arrow::record_batch::RecordBatch;
22+
use std::any::Any;
3223
use datafusion::datasource::file_format::parquet::ParquetFormat;
3324
use datafusion::datasource::listing::ListingOptions;
3425
use datafusion::error::Result;
3526
use datafusion::prelude::*;
3627
use datafusion::test_util::parquet_test_data;
3728
use std::sync::Arc;
38-
use tracing::{info, instrument, Level};
29+
use futures::future::BoxFuture;
30+
use futures::FutureExt;
31+
use tracing::{info, instrument, Instrument, Level, Span};
32+
use datafusion::common::runtime::set_join_set_tracer;
3933

4034
#[tokio::main]
4135
async fn main() -> Result<()> {
42-
// Initialize a tracing subscriber that prints to stdout.
36+
// Initialize tracing subscriber with thread info.
4337
tracing_subscriber::fmt()
4438
.with_thread_ids(true)
4539
.with_thread_names(true)
4640
.with_max_level(Level::DEBUG)
4741
.init();
4842

49-
log::info!("Starting example, this log is not captured by tracing");
43+
// Run query WITHOUT tracer injection.
44+
info!("***** RUNNING WITHOUT INJECTED TRACER *****");
45+
run_instrumented_query().await?;
46+
info!("***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span *****");
5047

51-
// execute the query within a tracing span
52-
let result = run_instrumented_query().await;
48+
// Inject custom tracer so tasks run in the current span.
49+
info!("Injecting custom tracer...");
50+
set_join_set_tracer(instrument_future, instrument_block)
51+
.expect("Failed to set tracer");
5352

54-
info!(
55-
"Finished example. Check the logs above for tracing span details showing \
56-
that tasks were spawned within the 'run_instrumented_query' span on different threads."
57-
);
53+
// Run query WITH tracer injection.
54+
info!("***** RUNNING WITH INJECTED TRACER *****");
55+
run_instrumented_query().await?;
56+
info!("***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span *****");
5857

59-
result
58+
Ok(())
59+
}
60+
61+
/// Instruments a boxed future to run in the current span.
62+
fn instrument_future(
63+
fut: BoxFuture<'static, Box<dyn Any + Send>>
64+
) -> BoxFuture<'static, Box<dyn Any + Send>> {
65+
fut.in_current_span().boxed()
66+
}
67+
68+
/// Instruments a boxed blocking closure to execute within the current span.
69+
fn instrument_block(
70+
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>
71+
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
72+
let span = Span::current();
73+
Box::new(move || span.in_scope(|| f()))
6074
}
6175

6276
#[instrument(level = "info")]
6377
async fn run_instrumented_query() -> Result<()> {
64-
info!("Starting query execution within the custom tracing span");
78+
info!("Starting query execution");
6579

66-
// The default session will set the number of partitions to `std::thread::available_parallelism()`.
6780
let ctx = SessionContext::new();
68-
69-
// Get the path to the test parquet data.
7081
let test_data = parquet_test_data();
71-
// Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file.
7282
let file_format = ParquetFormat::default().with_enable_pruning(true);
7383
let listing_options = ListingOptions::new(Arc::new(file_format))
7484
.with_file_extension("alltypes_tiny_pages_plain.parquet");
7585

76-
info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}");
77-
78-
// Register a listing table using an absolute URL.
7986
let table_path = format!("file://{test_data}/");
80-
ctx.register_listing_table(
81-
"alltypes",
82-
&table_path,
83-
listing_options.clone(),
84-
None,
85-
None,
86-
)
87-
.await
88-
.expect("register_listing_table failed");
89-
90-
info!("Registered Parquet table 'alltypes' from {table_path}");
91-
92-
// Run a query that will trigger parallel execution on multiple threads.
93-
let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col
94-
FROM (
95-
SELECT bool_col, date_string_col, string_col FROM alltypes
96-
UNION ALL
97-
SELECT bool_col, date_string_col, string_col FROM alltypes
98-
) AS t
99-
GROUP BY bool_col, date_string_col, string_col
100-
ORDER BY 1,2,3,4 DESC
101-
LIMIT 5;";
102-
info!(%sql, "Executing SQL query");
103-
let df = ctx.sql(sql).await?;
104-
105-
let results: Vec<RecordBatch> = df.collect().await?;
106-
info!("Query execution complete");
107-
108-
// Print out the results and tracing output.
109-
datafusion::common::assert_batches_eq!(
110-
[
111-
"+----------+----------+-----------------+------------+",
112-
"| count(*) | bool_col | date_string_col | string_col |",
113-
"+----------+----------+-----------------+------------+",
114-
"| 2 | false | 01/01/09 | 9 |",
115-
"| 2 | false | 01/01/09 | 7 |",
116-
"| 2 | false | 01/01/09 | 5 |",
117-
"| 2 | false | 01/01/09 | 3 |",
118-
"| 2 | false | 01/01/09 | 1 |",
119-
"+----------+----------+-----------------+------------+",
120-
],
121-
&results
122-
);
123-
124-
info!("Query results:\n{}", pretty_format_batches(&results)?);
125-
87+
info!("Registering table 'alltypes' from {}", table_path);
88+
ctx.register_listing_table("alltypes", &table_path, listing_options, None, None)
89+
.await
90+
.expect("Failed to register table");
91+
92+
let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col";
93+
info!(sql, "Executing SQL query");
94+
let result = ctx.sql(sql).await?.collect().await?;
95+
info!("Query complete: {} batches returned", result.len());
12696
Ok(())
127-
}
97+
}

Diff for: datafusion/common-runtime/Cargo.toml

+1-5
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,13 @@ all-features = true
3434
[lints]
3535
workspace = true
3636

37-
[features]
38-
tracing = ["dep:tracing", "dep:tracing-futures"]
39-
4037
[lib]
4138
name = "datafusion_common_runtime"
4239

4340
[dependencies]
41+
futures = { workspace = true }
4442
log = { workspace = true }
4543
tokio = { workspace = true }
46-
tracing = { version = "0.1", optional = true }
47-
tracing-futures = { version = "0.2", optional = true }
4844

4945
[dev-dependencies]
5046
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] }

Diff for: datafusion/common-runtime/src/common.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,15 @@ impl<R: 'static> SpawnedTask<R> {
5353
}
5454

5555
/// Joins the task, returning the result of join (`Result<R, JoinError>`).
56-
pub async fn join(mut self) -> Result<R, JoinError> {
56+
pub async fn join(mut self) -> Result<R, JoinError> where R: Send {
5757
self.inner
5858
.join_next()
5959
.await
6060
.expect("`SpawnedTask` instance always contains exactly 1 task")
6161
}
6262

6363
/// Joins the task and unwinds the panic if it happens.
64-
pub async fn join_unwind(self) -> Result<R, JoinError> {
64+
pub async fn join_unwind(self) -> Result<R, JoinError> where R: Send {
6565
self.join().await.map_err(|e| {
6666
// `JoinError` can be caused either by panic or cancellation. We have to handle panics:
6767
if e.is_panic() {

0 commit comments

Comments
 (0)