Skip to content

Commit 4ea7601

Browse files
authored
Avoid redundant Schema clones (#17643)
* Collocate variants of From DFSchema to Schema
* Remove duplicated logic for obtaining Schema from DFSchema
* Remove Arc clone in hash_nested_array
* Avoid redundant Schema clones
* Avoid some Field clones
* Make Arc clones explicit
* Retract the new From impl
* empty: roll the dice 🎲
1 parent eeec017 commit 4ea7601

File tree

30 files changed

+67
-101
lines changed

30 files changed

+67
-101
lines changed

datafusion-examples/examples/expr_api.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -519,7 +519,7 @@ fn type_coercion_demo() -> Result<()> {
519519
)?;
520520
let i8_array = Int8Array::from_iter_values(vec![0, 1, 2]);
521521
let batch = RecordBatch::try_new(
522-
Arc::new(df_schema.as_arrow().to_owned()),
522+
Arc::clone(df_schema.inner()),
523523
vec![Arc::new(i8_array) as _],
524524
)?;
525525

datafusion-examples/examples/flight/flight_server.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -98,7 +98,7 @@ impl FlightService for FlightServiceImpl {
9898
let df = ctx.sql(sql).await.map_err(to_tonic_err)?;
9999

100100
// execute the query
101-
let schema = df.schema().clone().into();
101+
let schema = Arc::clone(df.schema().inner());
102102
let results = df.collect().await.map_err(to_tonic_err)?;
103103
if results.is_empty() {
104104
return Err(Status::internal("There were no results from ticket"));

datafusion-examples/examples/flight/flight_sql_server.rs

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -395,10 +395,8 @@ impl FlightSqlService for FlightSqlServiceImpl {
395395
let plan_uuid = Uuid::new_v4().hyphenated().to_string();
396396
self.statements.insert(plan_uuid.clone(), plan.clone());
397397

398-
let plan_schema = plan.schema();
399-
400-
let arrow_schema = (&**plan_schema).into();
401-
let message = SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default())
398+
let arrow_schema = plan.schema().as_arrow();
399+
let message = SchemaAsIpc::new(arrow_schema, &IpcWriteOptions::default())
402400
.try_into()
403401
.map_err(|e| status!("Unable to serialize schema", e))?;
404402
let IpcMessage(schema_bytes) = message;

datafusion/catalog/src/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,7 @@ impl TableProviderFactory for StreamTableFactory {
5353
state: &dyn Session,
5454
cmd: &CreateExternalTable,
5555
) -> Result<Arc<dyn TableProvider>> {
56-
let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
56+
let schema: SchemaRef = Arc::clone(cmd.schema.inner());
5757
let location = cmd.location.clone();
5858
let encoding = cmd.file_type.parse()?;
5959
let header = if let Ok(opt) = cmd

datafusion/catalog/src/view.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ impl ViewTable {
5151
/// Notes: the `LogicalPlan` is not validated or type coerced. If this is
5252
/// needed it should be done after calling this function.
5353
pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
54-
let table_schema = logical_plan.schema().as_ref().to_owned().into();
54+
let table_schema = Arc::clone(logical_plan.schema().inner());
5555
Self {
5656
logical_plan,
5757
table_schema,

datafusion/common/src/dfschema.rs

Lines changed: 3 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -101,7 +101,7 @@ pub type DFSchemaRef = Arc<DFSchema>;
101101
/// let df_schema = DFSchema::from_unqualified_fields(vec![
102102
/// Field::new("c1", arrow::datatypes::DataType::Int32, false),
103103
/// ].into(),HashMap::new()).unwrap();
104-
/// let schema = Schema::from(df_schema);
104+
/// let schema: &Schema = df_schema.as_arrow();
105105
/// assert_eq!(schema.fields().len(), 1);
106106
/// ```
107107
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -594,7 +594,7 @@ impl DFSchema {
594594
&self,
595595
arrow_schema: &Schema,
596596
) -> Result<()> {
597-
let self_arrow_schema: Schema = self.into();
597+
let self_arrow_schema = self.as_arrow();
598598
self_arrow_schema
599599
.fields()
600600
.iter()
@@ -1081,22 +1081,6 @@ fn format_simple_data_type(data_type: &DataType) -> String {
10811081
}
10821082
}
10831083

1084-
impl From<DFSchema> for Schema {
1085-
/// Convert DFSchema into a Schema
1086-
fn from(df_schema: DFSchema) -> Self {
1087-
let fields: Fields = df_schema.inner.fields.clone();
1088-
Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
1089-
}
1090-
}
1091-
1092-
impl From<&DFSchema> for Schema {
1093-
/// Convert DFSchema reference into a Schema
1094-
fn from(df_schema: &DFSchema) -> Self {
1095-
let fields: Fields = df_schema.inner.fields.clone();
1096-
Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
1097-
}
1098-
}
1099-
11001084
/// Allow DFSchema to be converted into an Arrow `&Schema`
11011085
impl AsRef<Schema> for DFSchema {
11021086
fn as_ref(&self) -> &Schema {
@@ -1138,12 +1122,6 @@ impl TryFrom<SchemaRef> for DFSchema {
11381122
}
11391123
}
11401124

1141-
impl From<DFSchema> for SchemaRef {
1142-
fn from(df_schema: DFSchema) -> Self {
1143-
SchemaRef::new(df_schema.into())
1144-
}
1145-
}
1146-
11471125
// Hashing refers to a subset of fields considered in PartialEq.
11481126
impl Hash for DFSchema {
11491127
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
@@ -1433,7 +1411,7 @@ mod tests {
14331411
#[test]
14341412
fn from_qualified_schema_into_arrow_schema() -> Result<()> {
14351413
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1436-
let arrow_schema: Schema = schema.into();
1414+
let arrow_schema = schema.as_arrow();
14371415
let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
14381416
Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
14391417
assert_eq!(expected, arrow_schema.to_string());

datafusion/common/src/scalar/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -876,8 +876,9 @@ impl Hash for ScalarValue {
876876
}
877877

878878
fn hash_nested_array<H: Hasher>(arr: ArrayRef, state: &mut H) {
879-
let arrays = vec![arr.to_owned()];
880-
let hashes_buffer = &mut vec![0; arr.len()];
879+
let len = arr.len();
880+
let arrays = vec![arr];
881+
let hashes_buffer = &mut vec![0; len];
881882
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
882883
let hashes = create_hashes(&arrays, &random_state, hashes_buffer).unwrap();
883884
// Hash back to std::hash::Hasher

datafusion/core/src/dataframe/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2440,8 +2440,7 @@ impl TableProvider for DataFrameTableProvider {
24402440
}
24412441

24422442
fn schema(&self) -> SchemaRef {
2443-
let schema: Schema = self.plan.schema().as_ref().into();
2444-
Arc::new(schema)
2443+
Arc::clone(self.plan.schema().inner())
24452444
}
24462445

24472446
fn table_type(&self) -> TableType {

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ use crate::datasource::listing::{
2727
};
2828
use crate::execution::context::SessionState;
2929

30-
use arrow::datatypes::{DataType, SchemaRef};
30+
use arrow::datatypes::DataType;
3131
use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema};
3232
use datafusion_common::{config_datafusion_err, Result};
3333
use datafusion_expr::CreateExternalTable;
@@ -105,7 +105,7 @@ impl TableProviderFactory for ListingTableFactory {
105105
.collect::<Vec<_>>(),
106106
)
107107
} else {
108-
let schema: SchemaRef = Arc::new(cmd.schema.as_ref().to_owned().into());
108+
let schema = Arc::clone(cmd.schema.inner());
109109
let table_partition_cols = cmd
110110
.table_partition_cols
111111
.iter()

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2018,14 +2018,14 @@ mod tests {
20182018
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
20192019
fs::create_dir(&out_dir).unwrap();
20202020
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
2021-
let schema: Schema = df.schema().into();
2021+
let schema = Arc::clone(df.schema().inner());
20222022
// Register a listing table - this will use all files in the directory as data sources
20232023
// for the query
20242024
ctx.register_listing_table(
20252025
"my_table",
20262026
&out_dir,
20272027
listing_options,
2028-
Some(Arc::new(schema)),
2028+
Some(schema),
20292029
None,
20302030
)
20312031
.await

0 commit comments

Comments (0)