Describe the bug
Basically I want:
- Schema evolution, so that if we add more fields I don't have to re-migrate all the old data
- Compaction support, so that I can take multiple Parquet files with evolving schemas and merge them into one
- ListingTable support to query across all of these Parquet files with evolving schemas

I've written a repro below. I think in the past "schema evolution" has been too vague and hard to define; perhaps the following integration test could serve as the "we are done with v1 of schema evolution" test? cc @alamb
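
To make the first bullet concrete, here is a minimal standalone sketch (not part of the repro, and only one possible semantics) of what "merging" evolving schemas could mean, using Arrow's existing `Schema::try_merge` to union the fields of two file schemas. The field names mirror the repro below:

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // File 1's schema: just the event name.
    let v1 = Schema::new(vec![Field::new("event", DataType::Utf8, false)]);

    // File 2's schema: adds a nested `data` struct. It is nullable here because
    // older files have no values for it.
    let v2 = Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new(
            "data",
            DataType::Struct(vec![Field::new("some_data", DataType::Utf8, true)].into()),
            true,
        ),
    ]);

    // One possible definition of the "evolved" table schema: the union of all fields.
    let merged = Schema::try_merge(vec![v1, v2])?;
    assert_eq!(merged.fields().len(), 2);
    println!("{merged:#?}");
    Ok(())
}
```

The repro below sidesteps this question by handing the ListingTable the latest schema (`schema3`); a real implementation might instead want a superset schema like the one computed here, with missing columns read back as nulls.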
To Reproduce

```rust
use std::fs;
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Array, StringArray, StructArray, TimestampMillisecondArray};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::dataframe::DataFrameWriteOptions;
#[tokio::test]
async fn test_schema_evolution_with_compaction() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // File 1: the original schema, with just `event` and `timestamp_utc`.
    let schema1 = Arc::new(Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
    ]));
    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec!["event1"])),
            Arc::new(TimestampMillisecondArray::from(vec![1640995200000])),
        ],
    )?;
    let path1 = "test_data1.parquet";
    let _ = fs::remove_file(path1);
    let df1 = ctx.read_batch(batch1)?;
    df1.write_parquet(
        path1,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;
    // File 2: the schema evolves, adding a nested `data` struct.
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("data", DataType::Struct(vec![
            Field::new("some_data", DataType::Utf8, false),
        ].into()), false),
    ]));
    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec!["event2"])),
            Arc::new(TimestampMillisecondArray::from(vec![1641081600000])),
            Arc::new(StructArray::from(vec![(
                Arc::new(Field::new("some_data", DataType::Utf8, false)),
                Arc::new(StringArray::from(vec!["additional_data"])) as Arc<dyn Array>,
            )])),
        ],
    )?;
    let path2 = "test_data2.parquet";
    let _ = fs::remove_file(path2);
    let df2 = ctx.read_batch(batch2)?;
    df2.write_parquet(
        path2,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;
    // File 3: the schema evolves again; `data` now holds a more deeply nested struct.
    let schema3 = Arc::new(Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("data", DataType::Struct(vec![
            Field::new("even_more_nested_data", DataType::Struct(vec![
                Field::new("some_data", DataType::Utf8, false),
            ].into()), false),
        ].into()), false),
    ]));
    let batch3 = RecordBatch::try_new(
        schema3.clone(),
        vec![
            Arc::new(StringArray::from(vec!["event3"])),
            Arc::new(TimestampMillisecondArray::from(vec![1641168000000])),
            Arc::new(StructArray::from(vec![(
                Arc::new(Field::new("even_more_nested_data", DataType::Struct(vec![
                    Field::new("some_data", DataType::Utf8, false),
                ].into()), false)),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("some_data", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec!["deeply_nested_value"])) as Arc<dyn Array>,
                )])) as Arc<dyn Array>,
            )])),
        ],
    )?;
    let path3 = "test_data3.parquet";
    let _ = fs::remove_file(path3);
    let df3 = ctx.read_batch(batch3)?;
    df3.write_parquet(
        path3,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;
    // Query all three files through a single ListingTable, using the latest schema
    // and the shared sort order.
    let paths_str = vec![path1.to_string(), path2.to_string(), path3.to_string()];
    let config = ListingTableConfig::new_with_multi_paths(
        paths_str
            .into_iter()
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?,
    )
    .with_schema(schema3.as_ref().clone().into())
    .infer(&ctx.state()).await?;
    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![col("timestamp_utc").sort(true, true)]],
            ..config
                .options
                .unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
        }),
        ..config
    };
    let listing_table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(listing_table))?;

    let df = ctx.sql("SELECT * FROM events ORDER BY event").await?;
    let results = df.clone().collect().await?;
    assert_eq!(results[0].num_rows(), 3);
    // Compaction: rewrite the combined result as a single Parquet file.
    let compacted_path = "test_data_compacted.parquet";
    let _ = fs::remove_file(compacted_path);
    df.write_parquet(
        compacted_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;
    // Read the compacted file back in a fresh context and check the results match.
    let new_ctx = SessionContext::new();
    let config =
        ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
            .with_schema(schema3.as_ref().clone().into())
            .infer(&new_ctx.state()).await?;
    let listing_table = ListingTable::try_new(config)?;
    new_ctx.register_table("compacted_events", Arc::new(listing_table))?;

    let df = new_ctx.sql("SELECT * FROM compacted_events ORDER BY event").await?;
    let compacted_results = df.collect().await?;
    assert_eq!(compacted_results[0].num_rows(), 3);
    assert_eq!(results, compacted_results);
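    // Not part of the original repro: a hedged extra check that the compacted file,
    // read back directly, exposes the full latest schema (event, timestamp_utc, data).
    let compacted_df = new_ctx
        .read_parquet(compacted_path, ParquetReadOptions::default())
        .await?;
    assert_eq!(compacted_df.schema().fields().len(), 3);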
    // Clean up the files written by the test.
    let _ = fs::remove_file(path1);
    let _ = fs::remove_file(path2);
    let _ = fs::remove_file(path3);
    let _ = fs::remove_file(compacted_path);

    Ok(())
}
```
Expected behavior
The test runs all the way through: all three Parquet files are queryable through the single ListingTable, the combined query returns 3 rows, and the compacted file produces identical results when read back.
Additional context
No response