Support default values for columns in SchemaAdapter #15220
I'll note that this is currently broken without any new features. I can give an example later, but basically you make a custom SchemaMapper that, e.g., fills in default values instead of nulls. That's doable with the current public APIs. You then get different behavior depending on whether filter pushdown is enabled.
Here's an example of how this functionality currently breaks with a custom SchemaAdapter:

Example:

```rust
use std::any::Any;
use std::sync::Arc;

use arrow::array::{BooleanArray, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::{DFSchema, Result};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use datafusion::execution::context::SessionContext;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::lit;
use futures::StreamExt;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};

#[tokio::main]
async fn main() -> Result<()> {
    let (table_schema, batch) = create_sample_data();

    // Write the sample batch to an in-memory object store as a Parquet file
    let store = InMemory::new();
    let buf = {
        let mut buf = vec![];
        let mut writer =
            ArrowWriter::try_new(&mut buf, batch.schema(), None).expect("creating writer");
        writer.write(&batch).expect("Writing batch");
        writer.close().unwrap();
        buf
    };
    let path = Path::from("example.parquet");
    let payload = PutPayload::from_bytes(buf.into());
    store.put(&path, payload).await?;

    let ctx = SessionContext::new();
    ctx.runtime_env()
        .register_object_store(ObjectStoreUrl::parse("memory://")?.as_ref(), Arc::new(store));

    // First run: filter pushdown disabled
    let table_provider = Arc::new(ExampleTableProvider::new(table_schema.clone(), false));
    ctx.register_table("data", table_provider)?;
    let batches = ctx
        .sql("SELECT name FROM data WHERE NOT is_admin")
        .await?
        .collect()
        .await?;
    arrow::util::pretty::print_batches(&batches)?;

    // Second run: filter pushdown enabled
    ctx.deregister_table("data")?;
    let table_provider = Arc::new(ExampleTableProvider::new(table_schema, true));
    ctx.register_table("data", table_provider)?;
    let batches = ctx
        .sql("SELECT name FROM data WHERE NOT is_admin")
        .await?
        .collect()
        .await?;
    arrow::util::pretty::print_batches(&batches)?;

    Ok(())
}

fn create_sample_data() -> (SchemaRef, RecordBatch) {
    let table_schema = Schema::new(vec![
        Field::new("name", DataType::Utf8, false),
        Field::new("is_admin", DataType::Boolean, false),
    ]);
    let file_schema = Schema::new(vec![Field::new("name", DataType::Utf8, false)]);

    // Create a record batch with the data
    let batch = RecordBatch::try_new(
        Arc::new(file_schema.clone()),
        vec![Arc::new(StringArray::from(vec!["Alice", "Bob"]))],
    )
    .unwrap();

    (Arc::new(table_schema), batch)
}
/// Custom TableProvider that wires in a custom SchemaAdapterFactory
#[derive(Debug)]
struct ExampleTableProvider {
    schema: SchemaRef,
    pushdown_filters: bool,
}

impl ExampleTableProvider {
    fn new(schema: SchemaRef, pushdown_filters: bool) -> Self {
        Self {
            schema,
            pushdown_filters,
        }
    }
}

#[async_trait]
impl TableProvider for ExampleTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        // Inexact: the filters are pushed into the scan, but DataFusion still
        // re-applies them afterwards
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = self.schema.clone();
        let df_schema = DFSchema::try_from(schema.clone())?;

        // Combine the pushed-down filters into a single physical predicate
        let filter = state.create_physical_expr(
            conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)),
            &df_schema,
        )?;

        let parquet_source = ParquetSource::default()
            .with_predicate(self.schema.clone(), filter)
            .with_pushdown_filters(self.pushdown_filters)
            .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory));

        // List every file in the in-memory store and scan all of them
        let object_store_url = ObjectStoreUrl::parse("memory://")?;
        let store = state.runtime_env().object_store(object_store_url)?;
        let mut files = vec![];
        let mut listing = store.list(None);
        while let Some(file) = listing.next().await {
            if let Ok(file) = file {
                files.push(file);
            }
        }
        let file_group = files
            .iter()
            .map(|file| {
                PartitionedFile::new(
                    file.location.clone(),
                    u64::try_from(file.size).expect("fits in a u64"),
                )
            })
            .collect();

        let file_scan_config = FileScanConfig::new(
            ObjectStoreUrl::parse("memory://")?,
            schema,
            Arc::new(parquet_source),
        )
        .with_projection(projection.cloned())
        .with_limit(limit)
        .with_file_group(file_group);

        Ok(file_scan_config.build())
    }
}

#[derive(Debug)]
struct CustomSchemaMapper;

impl SchemaMapper for CustomSchemaMapper {
    fn map_batch(&self, batch: RecordBatch) -> datafusion::common::Result<RecordBatch> {
        // Add an is_admin column to the batch, filled with a default of
        // `false` rather than nulls
        let is_admin = BooleanArray::from(vec![false, false]);
        let mut new_columns = batch.columns().to_vec();
        new_columns.push(Arc::new(is_admin));
        let new_schema = Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("is_admin", DataType::Boolean, false),
        ]);
        let new_batch = RecordBatch::try_new(Arc::new(new_schema), new_columns)?;
        Ok(new_batch)
    }

    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion::common::Result<RecordBatch> {
        Ok(batch)
    }
}

#[derive(Debug)]
struct CustomSchemaAdapter;

impl SchemaAdapter for CustomSchemaAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        if index < file_schema.fields().len() {
            Some(index)
        } else {
            None
        }
    }

    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion::common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let schema_mapper = Arc::new(CustomSchemaMapper);
        let column_indices = (0..file_schema.fields().len()).collect();
        Ok((schema_mapper, column_indices))
    }
}

#[derive(Debug)]
struct CustomSchemaAdapterFactory;

impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
    fn create(
        &self,
        _projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(CustomSchemaAdapter)
    }
}
```

The point is that predicate pushdown assumes that any column that is in the table schema but not the file schema can be treated as if it is all nulls, which is only true for the default SchemaAdapter.
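To make that concrete, here is a minimal, standalone Arrow sketch (not DataFusion's actual row-filter code) of why an all-null placeholder and a filled-in default disagree under the example's `NOT is_admin` predicate:

```rust
use arrow::array::{BooleanArray, StringArray};
use arrow::compute::{filter, not};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    let names = StringArray::from(vec!["Alice", "Bob"]);

    // What pushdown assumes for a column missing from the file: all nulls.
    // `NOT NULL` is NULL, and the filter kernel does not select null slots,
    // so no rows survive.
    let is_admin_as_nulls = BooleanArray::from(vec![None::<bool>, None]);
    let kept = filter(&names, &not(&is_admin_as_nulls)?)?;
    assert_eq!(kept.len(), 0);

    // What a SchemaAdapter that fills a default of `false` produces:
    // `NOT false` is true, so both rows survive.
    let is_admin_as_default = BooleanArray::from(vec![false, false]);
    let kept = filter(&names, &not(&is_admin_as_default)?)?;
    assert_eq!(kept.len(), 2);

    Ok(())
}
```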
It was someone in delta.rs as I recall (@ion-elgreco perhaps)?
Is your feature request related to a problem or challenge?
In a conversation with Andrew a couple of days ago, he mentioned this was an open feature request; however, I could not find an issue for it. @alamb do you remember who else was asking for this?
We have an implementation of this internally. It is actually more generic, because we use it to generate columns from other columns, but it covers the default-values use case and it would be easy to keep that API simple.
Essentially we declare:
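The declaration itself is not captured in this thread; the following is a rough, hypothetical sketch of the shape being described, where `MissingColumnGenerator`, its methods, and the factory signature are assumed names rather than the actual internal API:

```rust
use std::fmt::Debug;
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;

/// Hypothetical: produces a column that exists in the table schema but not in
/// the file, possibly derived from other columns.
trait MissingColumnGenerator: Debug + Send + Sync {
    /// File columns this generator needs to read (empty for plain defaults).
    fn dependencies(&self) -> Vec<String>;

    /// Build the missing column from a batch containing the dependencies.
    fn generate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
}

/// Hypothetical: decides, per missing field, whether a generator can be built.
trait MissingColumnGeneratorFactory: Debug + Send + Sync {
    fn create(
        &self,
        field: &Field,
        file_schema: &Schema,
    ) -> Option<Arc<dyn MissingColumnGenerator>>;
}
```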
And then you pass one or more `MissingColumnGeneratorFactory`s into the `SchemaAdapterFactory`. There was a lot of pain figuring out how to properly adjust projections to take the injected dependency columns into account, but we've already done that work on our end.
The other thing to note is that adjustments are needed in filter pushdown, specifically here: `datafusion/datafusion/datasource-parquet/src/row_filter.rs`, lines 355 to 384 (at 8061485).
This last bit applies whether you are generating simple defaults or more complex derived columns.
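Purely as an illustration (hypothetical names, not the actual `row_filter.rs` logic), the decision that code would need to make changes roughly like this: a predicate column missing from the file can no longer be rewritten to all nulls, and is only safe to push down if it exists in the file or can be generated from columns that do:

```rust
use arrow::datatypes::{DataType, Field, Schema};

/// Assumed shape, mirroring the MissingColumnGenerator sketch above.
trait MissingColumnGenerator {
    /// File columns the generator needs to read (empty for plain defaults).
    fn dependencies(&self) -> Vec<String>;
}

/// Hypothetical pushdown check: a column referenced by a pushed-down predicate
/// is only usable in the Parquet row filter if it is physically in the file or
/// can be generated from columns that are.
fn can_push_down_column(
    column: &str,
    file_schema: &Schema,
    generator: Option<&dyn MissingColumnGenerator>,
) -> bool {
    if file_schema.field_with_name(column).is_ok() {
        return true;
    }
    match generator {
        Some(g) => g
            .dependencies()
            .iter()
            .all(|dep| file_schema.field_with_name(dep).is_ok()),
        // No generator: treating the column as all nulls would give wrong
        // answers for adapters that fill defaults, so do not push down.
        None => false,
    }
}

fn main() {
    let file_schema = Schema::new(vec![Field::new("name", DataType::Utf8, false)]);
    // `is_admin` is not in the file and nothing can generate it here.
    assert!(!can_push_down_column("is_admin", &file_schema, None));
    assert!(can_push_down_column("name", &file_schema, None));
}
```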