diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index ac17bd417c01..56f960870e58 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -519,7 +519,7 @@ fn type_coercion_demo() -> Result<()> { )?; let i8_array = Int8Array::from_iter_values(vec![0, 1, 2]); let batch = RecordBatch::try_new( - Arc::new(df_schema.as_arrow().to_owned()), + Arc::clone(df_schema.inner()), vec![Arc::new(i8_array) as _], )?; diff --git a/datafusion-examples/examples/flight/flight_server.rs b/datafusion-examples/examples/flight/flight_server.rs index cc5f43746ddf..58bfb7a341c1 100644 --- a/datafusion-examples/examples/flight/flight_server.rs +++ b/datafusion-examples/examples/flight/flight_server.rs @@ -98,7 +98,7 @@ impl FlightService for FlightServiceImpl { let df = ctx.sql(sql).await.map_err(to_tonic_err)?; // execute the query - let schema = df.schema().clone().into(); + let schema = Arc::clone(df.schema().inner()); let results = df.collect().await.map_err(to_tonic_err)?; if results.is_empty() { return Err(Status::internal("There were no results from ticket")); diff --git a/datafusion-examples/examples/flight/flight_sql_server.rs b/datafusion-examples/examples/flight/flight_sql_server.rs index 5a573ed52320..c35debec7d71 100644 --- a/datafusion-examples/examples/flight/flight_sql_server.rs +++ b/datafusion-examples/examples/flight/flight_sql_server.rs @@ -395,10 +395,8 @@ impl FlightSqlService for FlightSqlServiceImpl { let plan_uuid = Uuid::new_v4().hyphenated().to_string(); self.statements.insert(plan_uuid.clone(), plan.clone()); - let plan_schema = plan.schema(); - - let arrow_schema = (&**plan_schema).into(); - let message = SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default()) + let arrow_schema = plan.schema().as_arrow(); + let message = SchemaAsIpc::new(arrow_schema, &IpcWriteOptions::default()) .try_into() .map_err(|e| status!("Unable to serialize schema", e))?; let IpcMessage(schema_bytes) = message; diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs index 0fab9beba81f..2d66ff4628b9 100644 --- a/datafusion/catalog/src/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -53,7 +53,7 @@ impl TableProviderFactory for StreamTableFactory { state: &dyn Session, cmd: &CreateExternalTable, ) -> Result> { - let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); + let schema: SchemaRef = Arc::clone(cmd.schema.inner()); let location = cmd.location.clone(); let encoding = cmd.file_type.parse()?; let header = if let Ok(opt) = cmd diff --git a/datafusion/catalog/src/view.rs b/datafusion/catalog/src/view.rs index 3bb7214399b5..89c6a4a22451 100644 --- a/datafusion/catalog/src/view.rs +++ b/datafusion/catalog/src/view.rs @@ -51,7 +51,7 @@ impl ViewTable { /// Notes: the `LogicalPlan` is not validated or type coerced. If this is /// needed it should be done after calling this function. pub fn new(logical_plan: LogicalPlan, definition: Option) -> Self { - let table_schema = logical_plan.schema().as_ref().to_owned().into(); + let table_schema = Arc::clone(logical_plan.schema().inner()); Self { logical_plan, table_schema, diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index d69810db1965..90b62066577a 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -101,7 +101,7 @@ pub type DFSchemaRef = Arc; /// let df_schema = DFSchema::from_unqualified_fields(vec![ /// Field::new("c1", arrow::datatypes::DataType::Int32, false), /// ].into(),HashMap::new()).unwrap(); -/// let schema = Schema::from(df_schema); +/// let schema: &Schema = df_schema.as_arrow(); /// assert_eq!(schema.fields().len(), 1); /// ``` #[derive(Debug, Clone, PartialEq, Eq)] @@ -594,7 +594,7 @@ impl DFSchema { &self, arrow_schema: &Schema, ) -> Result<()> { - let self_arrow_schema: Schema = self.into(); + let self_arrow_schema = self.as_arrow(); self_arrow_schema .fields() .iter() @@ -1067,22 +1067,6 @@ fn format_simple_data_type(data_type: &DataType) -> String { } } -impl From for Schema { - /// Convert DFSchema into a Schema - fn from(df_schema: DFSchema) -> Self { - let fields: Fields = df_schema.inner.fields.clone(); - Schema::new_with_metadata(fields, df_schema.inner.metadata.clone()) - } -} - -impl From<&DFSchema> for Schema { - /// Convert DFSchema reference into a Schema - fn from(df_schema: &DFSchema) -> Self { - let fields: Fields = df_schema.inner.fields.clone(); - Schema::new_with_metadata(fields, df_schema.inner.metadata.clone()) - } -} - /// Allow DFSchema to be converted into an Arrow `&Schema` impl AsRef for DFSchema { fn as_ref(&self) -> &Schema { @@ -1120,12 +1104,6 @@ impl TryFrom for DFSchema { } } -impl From for SchemaRef { - fn from(df_schema: DFSchema) -> Self { - SchemaRef::new(df_schema.into()) - } -} - // Hashing refers to a subset of fields considered in PartialEq. impl Hash for DFSchema { fn hash(&self, state: &mut H) { @@ -1415,7 +1393,7 @@ mod tests { #[test] fn from_qualified_schema_into_arrow_schema() -> Result<()> { let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let arrow_schema: Schema = schema.into(); + let arrow_schema = schema.as_arrow(); let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \ Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"; assert_eq!(expected, arrow_schema.to_string()); diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 67287546f438..b4f26ed8a6f2 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -834,8 +834,9 @@ impl Hash for ScalarValue { } fn hash_nested_array(arr: ArrayRef, state: &mut H) { - let arrays = vec![arr.to_owned()]; - let hashes_buffer = &mut vec![0; arr.len()]; + let len = arr.len(); + let arrays = vec![arr]; + let hashes_buffer = &mut vec![0; len]; let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0); let hashes = create_hashes(&arrays, &random_state, hashes_buffer).unwrap(); // Hash back to std::hash::Hasher diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 9832c0e9db1e..76acb5c1da34 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2440,8 +2440,7 @@ impl TableProvider for DataFrameTableProvider { } fn schema(&self) -> SchemaRef { - let schema: Schema = self.plan.schema().as_ref().into(); - Arc::new(schema) + Arc::clone(self.plan.schema().inner()) } fn table_type(&self) -> TableType { diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 45d0fefded57..f98297d0e3f7 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -27,7 +27,7 @@ use crate::datasource::listing::{ }; use crate::execution::context::SessionState; -use arrow::datatypes::{DataType, SchemaRef}; +use arrow::datatypes::DataType; use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema}; use datafusion_common::{config_datafusion_err, Result}; use datafusion_expr::CreateExternalTable; @@ -105,7 +105,7 @@ impl TableProviderFactory for ListingTableFactory { .collect::>(), ) } else { - let schema: SchemaRef = Arc::new(cmd.schema.as_ref().to_owned().into()); + let schema = Arc::clone(cmd.schema.inner()); let table_partition_cols = cmd .table_partition_cols .iter() diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 6f1c6c4171d1..7c9767ceec86 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -2018,14 +2018,14 @@ mod tests { let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; fs::create_dir(&out_dir).unwrap(); let df = ctx.sql("SELECT c1, c2 FROM test").await?; - let schema: Schema = df.schema().into(); + let schema = Arc::clone(df.schema().inner()); // Register a listing table - this will use all files in the directory as data sources // for the query ctx.register_listing_table( "my_table", &out_dir, listing_options, - Some(Arc::new(schema)), + Some(schema), None, ) .await diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 433b9400afa2..33941d41eb27 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -855,7 +855,7 @@ impl SessionContext { (true, false, Ok(_)) => self.return_empty_dataframe(), (false, true, Ok(_)) => { self.deregister_table(name.clone())?; - let schema = Arc::new(input.schema().as_ref().into()); + let schema = Arc::clone(input.schema().inner()); let physical = DataFrame::new(self.state(), input); let batches: Vec<_> = physical.collect_partitioned().await?; @@ -873,8 +873,7 @@ impl SessionContext { exec_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'") } (_, _, Err(_)) => { - let df_schema = input.schema(); - let schema = Arc::new(df_schema.as_ref().into()); + let schema = Arc::clone(input.schema().inner()); let physical = DataFrame::new(self.state(), input); let batches: Vec<_> = physical.collect_partitioned().await?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index e322b1effd94..c60561b2c095 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -59,7 +59,7 @@ use crate::schema_equivalence::schema_satisfied_by; use arrow::array::{builder::StringBuilder, RecordBatch}; use arrow::compute::SortOptions; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::Schema; use datafusion_catalog::ScanArgs; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; @@ -466,7 +466,6 @@ impl DefaultPhysicalPlanner { Arc::clone(res.plan()) } LogicalPlan::Values(Values { values, schema }) => { - let exec_schema = schema.as_ref().to_owned().into(); let exprs = values .iter() .map(|row| { @@ -477,27 +476,23 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; - MemorySourceConfig::try_new_as_values(SchemaRef::new(exec_schema), exprs)? + MemorySourceConfig::try_new_as_values(Arc::clone(schema.inner()), exprs)? as _ } LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema, - }) => Arc::new(EmptyExec::new(SchemaRef::new( - schema.as_ref().to_owned().into(), - ))), + }) => Arc::new(EmptyExec::new(Arc::clone(schema.inner()))), LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: true, schema, - }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new( - schema.as_ref().to_owned().into(), - ))), + }) => Arc::new(PlaceholderRowExec::new(Arc::clone(schema.inner()))), LogicalPlan::DescribeTable(DescribeTable { schema, output_schema, }) => { - let output_schema: Schema = output_schema.as_ref().into(); - self.plan_describe(Arc::clone(schema), Arc::new(output_schema))? + let output_schema = Arc::clone(output_schema.inner()); + self.plan_describe(Arc::clone(schema), output_schema)? } // 1 Child @@ -514,7 +509,7 @@ impl DefaultPhysicalPlanner { let parsed_url = ListingTableUrl::parse(output_url)?; let object_store_url = parsed_url.object_store(); - let schema: Schema = (**input.schema()).clone().into(); + let schema = Arc::clone(input.schema().inner()); // Note: the DataType passed here is ignored for the purposes of writing and inferred instead // from the schema of the RecordBatch being written. This allows COPY statements to specify only @@ -551,7 +546,7 @@ impl DefaultPhysicalPlanner { object_store_url, table_paths: vec![parsed_url], file_group: FileGroup::default(), - output_schema: Arc::new(schema), + output_schema: schema, table_partition_cols, insert_op: InsertOp::Append, keep_partition_by_columns, @@ -931,7 +926,7 @@ impl DefaultPhysicalPlanner { .. }) => { let input = children.one()?; - let schema = SchemaRef::new(schema.as_ref().to_owned().into()); + let schema = Arc::clone(schema.inner()); let list_column_indices = list_type_columns .iter() .map(|(index, unnesting)| ListUnnest { @@ -1639,7 +1634,7 @@ pub fn create_window_expr_with_name( execution_props: &ExecutionProps, ) -> Result> { let name = name.into(); - let physical_schema: &Schema = &logical_schema.into(); + let physical_schema = Arc::clone(logical_schema.inner()); match e { Expr::WindowFunction(window_fun) => { let WindowFunction { @@ -2031,7 +2026,7 @@ impl DefaultPhysicalPlanner { session_state: &SessionState, ) -> Result> { let input = self.create_physical_plan(&a.input, session_state).await?; - let schema = SchemaRef::new((*a.schema).clone().into()); + let schema = Arc::clone(a.schema.inner()); let show_statistics = session_state.config_options().explain.show_statistics; Ok(Arc::new(AnalyzeExec::new( a.verbose, @@ -2382,6 +2377,7 @@ mod tests { use crate::execution::session_state::SessionStateBuilder; use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; use arrow::datatypes::{DataType, Field, Int32Type}; + use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{ assert_contains, DFSchemaRef, TableReference, ToDFSchema as _, diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 299b73ccbe40..7149c5b0bd8c 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -185,7 +185,7 @@ impl TableProviderFactory for TestTableFactory { ) -> Result> { Ok(Arc::new(TestTableProvider { url: cmd.location.to_string(), - schema: Arc::new(cmd.schema.as_ref().into()), + schema: Arc::clone(cmd.schema.inner()), })) } } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index aa984775e457..0820d444f642 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -27,8 +27,7 @@ use arrow::array::{ }; use arrow::buffer::ScalarBuffer; use arrow::datatypes::{ - DataType, Field, Float32Type, Int32Type, Schema, SchemaRef, UInt64Type, UnionFields, - UnionMode, + DataType, Field, Float32Type, Int32Type, Schema, UInt64Type, UnionFields, UnionMode, }; use arrow::error::ArrowError; use arrow::util::pretty::pretty_format_batches; @@ -119,8 +118,7 @@ pub fn table_with_constraints() -> Arc { } async fn assert_logical_expr_schema_eq_physical_expr_schema(df: DataFrame) -> Result<()> { - let logical_expr_dfschema = df.schema(); - let logical_expr_schema = SchemaRef::from(logical_expr_dfschema.to_owned()); + let logical_expr_schema = Arc::clone(df.schema().inner()); let batches = df.collect().await?; let physical_expr_schema = batches[0].schema(); assert_eq!(logical_expr_schema, physical_expr_schema); diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 9da92f7cd49a..65a41d39d3c5 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -286,7 +286,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> { &partitionby_exprs, &[], Arc::new(window_frame), - &extended_schema, + extended_schema, false, false, None, @@ -660,7 +660,7 @@ async fn run_window_test( &partitionby_exprs, &orderby_exprs.clone(), Arc::new(window_frame.clone()), - &extended_schema, + Arc::clone(&extended_schema), false, false, None, @@ -680,7 +680,7 @@ async fn run_window_test( &partitionby_exprs, &orderby_exprs, Arc::new(window_frame.clone()), - &extended_schema, + extended_schema, false, false, None, diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index a19dd7ace977..3858e70eaf3e 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -3683,7 +3683,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> { &partition_by, &[], case.window_frame, - input_schema.as_ref(), + Arc::clone(&input_schema), false, false, None, diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 7c9fb9de5340..e6f005678dec 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -263,7 +263,7 @@ pub fn bounded_window_exec_with_partition( partition_by, &sort_exprs, Arc::new(WindowFrame::new(Some(false))), - schema.as_ref(), + schema, false, false, None, diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index ec086bcc50c7..b87afd27ddea 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -84,8 +84,8 @@ async fn dml_output_schema() { ctx.sql("CREATE TABLE test (x int)").await.unwrap(); let sql = "INSERT INTO test VALUES (1)"; let df = ctx.sql(sql).await.unwrap(); - let count_schema = Schema::new(vec![Field::new("count", DataType::UInt64, false)]); - assert_eq!(Schema::from(df.schema()), count_schema); + let count_schema = &Schema::new(vec![Field::new("count", DataType::UInt64, false)]); + assert_eq!(df.schema().as_arrow(), count_schema); } #[tokio::test] diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index c844bbd5384b..e803e3534130 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -482,7 +482,7 @@ impl ExprSchemable for Expr { Ok(Arc::new(Field::new(&schema_name, DataType::Boolean, false))) } Expr::ScalarSubquery(subquery) => { - Ok(Arc::new(subquery.subquery.schema().field(0).clone())) + Ok(Arc::clone(&subquery.subquery.schema().fields()[0])) } Expr::BinaryExpr(BinaryExpr { ref left, diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index ac699b9d30c3..29d94f8c6d7e 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1970,6 +1970,7 @@ pub fn table_scan_with_filter_and_fetch( } pub fn table_source(table_schema: &Schema) -> Arc { + // TODO should we take SchemaRef and avoid cloning? let table_schema = Arc::new(table_schema.clone()); Arc::new(LogicalTableSource { table_schema, @@ -1981,6 +1982,7 @@ pub fn table_source_with_constraints( table_schema: &Schema, constraints: Constraints, ) -> Arc { + // TODO should we take SchemaRef and avoid cloning? let table_schema = Arc::new(table_schema.clone()); Arc::new(LogicalTableSource { table_schema, diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index cc3fbad7b0c2..174ab28a1e6d 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -72,11 +72,7 @@ impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> { write!(self.f, "{:indent$}", "", indent = self.indent * 2)?; write!(self.f, "{}", plan.display())?; if self.with_schema { - write!( - self.f, - " {}", - display_schema(&plan.schema().as_ref().to_owned().into()) - )?; + write!(self.f, " {}", display_schema(plan.schema().as_arrow()))?; } self.indent += 1; @@ -196,7 +192,7 @@ impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> { format!( r"{}\nSchema: {}", plan.display(), - display_schema(&plan.schema().as_ref().to_owned().into()) + display_schema(plan.schema().as_arrow()) ) } else { format!("{}", plan.display()) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c271e48a01f7..3cc032277405 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2276,10 +2276,10 @@ impl SubqueryAlias { let func_dependencies = plan.schema().functional_dependencies().clone(); let schema = DFSchema::from_unqualified_fields(fields, meta_data)?; - let schema = Schema::from(schema); + let schema = schema.as_arrow(); let schema = DFSchemaRef::new( - DFSchema::try_from_qualified_schema(alias.clone(), &schema)? + DFSchema::try_from_qualified_schema(alias.clone(), schema)? .with_functional_dependencies(func_dependencies)?, ); Ok(SubqueryAlias { @@ -2702,7 +2702,7 @@ impl TableScan { let df_schema = DFSchema::new_with_metadata( p.iter() .map(|i| { - (Some(table_name.clone()), Arc::new(schema.field(*i).clone())) + (Some(table_name.clone()), Arc::clone(&schema.fields()[*i])) }) .collect(), schema.metadata.clone(), diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index fb491341f81d..73df60c42e96 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -111,7 +111,7 @@ pub fn create_physical_expr( input_dfschema: &DFSchema, execution_props: &ExecutionProps, ) -> Result> { - let input_schema: &Schema = &input_dfschema.into(); + let input_schema = input_dfschema.as_arrow(); match e { Expr::Alias(Alias { expr, metadata, .. }) => { @@ -407,6 +407,7 @@ where /// Convert a logical expression to a physical expression (without any simplification, etc) pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc { + // TODO this makes a deep copy of the Schema. Should take SchemaRef instead and avoid deep copy let df_schema = schema.clone().to_dfschema().unwrap(); let execution_props = ExecutionProps::new(); create_physical_expr(expr, &df_schema, &execution_props).unwrap() diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index e51b2d3f569c..e1946c8b2be6 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1375,7 +1375,7 @@ mod tests { &partitionby_exprs, &orderby_exprs, Arc::new(window_frame), - &input.schema(), + input.schema(), false, false, None, diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index dccd9200fc77..4b4e018fd561 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -96,7 +96,7 @@ pub fn create_window_expr( partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, - input_schema: &Schema, + input_schema: SchemaRef, ignore_nulls: bool, distinct: bool, filter: Option>, @@ -105,7 +105,7 @@ pub fn create_window_expr( WindowFunctionDefinition::AggregateUDF(fun) => { let aggregate = if distinct { AggregateExprBuilder::new(Arc::clone(fun), args.to_vec()) - .schema(Arc::new(input_schema.clone())) + .schema(input_schema) .alias(name) .with_ignore_nulls(ignore_nulls) .distinct() @@ -113,7 +113,7 @@ pub fn create_window_expr( .map(Arc::new)? } else { AggregateExprBuilder::new(Arc::clone(fun), args.to_vec()) - .schema(Arc::new(input_schema.clone())) + .schema(input_schema) .alias(name) .with_ignore_nulls(ignore_nulls) .build() @@ -128,7 +128,7 @@ pub fn create_window_expr( ) } WindowFunctionDefinition::WindowUDF(fun) => Arc::new(StandardWindowExpr::new( - create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?, + create_udwf_window_expr(fun, args, &input_schema, name, ignore_nulls)?, partition_by, order_by, window_frame, @@ -814,7 +814,7 @@ mod tests { &[], &[], Arc::new(WindowFrame::new(None)), - schema.as_ref(), + schema, false, false, None, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 85de56e97280..910a3fd73c60 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -177,7 +177,7 @@ pub fn parse_physical_window_expr( &partition_by, &order_by, Arc::new(window_frame), - &extended_schema, + extended_schema, proto.ignore_nulls, proto.distinct, None, diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 5e92dbe227fd..f0f87ffe1e60 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -1094,8 +1094,8 @@ fn rewrite_expr_to_prunable( Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr))) } else if let Some(cast) = column_expr_any.downcast_ref::() { // `cast(col) op lit()` - let arrow_schema: SchemaRef = schema.clone().into(); - let from_type = cast.expr().data_type(&arrow_schema)?; + let arrow_schema = schema.as_arrow(); + let from_type = cast.expr().data_type(arrow_schema)?; verify_support_type_for_prune(&from_type, cast.cast_type())?; let (left, op, right) = rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?; @@ -1109,8 +1109,8 @@ fn rewrite_expr_to_prunable( column_expr_any.downcast_ref::() { // `try_cast(col) op lit()` - let arrow_schema: SchemaRef = schema.clone().into(); - let from_type = try_cast.expr().data_type(&arrow_schema)?; + let arrow_schema = schema.as_arrow(); + let from_type = try_cast.expr().data_type(arrow_schema)?; verify_support_type_for_prune(&from_type, try_cast.cast_type())?; let (left, op, right) = rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?; diff --git a/datafusion/sql/src/cte.rs b/datafusion/sql/src/cte.rs index 3650aea9c3c2..aceec676761c 100644 --- a/datafusion/sql/src/cte.rs +++ b/datafusion/sql/src/cte.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use arrow::datatypes::Schema; use datafusion_common::{ not_impl_err, plan_err, tree_node::{TreeNode, TreeNodeRecursion}, @@ -135,10 +134,9 @@ impl SqlToRel<'_, S> { // ---------- Step 2: Create a temporary relation ------------------ // Step 2.1: Create a table source for the temporary relation - let work_table_source = self.context_provider.create_cte_work_table( - &cte_name, - Arc::new(Schema::from(static_plan.schema().as_ref())), - )?; + let work_table_source = self + .context_provider + .create_cte_work_table(&cte_name, Arc::clone(static_plan.schema().inner()))?; // Step 2.2: Create a temporary relation logical plan that will be used // as the input to the recursive term diff --git a/datafusion/substrait/tests/cases/substrait_validations.rs b/datafusion/substrait/tests/cases/substrait_validations.rs index a31b3ca385e9..c8cc3fe9940c 100644 --- a/datafusion/substrait/tests/cases/substrait_validations.rs +++ b/datafusion/substrait/tests/cases/substrait_validations.rs @@ -51,7 +51,7 @@ mod tests { let ctx = SessionContext::new(); ctx.register_table( table_ref, - Arc::new(EmptyTable::new(df_schema.inner().clone())), + Arc::new(EmptyTable::new(Arc::clone(df_schema.inner()))), )?; Ok(ctx) } diff --git a/datafusion/substrait/tests/utils.rs b/datafusion/substrait/tests/utils.rs index e3e3ec3fab01..f84594312b63 100644 --- a/datafusion/substrait/tests/utils.rs +++ b/datafusion/substrait/tests/utils.rs @@ -150,7 +150,7 @@ pub mod test { let df_schema = from_substrait_named_struct(self.consumer, substrait_schema)? .replace_qualifier(table_reference.clone()); - let table = EmptyTable::new(df_schema.inner().clone()); + let table = EmptyTable::new(Arc::clone(df_schema.inner())); self.schemas.push((table_reference, Arc::new(table))); Ok(()) }