Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ fn type_coercion_demo() -> Result<()> {
)?;
let i8_array = Int8Array::from_iter_values(vec![0, 1, 2]);
let batch = RecordBatch::try_new(
Arc::new(df_schema.as_arrow().to_owned()),
Arc::clone(df_schema.inner()),
vec![Arc::new(i8_array) as _],
)?;

Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/flight/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl FlightService for FlightServiceImpl {
let df = ctx.sql(sql).await.map_err(to_tonic_err)?;

// execute the query
let schema = df.schema().clone().into();
let schema = Arc::clone(df.schema().inner());
let results = df.collect().await.map_err(to_tonic_err)?;
if results.is_empty() {
return Err(Status::internal("There were no results from ticket"));
Expand Down
6 changes: 2 additions & 4 deletions datafusion-examples/examples/flight/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,8 @@ impl FlightSqlService for FlightSqlServiceImpl {
let plan_uuid = Uuid::new_v4().hyphenated().to_string();
self.statements.insert(plan_uuid.clone(), plan.clone());

let plan_schema = plan.schema();

let arrow_schema = (&**plan_schema).into();
let message = SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default())
let arrow_schema = plan.schema().as_arrow();
let message = SchemaAsIpc::new(arrow_schema, &IpcWriteOptions::default())
.try_into()
.map_err(|e| status!("Unable to serialize schema", e))?;
let IpcMessage(schema_bytes) = message;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl TableProviderFactory for StreamTableFactory {
state: &dyn Session,
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>> {
let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
let schema: SchemaRef = Arc::clone(cmd.schema.inner());
let location = cmd.location.clone();
let encoding = cmd.file_type.parse()?;
let header = if let Ok(opt) = cmd
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl ViewTable {
/// Notes: the `LogicalPlan` is not validated or type coerced. If this is
/// needed it should be done after calling this function.
pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
let table_schema = logical_plan.schema().as_ref().to_owned().into();
let table_schema = Arc::clone(logical_plan.schema().inner());
Self {
logical_plan,
table_schema,
Expand Down
28 changes: 3 additions & 25 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub type DFSchemaRef = Arc<DFSchema>;
/// let df_schema = DFSchema::from_unqualified_fields(vec![
/// Field::new("c1", arrow::datatypes::DataType::Int32, false),
/// ].into(),HashMap::new()).unwrap();
/// let schema = Schema::from(df_schema);
/// let schema: &Schema = df_schema.as_arrow();
/// assert_eq!(schema.fields().len(), 1);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -594,7 +594,7 @@ impl DFSchema {
&self,
arrow_schema: &Schema,
) -> Result<()> {
let self_arrow_schema: Schema = self.into();
let self_arrow_schema = self.as_arrow();
self_arrow_schema
.fields()
.iter()
Expand Down Expand Up @@ -1067,22 +1067,6 @@ fn format_simple_data_type(data_type: &DataType) -> String {
}
}

impl From<DFSchema> for Schema {
/// Convert DFSchema into a Schema
fn from(df_schema: DFSchema) -> Self {
let fields: Fields = df_schema.inner.fields.clone();
Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
}
}

impl From<&DFSchema> for Schema {
/// Convert DFSchema reference into a Schema
fn from(df_schema: &DFSchema) -> Self {
let fields: Fields = df_schema.inner.fields.clone();
Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
}
}

/// Allow DFSchema to be converted into an Arrow `&Schema`
impl AsRef<Schema> for DFSchema {
fn as_ref(&self) -> &Schema {
Expand Down Expand Up @@ -1120,12 +1104,6 @@ impl TryFrom<SchemaRef> for DFSchema {
}
}

impl From<DFSchema> for SchemaRef {
fn from(df_schema: DFSchema) -> Self {
SchemaRef::new(df_schema.into())
}
}

// Hashing refers to a subset of fields considered in PartialEq.
impl Hash for DFSchema {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
Expand Down Expand Up @@ -1415,7 +1393,7 @@ mod tests {
#[test]
fn from_qualified_schema_into_arrow_schema() -> Result<()> {
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let arrow_schema: Schema = schema.into();
let arrow_schema = schema.as_arrow();
let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
assert_eq!(expected, arrow_schema.to_string());
Expand Down
5 changes: 3 additions & 2 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,8 +834,9 @@ impl Hash for ScalarValue {
}

fn hash_nested_array<H: Hasher>(arr: ArrayRef, state: &mut H) {
let arrays = vec![arr.to_owned()];
let hashes_buffer = &mut vec![0; arr.len()];
let len = arr.len();
let arrays = vec![arr];
let hashes_buffer = &mut vec![0; len];
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
let hashes = create_hashes(&arrays, &random_state, hashes_buffer).unwrap();
// Hash back to std::hash::Hasher
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2440,8 +2440,7 @@ impl TableProvider for DataFrameTableProvider {
}

fn schema(&self) -> SchemaRef {
let schema: Schema = self.plan.schema().as_ref().into();
Arc::new(schema)
Arc::clone(self.plan.schema().inner())
}

fn table_type(&self) -> TableType {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::datasource::listing::{
};
use crate::execution::context::SessionState;

use arrow::datatypes::{DataType, SchemaRef};
use arrow::datatypes::DataType;
use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema};
use datafusion_common::{config_datafusion_err, Result};
use datafusion_expr::CreateExternalTable;
Expand Down Expand Up @@ -105,7 +105,7 @@ impl TableProviderFactory for ListingTableFactory {
.collect::<Vec<_>>(),
)
} else {
let schema: SchemaRef = Arc::new(cmd.schema.as_ref().to_owned().into());
let schema = Arc::clone(cmd.schema.inner());
let table_partition_cols = cmd
.table_partition_cols
.iter()
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2018,14 +2018,14 @@ mod tests {
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
fs::create_dir(&out_dir).unwrap();
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
let schema: Schema = df.schema().into();
let schema = Arc::clone(df.schema().inner());
// Register a listing table - this will use all files in the directory as data sources
// for the query
ctx.register_listing_table(
"my_table",
&out_dir,
listing_options,
Some(Arc::new(schema)),
Some(schema),
None,
)
.await
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ impl SessionContext {
(true, false, Ok(_)) => self.return_empty_dataframe(),
(false, true, Ok(_)) => {
self.deregister_table(name.clone())?;
let schema = Arc::new(input.schema().as_ref().into());
let schema = Arc::clone(input.schema().inner());
let physical = DataFrame::new(self.state(), input);

let batches: Vec<_> = physical.collect_partitioned().await?;
Expand All @@ -873,8 +873,7 @@ impl SessionContext {
exec_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'")
}
(_, _, Err(_)) => {
let df_schema = input.schema();
let schema = Arc::new(df_schema.as_ref().into());
let schema = Arc::clone(input.schema().inner());
let physical = DataFrame::new(self.state(), input);

let batches: Vec<_> = physical.collect_partitioned().await?;
Expand Down
28 changes: 12 additions & 16 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::schema_equivalence::schema_satisfied_by;

use arrow::array::{builder::StringBuilder, RecordBatch};
use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::datatypes::Schema;
use datafusion_catalog::ScanArgs;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
Expand Down Expand Up @@ -466,7 +466,6 @@ impl DefaultPhysicalPlanner {
Arc::clone(res.plan())
}
LogicalPlan::Values(Values { values, schema }) => {
let exec_schema = schema.as_ref().to_owned().into();
let exprs = values
.iter()
.map(|row| {
Expand All @@ -477,27 +476,23 @@ impl DefaultPhysicalPlanner {
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
})
.collect::<Result<Vec<_>>>()?;
MemorySourceConfig::try_new_as_values(SchemaRef::new(exec_schema), exprs)?
MemorySourceConfig::try_new_as_values(Arc::clone(schema.inner()), exprs)?
as _
}
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema,
}) => Arc::new(EmptyExec::new(SchemaRef::new(
schema.as_ref().to_owned().into(),
))),
}) => Arc::new(EmptyExec::new(Arc::clone(schema.inner()))),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: true,
schema,
}) => Arc::new(PlaceholderRowExec::new(SchemaRef::new(
schema.as_ref().to_owned().into(),
))),
}) => Arc::new(PlaceholderRowExec::new(Arc::clone(schema.inner()))),
LogicalPlan::DescribeTable(DescribeTable {
schema,
output_schema,
}) => {
let output_schema: Schema = output_schema.as_ref().into();
self.plan_describe(Arc::clone(schema), Arc::new(output_schema))?
let output_schema = Arc::clone(output_schema.inner());
self.plan_describe(Arc::clone(schema), output_schema)?
}

// 1 Child
Expand All @@ -514,7 +509,7 @@ impl DefaultPhysicalPlanner {
let parsed_url = ListingTableUrl::parse(output_url)?;
let object_store_url = parsed_url.object_store();

let schema: Schema = (**input.schema()).clone().into();
let schema = Arc::clone(input.schema().inner());

// Note: the DataType passed here is ignored for the purposes of writing and inferred instead
// from the schema of the RecordBatch being written. This allows COPY statements to specify only
Expand Down Expand Up @@ -551,7 +546,7 @@ impl DefaultPhysicalPlanner {
object_store_url,
table_paths: vec![parsed_url],
file_group: FileGroup::default(),
output_schema: Arc::new(schema),
output_schema: schema,
table_partition_cols,
insert_op: InsertOp::Append,
keep_partition_by_columns,
Expand Down Expand Up @@ -931,7 +926,7 @@ impl DefaultPhysicalPlanner {
..
}) => {
let input = children.one()?;
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
let schema = Arc::clone(schema.inner());
let list_column_indices = list_type_columns
.iter()
.map(|(index, unnesting)| ListUnnest {
Expand Down Expand Up @@ -1639,7 +1634,7 @@ pub fn create_window_expr_with_name(
execution_props: &ExecutionProps,
) -> Result<Arc<dyn WindowExpr>> {
let name = name.into();
let physical_schema: &Schema = &logical_schema.into();
let physical_schema = Arc::clone(logical_schema.inner());
match e {
Expr::WindowFunction(window_fun) => {
let WindowFunction {
Expand Down Expand Up @@ -2031,7 +2026,7 @@ impl DefaultPhysicalPlanner {
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let input = self.create_physical_plan(&a.input, session_state).await?;
let schema = SchemaRef::new((*a.schema).clone().into());
let schema = Arc::clone(a.schema.inner());
let show_statistics = session_state.config_options().explain.show_statistics;
Ok(Arc::new(AnalyzeExec::new(
a.verbose,
Expand Down Expand Up @@ -2382,6 +2377,7 @@ mod tests {
use crate::execution::session_state::SessionStateBuilder;
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type};
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
assert_contains, DFSchemaRef, TableReference, ToDFSchema as _,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl TableProviderFactory for TestTableFactory {
) -> Result<Arc<dyn TableProvider>> {
Ok(Arc::new(TestTableProvider {
url: cmd.location.to_string(),
schema: Arc::new(cmd.schema.as_ref().into()),
schema: Arc::clone(cmd.schema.inner()),
}))
}
}
Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use arrow::array::{
};
use arrow::buffer::ScalarBuffer;
use arrow::datatypes::{
DataType, Field, Float32Type, Int32Type, Schema, SchemaRef, UInt64Type, UnionFields,
UnionMode,
DataType, Field, Float32Type, Int32Type, Schema, UInt64Type, UnionFields, UnionMode,
};
use arrow::error::ArrowError;
use arrow::util::pretty::pretty_format_batches;
Expand Down Expand Up @@ -119,8 +118,7 @@ pub fn table_with_constraints() -> Arc<dyn TableProvider> {
}

async fn assert_logical_expr_schema_eq_physical_expr_schema(df: DataFrame) -> Result<()> {
let logical_expr_dfschema = df.schema();
let logical_expr_schema = SchemaRef::from(logical_expr_dfschema.to_owned());
let logical_expr_schema = Arc::clone(df.schema().inner());
let batches = df.collect().await?;
let physical_expr_schema = batches[0].schema();
assert_eq!(logical_expr_schema, physical_expr_schema);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
&partitionby_exprs,
&[],
Arc::new(window_frame),
&extended_schema,
extended_schema,
false,
false,
None,
Expand Down Expand Up @@ -660,7 +660,7 @@ async fn run_window_test(
&partitionby_exprs,
&orderby_exprs.clone(),
Arc::new(window_frame.clone()),
&extended_schema,
Arc::clone(&extended_schema),
false,
false,
None,
Expand All @@ -680,7 +680,7 @@ async fn run_window_test(
&partitionby_exprs,
&orderby_exprs,
Arc::new(window_frame.clone()),
&extended_schema,
extended_schema,
false,
false,
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3683,7 +3683,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
&partition_by,
&[],
case.window_frame,
input_schema.as_ref(),
Arc::clone(&input_schema),
false,
false,
None,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ pub fn bounded_window_exec_with_partition(
partition_by,
&sort_exprs,
Arc::new(WindowFrame::new(Some(false))),
schema.as_ref(),
schema,
false,
false,
None,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/sql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ async fn dml_output_schema() {
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
let sql = "INSERT INTO test VALUES (1)";
let df = ctx.sql(sql).await.unwrap();
let count_schema = Schema::new(vec![Field::new("count", DataType::UInt64, false)]);
assert_eq!(Schema::from(df.schema()), count_schema);
let count_schema = &Schema::new(vec![Field::new("count", DataType::UInt64, false)]);
assert_eq!(df.schema().as_arrow(), count_schema);
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ impl ExprSchemable for Expr {
Ok(Arc::new(Field::new(&schema_name, DataType::Boolean, false)))
}
Expr::ScalarSubquery(subquery) => {
Ok(Arc::new(subquery.subquery.schema().field(0).clone()))
Ok(Arc::clone(&subquery.subquery.schema().fields()[0]))
}
Expr::BinaryExpr(BinaryExpr {
ref left,
Expand Down
Loading