From 4117fa19b3fa09a3db31ffd568da16d7723666ae Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Sep 2025 11:17:04 +0200 Subject: [PATCH 1/8] Collocate variants of From DFSchema to Schema --- datafusion/common/src/dfschema.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index d69810db1965..2bd94609bf6e 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -1083,6 +1083,12 @@ impl From<&DFSchema> for Schema { } } +impl From for SchemaRef { + fn from(df_schema: DFSchema) -> Self { + SchemaRef::new(df_schema.into()) + } +} + /// Allow DFSchema to be converted into an Arrow `&Schema` impl AsRef for DFSchema { fn as_ref(&self) -> &Schema { @@ -1120,12 +1126,6 @@ impl TryFrom for DFSchema { } } -impl From for SchemaRef { - fn from(df_schema: DFSchema) -> Self { - SchemaRef::new(df_schema.into()) - } -} - // Hashing refers to a subset of fields considered in PartialEq. impl Hash for DFSchema { fn hash(&self, state: &mut H) { From 217b8595df7012678b7e20351bcec92594bc20af Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Sep 2025 11:21:11 +0200 Subject: [PATCH 2/8] Remove duplicated logic for obtaining Schema from DFSchema --- datafusion/common/src/dfschema.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 2bd94609bf6e..0c896c5fa660 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -1070,16 +1070,14 @@ fn format_simple_data_type(data_type: &DataType) -> String { impl From for Schema { /// Convert DFSchema into a Schema fn from(df_schema: DFSchema) -> Self { - let fields: Fields = df_schema.inner.fields.clone(); - Schema::new_with_metadata(fields, df_schema.inner.metadata.clone()) + (&df_schema).into() } } impl From<&DFSchema> for Schema { /// Convert DFSchema reference into a Schema fn from(df_schema: &DFSchema) -> Self { - let fields: Fields = df_schema.inner.fields.clone(); - Schema::new_with_metadata(fields, df_schema.inner.metadata.clone()) + df_schema.as_arrow().clone() } } From dc6a05714516fa6b078457bfa1b63e48aa785c48 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Sep 2025 11:04:55 +0200 Subject: [PATCH 3/8] Remove Arc clone in hash_nested_array --- datafusion/common/src/scalar/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 67287546f438..b4f26ed8a6f2 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -834,8 +834,9 @@ impl Hash for ScalarValue { } fn hash_nested_array(arr: ArrayRef, state: &mut H) { - let arrays = vec![arr.to_owned()]; - let hashes_buffer = &mut vec![0; arr.len()]; + let len = arr.len(); + let arrays = vec![arr]; + let hashes_buffer = &mut vec![0; len]; let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0); let hashes = create_hashes(&arrays, &random_state, hashes_buffer).unwrap(); // Hash back to std::hash::Hasher From 525fd9ca23388882a053ef1cb39ac9b6ca30cf23 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Sep 2025 11:09:59 +0200 Subject: [PATCH 4/8] Avoid redundant Schema clones --- datafusion-examples/examples/expr_api.rs | 5 +-- .../examples/flight/flight_server.rs | 3 +- .../examples/flight/flight_sql_server.rs | 4 +-- datafusion/catalog/src/stream.rs | 2 +- datafusion/catalog/src/view.rs | 2 +- datafusion/common/src/dfschema.rs | 17 +++++++--- datafusion/core/src/dataframe/mod.rs | 3 +- .../src/datasource/listing_table_factory.rs | 2 +- .../src/datasource/physical_plan/parquet.rs | 4 +-- datafusion/core/src/execution/context/mod.rs | 4 +-- datafusion/core/src/physical_planner.rs | 32 ++++++++----------- datafusion/core/src/test_util/mod.rs | 2 +- datafusion/core/tests/dataframe/mod.rs | 3 +- .../core/tests/fuzz_cases/window_fuzz.rs | 6 ++-- .../physical_optimizer/enforce_sorting.rs | 2 +- .../tests/physical_optimizer/test_utils.rs | 2 +- datafusion/core/tests/sql/sql_api.rs | 4 +-- datafusion/expr/src/logical_plan/display.rs | 8 ++--- datafusion/physical-expr/src/planner.rs | 3 +- .../src/windows/bounded_window_agg_exec.rs | 2 +- datafusion/physical-plan/src/windows/mod.rs | 10 +++--- .../proto/src/physical_plan/from_proto.rs | 2 +- datafusion/pruning/src/pruning_predicate.rs | 4 +-- datafusion/sql/src/cte.rs | 8 ++--- .../tests/cases/substrait_validations.rs | 2 +- datafusion/substrait/tests/utils.rs | 2 +- 26 files changed, 65 insertions(+), 73 deletions(-) diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index ac17bd417c01..ec359fd1797d 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -518,10 +518,7 @@ fn type_coercion_demo() -> Result<()> { HashMap::new(), )?; let i8_array = Int8Array::from_iter_values(vec![0, 1, 2]); - let batch = RecordBatch::try_new( - Arc::new(df_schema.as_arrow().to_owned()), - vec![Arc::new(i8_array) as _], - )?; + let batch = RecordBatch::try_new((&df_schema).into(), vec![Arc::new(i8_array) as _])?; // Constructs a binary expression for demo. // By default, the literal `1` is translated into the Int32 type and cannot be directly compared with the Int8 type. diff --git a/datafusion-examples/examples/flight/flight_server.rs b/datafusion-examples/examples/flight/flight_server.rs index cc5f43746ddf..3201c90b4f14 100644 --- a/datafusion-examples/examples/flight/flight_server.rs +++ b/datafusion-examples/examples/flight/flight_server.rs @@ -33,6 +33,7 @@ use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, }; +use arrow_schema::SchemaRef; #[derive(Clone)] pub struct FlightServiceImpl {} @@ -98,7 +99,7 @@ impl FlightService for FlightServiceImpl { let df = ctx.sql(sql).await.map_err(to_tonic_err)?; // execute the query - let schema = df.schema().clone().into(); + let schema: SchemaRef = df.schema().into(); let results = df.collect().await.map_err(to_tonic_err)?; if results.is_empty() { return Err(Status::internal("There were no results from ticket")); diff --git a/datafusion-examples/examples/flight/flight_sql_server.rs b/datafusion-examples/examples/flight/flight_sql_server.rs index 5a573ed52320..c8671cc5856f 100644 --- a/datafusion-examples/examples/flight/flight_sql_server.rs +++ b/datafusion-examples/examples/flight/flight_sql_server.rs @@ -397,8 +397,8 @@ impl FlightSqlService for FlightSqlServiceImpl { let plan_schema = plan.schema(); - let arrow_schema = (&**plan_schema).into(); - let message = SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default()) + let arrow_schema = plan_schema.as_ref().into(); + let message = SchemaAsIpc::new(arrow_schema, &IpcWriteOptions::default()) .try_into() .map_err(|e| status!("Unable to serialize schema", e))?; let IpcMessage(schema_bytes) = message; diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs index 0fab9beba81f..608607325f4f 100644 --- a/datafusion/catalog/src/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -53,7 +53,7 @@ impl TableProviderFactory for StreamTableFactory { state: &dyn Session, cmd: &CreateExternalTable, ) -> Result> { - let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); + let schema: SchemaRef = cmd.schema.as_ref().into(); let location = cmd.location.clone(); let encoding = cmd.file_type.parse()?; let header = if let Ok(opt) = cmd diff --git a/datafusion/catalog/src/view.rs b/datafusion/catalog/src/view.rs index 3bb7214399b5..2cdd07dcef4b 100644 --- a/datafusion/catalog/src/view.rs +++ b/datafusion/catalog/src/view.rs @@ -51,7 +51,7 @@ impl ViewTable { /// Notes: the `LogicalPlan` is not validated or type coerced. If this is /// needed it should be done after calling this function. pub fn new(logical_plan: LogicalPlan, definition: Option) -> Self { - let table_schema = logical_plan.schema().as_ref().to_owned().into(); + let table_schema = logical_plan.schema().as_ref().into(); Self { logical_plan, table_schema, diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 0c896c5fa660..ef36c5b26224 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -594,7 +594,7 @@ impl DFSchema { &self, arrow_schema: &Schema, ) -> Result<()> { - let self_arrow_schema: Schema = self.into(); + let self_arrow_schema: &Schema = self.into(); self_arrow_schema .fields() .iter() @@ -1081,9 +1081,16 @@ impl From<&DFSchema> for Schema { } } -impl From for SchemaRef { - fn from(df_schema: DFSchema) -> Self { - SchemaRef::new(df_schema.into()) +impl<'a> From<&'a DFSchema> for &'a Schema { + /// Convert DFSchema reference into a Schema + fn from(df_schema: &'a DFSchema) -> Self { + df_schema.as_arrow() + } +} + +impl From<&DFSchema> for SchemaRef { + fn from(df_schema: &DFSchema) -> Self { + Arc::clone(df_schema.inner()) } } @@ -1413,7 +1420,7 @@ mod tests { #[test] fn from_qualified_schema_into_arrow_schema() -> Result<()> { let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let arrow_schema: Schema = schema.into(); + let arrow_schema = schema.as_arrow(); let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \ Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"; assert_eq!(expected, arrow_schema.to_string()); diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 9832c0e9db1e..ca4f9cf641ee 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2440,8 +2440,7 @@ impl TableProvider for DataFrameTableProvider { } fn schema(&self) -> SchemaRef { - let schema: Schema = self.plan.schema().as_ref().into(); - Arc::new(schema) + self.plan.schema().as_ref().into() } fn table_type(&self) -> TableType { diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 45d0fefded57..b1791f0eb0d8 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -105,7 +105,7 @@ impl TableProviderFactory for ListingTableFactory { .collect::>(), ) } else { - let schema: SchemaRef = Arc::new(cmd.schema.as_ref().to_owned().into()); + let schema: SchemaRef = cmd.schema.as_ref().into(); let table_partition_cols = cmd .table_partition_cols .iter() diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 6f1c6c4171d1..9adab933751e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -2018,14 +2018,14 @@ mod tests { let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; fs::create_dir(&out_dir).unwrap(); let df = ctx.sql("SELECT c1, c2 FROM test").await?; - let schema: Schema = df.schema().into(); + let schema: SchemaRef = df.schema().into(); // Register a listing table - this will use all files in the directory as data sources // for the query ctx.register_listing_table( "my_table", &out_dir, listing_options, - Some(Arc::new(schema)), + Some(schema), None, ) .await diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 433b9400afa2..3e925939b607 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -855,7 +855,7 @@ impl SessionContext { (true, false, Ok(_)) => self.return_empty_dataframe(), (false, true, Ok(_)) => { self.deregister_table(name.clone())?; - let schema = Arc::new(input.schema().as_ref().into()); + let schema = input.schema().as_ref().into(); let physical = DataFrame::new(self.state(), input); let batches: Vec<_> = physical.collect_partitioned().await?; @@ -874,7 +874,7 @@ impl SessionContext { } (_, _, Err(_)) => { let df_schema = input.schema(); - let schema = Arc::new(df_schema.as_ref().into()); + let schema = df_schema.as_ref().into(); let physical = DataFrame::new(self.state(), input); let batches: Vec<_> = physical.collect_partitioned().await?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index e322b1effd94..a05ee1117b79 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -466,7 +466,6 @@ impl DefaultPhysicalPlanner { Arc::clone(res.plan()) } LogicalPlan::Values(Values { values, schema }) => { - let exec_schema = schema.as_ref().to_owned().into(); let exprs = values .iter() .map(|row| { @@ -477,27 +476,22 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; - MemorySourceConfig::try_new_as_values(SchemaRef::new(exec_schema), exprs)? - as _ + MemorySourceConfig::try_new_as_values(schema.as_ref().into(), exprs)? as _ } LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema, - }) => Arc::new(EmptyExec::new(SchemaRef::new( - schema.as_ref().to_owned().into(), - ))), + }) => Arc::new(EmptyExec::new(schema.as_ref().into())), LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: true, schema, - }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new( - schema.as_ref().to_owned().into(), - ))), + }) => Arc::new(PlaceholderRowExec::new(schema.as_ref().into())), LogicalPlan::DescribeTable(DescribeTable { schema, output_schema, }) => { - let output_schema: Schema = output_schema.as_ref().into(); - self.plan_describe(Arc::clone(schema), Arc::new(output_schema))? + let output_schema: SchemaRef = output_schema.as_ref().into(); + self.plan_describe(Arc::clone(schema), output_schema)? } // 1 Child @@ -514,7 +508,7 @@ impl DefaultPhysicalPlanner { let parsed_url = ListingTableUrl::parse(output_url)?; let object_store_url = parsed_url.object_store(); - let schema: Schema = (**input.schema()).clone().into(); + let schema: SchemaRef = input.schema().as_ref().into(); // Note: the DataType passed here is ignored for the purposes of writing and inferred instead // from the schema of the RecordBatch being written. This allows COPY statements to specify only @@ -551,7 +545,7 @@ impl DefaultPhysicalPlanner { object_store_url, table_paths: vec![parsed_url], file_group: FileGroup::default(), - output_schema: Arc::new(schema), + output_schema: schema, table_partition_cols, insert_op: InsertOp::Append, keep_partition_by_columns, @@ -931,7 +925,7 @@ impl DefaultPhysicalPlanner { .. }) => { let input = children.one()?; - let schema = SchemaRef::new(schema.as_ref().to_owned().into()); + let schema = schema.as_ref().into(); let list_column_indices = list_type_columns .iter() .map(|(index, unnesting)| ListUnnest { @@ -1639,7 +1633,7 @@ pub fn create_window_expr_with_name( execution_props: &ExecutionProps, ) -> Result> { let name = name.into(); - let physical_schema: &Schema = &logical_schema.into(); + let physical_schema: SchemaRef = logical_schema.into(); match e { Expr::WindowFunction(window_fun) => { let WindowFunction { @@ -1850,7 +1844,7 @@ impl DefaultPhysicalPlanner { if !e.logical_optimization_succeeded { return Ok(Arc::new(ExplainExec::new( - Arc::clone(e.schema.inner()), + e.schema.as_ref().into(), e.stringified_plans.clone(), true, ))); @@ -1894,7 +1888,7 @@ impl DefaultPhysicalPlanner { if !stringified_plans.is_empty() { return Ok(Arc::new(ExplainExec::new( - Arc::clone(e.schema.inner()), + e.schema.as_ref().into(), stringified_plans, e.verbose, ))); @@ -2019,7 +2013,7 @@ impl DefaultPhysicalPlanner { } Ok(Arc::new(ExplainExec::new( - Arc::clone(e.schema.inner()), + e.schema.as_ref().into(), stringified_plans, e.verbose, ))) @@ -2031,7 +2025,7 @@ impl DefaultPhysicalPlanner { session_state: &SessionState, ) -> Result> { let input = self.create_physical_plan(&a.input, session_state).await?; - let schema = SchemaRef::new((*a.schema).clone().into()); + let schema = a.schema.as_ref().into(); let show_statistics = session_state.config_options().explain.show_statistics; Ok(Arc::new(AnalyzeExec::new( a.verbose, diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 299b73ccbe40..31b9d241c0bc 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -185,7 +185,7 @@ impl TableProviderFactory for TestTableFactory { ) -> Result> { Ok(Arc::new(TestTableProvider { url: cmd.location.to_string(), - schema: Arc::new(cmd.schema.as_ref().into()), + schema: cmd.schema.as_ref().into(), })) } } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index aa984775e457..3bd6953e1dfc 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -119,8 +119,7 @@ pub fn table_with_constraints() -> Arc { } async fn assert_logical_expr_schema_eq_physical_expr_schema(df: DataFrame) -> Result<()> { - let logical_expr_dfschema = df.schema(); - let logical_expr_schema = SchemaRef::from(logical_expr_dfschema.to_owned()); + let logical_expr_schema: SchemaRef = df.schema().into(); let batches = df.collect().await?; let physical_expr_schema = batches[0].schema(); assert_eq!(logical_expr_schema, physical_expr_schema); diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 9da92f7cd49a..65a41d39d3c5 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -286,7 +286,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> { &partitionby_exprs, &[], Arc::new(window_frame), - &extended_schema, + extended_schema, false, false, None, @@ -660,7 +660,7 @@ async fn run_window_test( &partitionby_exprs, &orderby_exprs.clone(), Arc::new(window_frame.clone()), - &extended_schema, + Arc::clone(&extended_schema), false, false, None, @@ -680,7 +680,7 @@ async fn run_window_test( &partitionby_exprs, &orderby_exprs, Arc::new(window_frame.clone()), - &extended_schema, + extended_schema, false, false, None, diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index a19dd7ace977..3858e70eaf3e 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -3683,7 +3683,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> { &partition_by, &[], case.window_frame, - input_schema.as_ref(), + Arc::clone(&input_schema), false, false, None, diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 7c9fb9de5340..e6f005678dec 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -263,7 +263,7 @@ pub fn bounded_window_exec_with_partition( partition_by, &sort_exprs, Arc::new(WindowFrame::new(Some(false))), - schema.as_ref(), + schema, false, false, None, diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index ec086bcc50c7..b87afd27ddea 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -84,8 +84,8 @@ async fn dml_output_schema() { ctx.sql("CREATE TABLE test (x int)").await.unwrap(); let sql = "INSERT INTO test VALUES (1)"; let df = ctx.sql(sql).await.unwrap(); - let count_schema = Schema::new(vec![Field::new("count", DataType::UInt64, false)]); - assert_eq!(Schema::from(df.schema()), count_schema); + let count_schema = &Schema::new(vec![Field::new("count", DataType::UInt64, false)]); + assert_eq!(df.schema().as_arrow(), count_schema); } #[tokio::test] diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index cc3fbad7b0c2..2dbaea3ae7a9 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -72,11 +72,7 @@ impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> { write!(self.f, "{:indent$}", "", indent = self.indent * 2)?; write!(self.f, "{}", plan.display())?; if self.with_schema { - write!( - self.f, - " {}", - display_schema(&plan.schema().as_ref().to_owned().into()) - )?; + write!(self.f, " {}", display_schema(plan.schema().as_ref().into()))?; } self.indent += 1; @@ -196,7 +192,7 @@ impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> { format!( r"{}\nSchema: {}", plan.display(), - display_schema(&plan.schema().as_ref().to_owned().into()) + display_schema(plan.schema().as_ref().into()) ) } else { format!("{}", plan.display()) diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index fb491341f81d..a2017e7fe307 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -111,7 +111,7 @@ pub fn create_physical_expr( input_dfschema: &DFSchema, execution_props: &ExecutionProps, ) -> Result> { - let input_schema: &Schema = &input_dfschema.into(); + let input_schema: &Schema = input_dfschema.into(); match e { Expr::Alias(Alias { expr, metadata, .. }) => { @@ -407,6 +407,7 @@ where /// Convert a logical expression to a physical expression (without any simplification, etc) pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc { + // TODO this makes a deep copy of the Schema. Should take SchemaRef instead and avoid deep copy let df_schema = schema.clone().to_dfschema().unwrap(); let execution_props = ExecutionProps::new(); create_physical_expr(expr, &df_schema, &execution_props).unwrap() diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index e51b2d3f569c..e1946c8b2be6 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1375,7 +1375,7 @@ mod tests { &partitionby_exprs, &orderby_exprs, Arc::new(window_frame), - &input.schema(), + input.schema(), false, false, None, diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index dccd9200fc77..4b4e018fd561 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -96,7 +96,7 @@ pub fn create_window_expr( partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, - input_schema: &Schema, + input_schema: SchemaRef, ignore_nulls: bool, distinct: bool, filter: Option>, @@ -105,7 +105,7 @@ pub fn create_window_expr( WindowFunctionDefinition::AggregateUDF(fun) => { let aggregate = if distinct { AggregateExprBuilder::new(Arc::clone(fun), args.to_vec()) - .schema(Arc::new(input_schema.clone())) + .schema(input_schema) .alias(name) .with_ignore_nulls(ignore_nulls) .distinct() @@ -113,7 +113,7 @@ pub fn create_window_expr( .map(Arc::new)? } else { AggregateExprBuilder::new(Arc::clone(fun), args.to_vec()) - .schema(Arc::new(input_schema.clone())) + .schema(input_schema) .alias(name) .with_ignore_nulls(ignore_nulls) .build() @@ -128,7 +128,7 @@ pub fn create_window_expr( ) } WindowFunctionDefinition::WindowUDF(fun) => Arc::new(StandardWindowExpr::new( - create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?, + create_udwf_window_expr(fun, args, &input_schema, name, ignore_nulls)?, partition_by, order_by, window_frame, @@ -814,7 +814,7 @@ mod tests { &[], &[], Arc::new(WindowFrame::new(None)), - schema.as_ref(), + schema, false, false, None, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 85de56e97280..910a3fd73c60 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -177,7 +177,7 @@ pub fn parse_physical_window_expr( &partition_by, &order_by, Arc::new(window_frame), - &extended_schema, + extended_schema, proto.ignore_nulls, proto.distinct, None, diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 5e92dbe227fd..954b251927fc 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -1094,7 +1094,7 @@ fn rewrite_expr_to_prunable( Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr))) } else if let Some(cast) = column_expr_any.downcast_ref::() { // `cast(col) op lit()` - let arrow_schema: SchemaRef = schema.clone().into(); + let arrow_schema: SchemaRef = (&schema).into(); let from_type = cast.expr().data_type(&arrow_schema)?; verify_support_type_for_prune(&from_type, cast.cast_type())?; let (left, op, right) = @@ -1109,7 +1109,7 @@ fn rewrite_expr_to_prunable( column_expr_any.downcast_ref::() { // `try_cast(col) op lit()` - let arrow_schema: SchemaRef = schema.clone().into(); + let arrow_schema: SchemaRef = (&schema).into(); let from_type = try_cast.expr().data_type(&arrow_schema)?; verify_support_type_for_prune(&from_type, try_cast.cast_type())?; let (left, op, right) = diff --git a/datafusion/sql/src/cte.rs b/datafusion/sql/src/cte.rs index 3650aea9c3c2..482d877be2c1 100644 --- a/datafusion/sql/src/cte.rs +++ b/datafusion/sql/src/cte.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use arrow::datatypes::Schema; use datafusion_common::{ not_impl_err, plan_err, tree_node::{TreeNode, TreeNodeRecursion}, @@ -135,10 +134,9 @@ impl SqlToRel<'_, S> { // ---------- Step 2: Create a temporary relation ------------------ // Step 2.1: Create a table source for the temporary relation - let work_table_source = self.context_provider.create_cte_work_table( - &cte_name, - Arc::new(Schema::from(static_plan.schema().as_ref())), - )?; + let work_table_source = self + .context_provider + .create_cte_work_table(&cte_name, static_plan.schema().as_ref().into())?; // Step 2.2: Create a temporary relation logical plan that will be used // as the input to the recursive term diff --git a/datafusion/substrait/tests/cases/substrait_validations.rs b/datafusion/substrait/tests/cases/substrait_validations.rs index a31b3ca385e9..d222ff6a368d 100644 --- a/datafusion/substrait/tests/cases/substrait_validations.rs +++ b/datafusion/substrait/tests/cases/substrait_validations.rs @@ -51,7 +51,7 @@ mod tests { let ctx = SessionContext::new(); ctx.register_table( table_ref, - Arc::new(EmptyTable::new(df_schema.inner().clone())), + Arc::new(EmptyTable::new((&df_schema).into())), )?; Ok(ctx) } diff --git a/datafusion/substrait/tests/utils.rs b/datafusion/substrait/tests/utils.rs index e3e3ec3fab01..fbb526b8bbde 100644 --- a/datafusion/substrait/tests/utils.rs +++ b/datafusion/substrait/tests/utils.rs @@ -150,7 +150,7 @@ pub mod test { let df_schema = from_substrait_named_struct(self.consumer, substrait_schema)? .replace_qualifier(table_reference.clone()); - let table = EmptyTable::new(df_schema.inner().clone()); + let table = EmptyTable::new((&df_schema).into()); self.schemas.push((table_reference, Arc::new(table))); Ok(()) } From dfeca08ae23ab969a2bf226beca634f8282d1c9a Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Sep 2025 14:19:53 +0200 Subject: [PATCH 5/8] Avoid some Field clones --- datafusion/expr/src/expr_schema.rs | 2 +- datafusion/expr/src/logical_plan/builder.rs | 2 ++ datafusion/expr/src/logical_plan/plan.rs | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index c844bbd5384b..e803e3534130 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -482,7 +482,7 @@ impl ExprSchemable for Expr { Ok(Arc::new(Field::new(&schema_name, DataType::Boolean, false))) } Expr::ScalarSubquery(subquery) => { - Ok(Arc::new(subquery.subquery.schema().field(0).clone())) + Ok(Arc::clone(&subquery.subquery.schema().fields()[0])) } Expr::BinaryExpr(BinaryExpr { ref left, diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index ac699b9d30c3..29d94f8c6d7e 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1970,6 +1970,7 @@ pub fn table_scan_with_filter_and_fetch( } pub fn table_source(table_schema: &Schema) -> Arc { + // TODO should we take SchemaRef and avoid cloning? let table_schema = Arc::new(table_schema.clone()); Arc::new(LogicalTableSource { table_schema, @@ -1981,6 +1982,7 @@ pub fn table_source_with_constraints( table_schema: &Schema, constraints: Constraints, ) -> Arc { + // TODO should we take SchemaRef and avoid cloning? let table_schema = Arc::new(table_schema.clone()); Arc::new(LogicalTableSource { table_schema, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c271e48a01f7..c3baaeaace92 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2702,7 +2702,7 @@ impl TableScan { let df_schema = DFSchema::new_with_metadata( p.iter() .map(|i| { - (Some(table_name.clone()), Arc::new(schema.field(*i).clone())) + (Some(table_name.clone()), Arc::clone(&schema.fields()[*i])) }) .collect(), schema.metadata.clone(), From 79c4162e432f2382d996d5fe7c49761e1373c51f Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 19 Sep 2025 17:21:08 +0200 Subject: [PATCH 6/8] make arc clones explicit --- datafusion-examples/examples/expr_api.rs | 5 +++- .../examples/flight/flight_server.rs | 3 +-- datafusion/catalog/src/stream.rs | 2 +- datafusion/catalog/src/view.rs | 2 +- datafusion/common/src/dfschema.rs | 20 -------------- datafusion/core/src/dataframe/mod.rs | 2 +- .../src/datasource/listing_table_factory.rs | 4 +-- .../src/datasource/physical_plan/parquet.rs | 2 +- datafusion/core/src/execution/context/mod.rs | 5 ++-- datafusion/core/src/physical_planner.rs | 26 ++++++++++--------- datafusion/core/src/test_util/mod.rs | 2 +- datafusion/core/tests/dataframe/mod.rs | 5 ++-- datafusion/expr/src/logical_plan/plan.rs | 4 +-- datafusion/pruning/src/pruning_predicate.rs | 8 +++--- datafusion/sql/src/cte.rs | 2 +- .../tests/cases/substrait_validations.rs | 2 +- datafusion/substrait/tests/utils.rs | 2 +- 17 files changed, 39 insertions(+), 57 deletions(-) diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index ec359fd1797d..56f960870e58 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -518,7 +518,10 @@ fn type_coercion_demo() -> Result<()> { HashMap::new(), )?; let i8_array = Int8Array::from_iter_values(vec![0, 1, 2]); - let batch = RecordBatch::try_new((&df_schema).into(), vec![Arc::new(i8_array) as _])?; + let batch = RecordBatch::try_new( + Arc::clone(df_schema.inner()), + vec![Arc::new(i8_array) as _], + )?; // Constructs a binary expression for demo. // By default, the literal `1` is translated into the Int32 type and cannot be directly compared with the Int8 type. diff --git a/datafusion-examples/examples/flight/flight_server.rs b/datafusion-examples/examples/flight/flight_server.rs index 3201c90b4f14..58bfb7a341c1 100644 --- a/datafusion-examples/examples/flight/flight_server.rs +++ b/datafusion-examples/examples/flight/flight_server.rs @@ -33,7 +33,6 @@ use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, }; -use arrow_schema::SchemaRef; #[derive(Clone)] pub struct FlightServiceImpl {} @@ -99,7 +98,7 @@ impl FlightService for FlightServiceImpl { let df = ctx.sql(sql).await.map_err(to_tonic_err)?; // execute the query - let schema: SchemaRef = df.schema().into(); + let schema = Arc::clone(df.schema().inner()); let results = df.collect().await.map_err(to_tonic_err)?; if results.is_empty() { return Err(Status::internal("There were no results from ticket")); diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs index 608607325f4f..2d66ff4628b9 100644 --- a/datafusion/catalog/src/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -53,7 +53,7 @@ impl TableProviderFactory for StreamTableFactory { state: &dyn Session, cmd: &CreateExternalTable, ) -> Result> { - let schema: SchemaRef = cmd.schema.as_ref().into(); + let schema: SchemaRef = Arc::clone(cmd.schema.inner()); let location = cmd.location.clone(); let encoding = cmd.file_type.parse()?; let header = if let Ok(opt) = cmd diff --git a/datafusion/catalog/src/view.rs b/datafusion/catalog/src/view.rs index 2cdd07dcef4b..89c6a4a22451 100644 --- a/datafusion/catalog/src/view.rs +++ b/datafusion/catalog/src/view.rs @@ -51,7 +51,7 @@ impl ViewTable { /// Notes: the `LogicalPlan` is not validated or type coerced. If this is /// needed it should be done after calling this function. pub fn new(logical_plan: LogicalPlan, definition: Option) -> Self { - let table_schema = logical_plan.schema().as_ref().into(); + let table_schema = Arc::clone(logical_plan.schema().inner()); Self { logical_plan, table_schema, diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index ef36c5b26224..a0c173644800 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -1067,20 +1067,6 @@ fn format_simple_data_type(data_type: &DataType) -> String { } } -impl From for Schema { - /// Convert DFSchema into a Schema - fn from(df_schema: DFSchema) -> Self { - (&df_schema).into() - } -} - -impl From<&DFSchema> for Schema { - /// Convert DFSchema reference into a Schema - fn from(df_schema: &DFSchema) -> Self { - df_schema.as_arrow().clone() - } -} - impl<'a> From<&'a DFSchema> for &'a Schema { /// Convert DFSchema reference into a Schema fn from(df_schema: &'a DFSchema) -> Self { @@ -1088,12 +1074,6 @@ impl<'a> From<&'a DFSchema> for &'a Schema { } } -impl From<&DFSchema> for SchemaRef { - fn from(df_schema: &DFSchema) -> Self { - Arc::clone(df_schema.inner()) - } -} - /// Allow DFSchema to be converted into an Arrow `&Schema` impl AsRef for DFSchema { fn as_ref(&self) -> &Schema { diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index ca4f9cf641ee..76acb5c1da34 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2440,7 +2440,7 @@ impl TableProvider for DataFrameTableProvider { } fn schema(&self) -> SchemaRef { - self.plan.schema().as_ref().into() + Arc::clone(self.plan.schema().inner()) } fn table_type(&self) -> TableType { diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index b1791f0eb0d8..f98297d0e3f7 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -27,7 +27,7 @@ use crate::datasource::listing::{ }; use crate::execution::context::SessionState; -use arrow::datatypes::{DataType, SchemaRef}; +use arrow::datatypes::DataType; use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema}; use datafusion_common::{config_datafusion_err, Result}; use datafusion_expr::CreateExternalTable; @@ -105,7 +105,7 @@ impl TableProviderFactory for ListingTableFactory { .collect::>(), ) } else { - let schema: SchemaRef = cmd.schema.as_ref().into(); + let schema = Arc::clone(cmd.schema.inner()); let table_partition_cols = cmd .table_partition_cols .iter() diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 9adab933751e..7c9767ceec86 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -2018,7 +2018,7 @@ mod tests { let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; fs::create_dir(&out_dir).unwrap(); let df = ctx.sql("SELECT c1, c2 FROM test").await?; - let schema: SchemaRef = df.schema().into(); + let schema = Arc::clone(df.schema().inner()); // Register a listing table - this will use all files in the directory as data sources // for the query ctx.register_listing_table( diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 3e925939b607..33941d41eb27 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -855,7 +855,7 @@ impl SessionContext { (true, false, Ok(_)) => self.return_empty_dataframe(), (false, true, Ok(_)) => { self.deregister_table(name.clone())?; - let schema = input.schema().as_ref().into(); + let schema = Arc::clone(input.schema().inner()); let physical = DataFrame::new(self.state(), input); let batches: Vec<_> = physical.collect_partitioned().await?; @@ -873,8 +873,7 @@ impl SessionContext { exec_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'") } (_, _, Err(_)) => { - let df_schema = input.schema(); - let schema = df_schema.as_ref().into(); + let schema = Arc::clone(input.schema().inner()); let physical = DataFrame::new(self.state(), input); let batches: Vec<_> = physical.collect_partitioned().await?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a05ee1117b79..c60561b2c095 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -59,7 +59,7 @@ use crate::schema_equivalence::schema_satisfied_by; use arrow::array::{builder::StringBuilder, RecordBatch}; use arrow::compute::SortOptions; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::Schema; use datafusion_catalog::ScanArgs; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; @@ -476,21 +476,22 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; - MemorySourceConfig::try_new_as_values(schema.as_ref().into(), exprs)? as _ + MemorySourceConfig::try_new_as_values(Arc::clone(schema.inner()), exprs)? + as _ } LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema, - }) => Arc::new(EmptyExec::new(schema.as_ref().into())), + }) => Arc::new(EmptyExec::new(Arc::clone(schema.inner()))), LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: true, schema, - }) => Arc::new(PlaceholderRowExec::new(schema.as_ref().into())), + }) => Arc::new(PlaceholderRowExec::new(Arc::clone(schema.inner()))), LogicalPlan::DescribeTable(DescribeTable { schema, output_schema, }) => { - let output_schema: SchemaRef = output_schema.as_ref().into(); + let output_schema = Arc::clone(output_schema.inner()); self.plan_describe(Arc::clone(schema), output_schema)? } @@ -508,7 +509,7 @@ impl DefaultPhysicalPlanner { let parsed_url = ListingTableUrl::parse(output_url)?; let object_store_url = parsed_url.object_store(); - let schema: SchemaRef = input.schema().as_ref().into(); + let schema = Arc::clone(input.schema().inner()); // Note: the DataType passed here is ignored for the purposes of writing and inferred instead // from the schema of the RecordBatch being written. This allows COPY statements to specify only @@ -925,7 +926,7 @@ impl DefaultPhysicalPlanner { .. }) => { let input = children.one()?; - let schema = schema.as_ref().into(); + let schema = Arc::clone(schema.inner()); let list_column_indices = list_type_columns .iter() .map(|(index, unnesting)| ListUnnest { @@ -1633,7 +1634,7 @@ pub fn create_window_expr_with_name( execution_props: &ExecutionProps, ) -> Result> { let name = name.into(); - let physical_schema: SchemaRef = logical_schema.into(); + let physical_schema = Arc::clone(logical_schema.inner()); match e { Expr::WindowFunction(window_fun) => { let WindowFunction { @@ -1844,7 +1845,7 @@ impl DefaultPhysicalPlanner { if !e.logical_optimization_succeeded { return Ok(Arc::new(ExplainExec::new( - e.schema.as_ref().into(), + Arc::clone(e.schema.inner()), e.stringified_plans.clone(), true, ))); @@ -1888,7 +1889,7 @@ impl DefaultPhysicalPlanner { if !stringified_plans.is_empty() { return Ok(Arc::new(ExplainExec::new( - e.schema.as_ref().into(), + Arc::clone(e.schema.inner()), stringified_plans, e.verbose, ))); @@ -2013,7 +2014,7 @@ impl DefaultPhysicalPlanner { } Ok(Arc::new(ExplainExec::new( - e.schema.as_ref().into(), + Arc::clone(e.schema.inner()), stringified_plans, e.verbose, ))) @@ -2025,7 +2026,7 @@ impl DefaultPhysicalPlanner { session_state: &SessionState, ) -> Result> { let input = self.create_physical_plan(&a.input, session_state).await?; - let schema = a.schema.as_ref().into(); + let schema = Arc::clone(a.schema.inner()); let show_statistics = session_state.config_options().explain.show_statistics; Ok(Arc::new(AnalyzeExec::new( a.verbose, @@ -2376,6 +2377,7 @@ mod tests { use crate::execution::session_state::SessionStateBuilder; use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; use arrow::datatypes::{DataType, Field, Int32Type}; + use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{ assert_contains, DFSchemaRef, TableReference, ToDFSchema as _, diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 31b9d241c0bc..7149c5b0bd8c 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -185,7 +185,7 @@ impl TableProviderFactory for TestTableFactory { ) -> Result> { Ok(Arc::new(TestTableProvider { url: cmd.location.to_string(), - schema: cmd.schema.as_ref().into(), + schema: Arc::clone(cmd.schema.inner()), })) } } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 3bd6953e1dfc..0820d444f642 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -27,8 +27,7 @@ use arrow::array::{ }; use arrow::buffer::ScalarBuffer; use arrow::datatypes::{ - DataType, Field, Float32Type, Int32Type, Schema, SchemaRef, UInt64Type, UnionFields, - UnionMode, + DataType, Field, Float32Type, Int32Type, Schema, UInt64Type, UnionFields, UnionMode, }; use arrow::error::ArrowError; use arrow::util::pretty::pretty_format_batches; @@ -119,7 +118,7 @@ pub fn table_with_constraints() -> Arc { } async fn assert_logical_expr_schema_eq_physical_expr_schema(df: DataFrame) -> Result<()> { - let logical_expr_schema: SchemaRef = df.schema().into(); + let logical_expr_schema = Arc::clone(df.schema().inner()); let batches = df.collect().await?; let physical_expr_schema = batches[0].schema(); assert_eq!(logical_expr_schema, physical_expr_schema); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c3baaeaace92..3cc032277405 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2276,10 +2276,10 @@ impl SubqueryAlias { let func_dependencies = plan.schema().functional_dependencies().clone(); let schema = DFSchema::from_unqualified_fields(fields, meta_data)?; - let schema = Schema::from(schema); + let schema = schema.as_arrow(); let schema = DFSchemaRef::new( - DFSchema::try_from_qualified_schema(alias.clone(), &schema)? + DFSchema::try_from_qualified_schema(alias.clone(), schema)? .with_functional_dependencies(func_dependencies)?, ); Ok(SubqueryAlias { diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 954b251927fc..f0f87ffe1e60 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -1094,8 +1094,8 @@ fn rewrite_expr_to_prunable( Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr))) } else if let Some(cast) = column_expr_any.downcast_ref::() { // `cast(col) op lit()` - let arrow_schema: SchemaRef = (&schema).into(); - let from_type = cast.expr().data_type(&arrow_schema)?; + let arrow_schema = schema.as_arrow(); + let from_type = cast.expr().data_type(arrow_schema)?; verify_support_type_for_prune(&from_type, cast.cast_type())?; let (left, op, right) = rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?; @@ -1109,8 +1109,8 @@ fn rewrite_expr_to_prunable( column_expr_any.downcast_ref::() { // `try_cast(col) op lit()` - let arrow_schema: SchemaRef = (&schema).into(); - let from_type = try_cast.expr().data_type(&arrow_schema)?; + let arrow_schema = schema.as_arrow(); + let from_type = try_cast.expr().data_type(arrow_schema)?; verify_support_type_for_prune(&from_type, try_cast.cast_type())?; let (left, op, right) = rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?; diff --git a/datafusion/sql/src/cte.rs b/datafusion/sql/src/cte.rs index 482d877be2c1..aceec676761c 100644 --- a/datafusion/sql/src/cte.rs +++ b/datafusion/sql/src/cte.rs @@ -136,7 +136,7 @@ impl SqlToRel<'_, S> { // Step 2.1: Create a table source for the temporary relation let work_table_source = self .context_provider - .create_cte_work_table(&cte_name, static_plan.schema().as_ref().into())?; + .create_cte_work_table(&cte_name, Arc::clone(static_plan.schema().inner()))?; // Step 2.2: Create a temporary relation logical plan that will be used // as the input to the recursive term diff --git a/datafusion/substrait/tests/cases/substrait_validations.rs b/datafusion/substrait/tests/cases/substrait_validations.rs index d222ff6a368d..c8cc3fe9940c 100644 --- a/datafusion/substrait/tests/cases/substrait_validations.rs +++ b/datafusion/substrait/tests/cases/substrait_validations.rs @@ -51,7 +51,7 @@ mod tests { let ctx = SessionContext::new(); ctx.register_table( table_ref, - Arc::new(EmptyTable::new((&df_schema).into())), + Arc::new(EmptyTable::new(Arc::clone(df_schema.inner()))), )?; Ok(ctx) } diff --git a/datafusion/substrait/tests/utils.rs b/datafusion/substrait/tests/utils.rs index fbb526b8bbde..f84594312b63 100644 --- a/datafusion/substrait/tests/utils.rs +++ b/datafusion/substrait/tests/utils.rs @@ -150,7 +150,7 @@ pub mod test { let df_schema = from_substrait_named_struct(self.consumer, substrait_schema)? .replace_qualifier(table_reference.clone()); - let table = EmptyTable::new((&df_schema).into()); + let table = EmptyTable::new(Arc::clone(df_schema.inner())); self.schemas.push((table_reference, Arc::new(table))); Ok(()) } From 5fdec0cd97b54a4f9a8f8c57424c7da69c60fa6d Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 19 Sep 2025 17:30:36 +0200 Subject: [PATCH 7/8] retract the new From --- .../examples/flight/flight_sql_server.rs | 4 +--- datafusion/common/src/dfschema.rs | 11 ++--------- datafusion/expr/src/logical_plan/display.rs | 4 ++-- datafusion/physical-expr/src/planner.rs | 2 +- 4 files changed, 6 insertions(+), 15 deletions(-) diff --git a/datafusion-examples/examples/flight/flight_sql_server.rs b/datafusion-examples/examples/flight/flight_sql_server.rs index c8671cc5856f..c35debec7d71 100644 --- a/datafusion-examples/examples/flight/flight_sql_server.rs +++ b/datafusion-examples/examples/flight/flight_sql_server.rs @@ -395,9 +395,7 @@ impl FlightSqlService for FlightSqlServiceImpl { let plan_uuid = Uuid::new_v4().hyphenated().to_string(); self.statements.insert(plan_uuid.clone(), plan.clone()); - let plan_schema = plan.schema(); - - let arrow_schema = plan_schema.as_ref().into(); + let arrow_schema = plan.schema().as_arrow(); let message = SchemaAsIpc::new(arrow_schema, &IpcWriteOptions::default()) .try_into() .map_err(|e| status!("Unable to serialize schema", e))?; diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index a0c173644800..90b62066577a 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -101,7 +101,7 @@ pub type DFSchemaRef = Arc; /// let df_schema = DFSchema::from_unqualified_fields(vec![ /// Field::new("c1", arrow::datatypes::DataType::Int32, false), /// ].into(),HashMap::new()).unwrap(); -/// let schema = Schema::from(df_schema); +/// let schema: &Schema = df_schema.as_arrow(); /// assert_eq!(schema.fields().len(), 1); /// ``` #[derive(Debug, Clone, PartialEq, Eq)] @@ -594,7 +594,7 @@ impl DFSchema { &self, arrow_schema: &Schema, ) -> Result<()> { - let self_arrow_schema: &Schema = self.into(); + let self_arrow_schema = self.as_arrow(); self_arrow_schema .fields() .iter() @@ -1067,13 +1067,6 @@ fn format_simple_data_type(data_type: &DataType) -> String { } } -impl<'a> From<&'a DFSchema> for &'a Schema { - /// Convert DFSchema reference into a Schema - fn from(df_schema: &'a DFSchema) -> Self { - df_schema.as_arrow() - } -} - /// Allow DFSchema to be converted into an Arrow `&Schema` impl AsRef for DFSchema { fn as_ref(&self) -> &Schema { diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 2dbaea3ae7a9..174ab28a1e6d 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -72,7 +72,7 @@ impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> { write!(self.f, "{:indent$}", "", indent = self.indent * 2)?; write!(self.f, "{}", plan.display())?; if self.with_schema { - write!(self.f, " {}", display_schema(plan.schema().as_ref().into()))?; + write!(self.f, " {}", display_schema(plan.schema().as_arrow()))?; } self.indent += 1; @@ -192,7 +192,7 @@ impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> { format!( r"{}\nSchema: {}", plan.display(), - display_schema(plan.schema().as_ref().into()) + display_schema(plan.schema().as_arrow()) ) } else { format!("{}", plan.display()) diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index a2017e7fe307..73df60c42e96 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -111,7 +111,7 @@ pub fn create_physical_expr( input_dfschema: &DFSchema, execution_props: &ExecutionProps, ) -> Result> { - let input_schema: &Schema = input_dfschema.into(); + let input_schema = input_dfschema.as_arrow(); match e { Expr::Alias(Alias { expr, metadata, .. }) => { From ac0e0c98af988b94647b7e2c4be45b68555c1586 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 22 Sep 2025 16:50:52 +0200 Subject: [PATCH 8/8] =?UTF-8?q?empty:=20roll=20the=20dice=20=F0=9F=8E=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit