Enhance Schema adapter to accommodate evolving struct #15295

Open: wants to merge 55 commits into base: main

Commits (55)
c8236ed
feat: implement NestedStructSchemaAdapter for handling schema evoluti…
kosiew Mar 18, 2025
afbe1ed
feat: enhance NestedStructSchemaAdapter with schema mapping capabilities
kosiew Mar 18, 2025
c774cab
test: add schema mapping test for NestedStructSchemaAdapter
kosiew Mar 18, 2025
5f5cd45
feat: implement NestedStructSchemaAdapterFactory for handling nested …
kosiew Mar 18, 2025
6065bc1
test: add unit test for NestedStructSchemaAdapterFactory to validate …
kosiew Mar 18, 2025
410f8d7
test: refactor test_create_appropriate_adapter for clarity and effici…
kosiew Mar 18, 2025
50cf134
feat: enhance create_appropriate_adapter to support nested schema tra…
kosiew Mar 18, 2025
3f52617
refactor: simplify create_appropriate_adapter logic for nested schema…
kosiew Mar 18, 2025
ad74d3a
refactor: remove redundant default adapter test in nested schema adapter
kosiew Mar 18, 2025
134dace
feat: enhance NestedStructSchemaAdapter to support additional table s…
kosiew Mar 18, 2025
aa89671
refactor: simplify test_nested_struct_evolution
kosiew Mar 18, 2025
f361311
refactor: streamline schema creation in nested schema adapter tests
kosiew Mar 18, 2025
a914a6b
Fix clippy errors
kosiew Mar 18, 2025
d8eb3eb
test: add async test for schema evolution with compaction in NestedSt…
kosiew Mar 21, 2025
1735b45
refactor: add missing imports and clean up test code in nested_schema…
kosiew Mar 21, 2025
72aee85
Rollback to before adding test_datafusion_schema_evolution_with_compa…
kosiew Mar 21, 2025
772fbce
feat: add nested_struct.rs to test nested schema evolution test with …
kosiew Mar 21, 2025
20af2c0
chore: remove nested_struct.rs example file to streamline repository …
kosiew Mar 21, 2025
3c0844c
feat: Add nested_struct.rs async function for schema evolution with c…
kosiew Mar 21, 2025
ad09e60
feat: Enhance logging in nested_struct.rs for better traceability 📜✨
kosiew Mar 21, 2025
61f1f6e
created helper functions
kosiew Mar 21, 2025
16a47d3
map batch1 to schema2
kosiew Mar 21, 2025
7b7183e
feat: Enhance NestedStructSchemaAdapter with custom schema mapping fo…
kosiew Mar 21, 2025
84ab195
feat: Add debug print statements to map_batch for tracing execution f…
kosiew Mar 21, 2025
51dacc5
fix: Refactor nested schema mapping for improved error handling and c…
kosiew Mar 21, 2025
aa5128a
refactor: Remove debug print statements for cleaner code execution 🧹✨
kosiew Mar 21, 2025
839bf61
nested_struct - plug adapter into ListingTableConfig
kosiew Mar 24, 2025
2e99158
feat: Add optional schema adapter factory to ListingTableConfig for e…
kosiew Mar 24, 2025
fe7ff84
feat: Add optional schema adapter factory to FileScanConfig for enhan…
kosiew Mar 24, 2025
3689140
feat: Enhance ListingTableConfig to support schema adapter factory fo…
kosiew Mar 24, 2025
76fbc6f
struct NestedStructSchemaMapping - remove table_schema, file_schema
kosiew Mar 25, 2025
f2d6b60
refactor: Remove nested_struct.rs example for schema evolution and co…
kosiew Mar 25, 2025
6b7fed9
style: Fix comment tests in ListingOptions documentation 📜✨
kosiew Mar 25, 2025
2cef654
Merge branch 'main' into test-merge
kosiew Mar 25, 2025
565ad5c
SchemaMapping remove table_schema, nested_schema_adapter remove map_p…
kosiew Mar 25, 2025
778da1e
docs: Update comments for schema_adapter_factory in ListingTableConfi…
kosiew Mar 25, 2025
f066e59
refactor: Extract schema adapter preservation logic into a helper fun…
kosiew Mar 25, 2025
4cc5f77
refactor: Extract schema adapter application logic into a dedicated f…
kosiew Mar 25, 2025
b6a828c
docs: Enhance adapt_fields documentation with performance considerati…
kosiew Mar 25, 2025
41fb40c
docs: Add detailed documentation for RecordBatch mapping in NestedStr…
kosiew Mar 25, 2025
3133cd7
refactor: Add missing import for FileSource in ListingTable implement…
kosiew Mar 25, 2025
5ad6287
refactor: Update license documentation comments for NestedSchemaAdapt…
kosiew Mar 25, 2025
8fa34da
refactor: Remove unused file_scan_exec.rs to clean up the codebase 🗑️✨
kosiew Mar 25, 2025
d229dd3
refactor: Remove unused file_scan_config.rs to streamline the codebas…
kosiew Mar 25, 2025
ff41c43
Moved the adapt_column method from NestedStructSchemaMapping to a sta…
kosiew Mar 25, 2025
2df74b6
Fix Clippy errors
kosiew Mar 25, 2025
bb4a5de
docs: Correct the struct names in documentation for NestedStructSchem…
kosiew Mar 25, 2025
a8cce59
Merge branch 'main' into schema-adapter
kosiew Mar 25, 2025
f547355
fix: remove unnecessary clone in create_physical_plan call for Listin…
kosiew Mar 25, 2025
fa7c17f
refactor: rename preserve_schema_adapter_factory to preserve_conf_sch…
kosiew Mar 25, 2025
e9c93d6
refactor: rename create_appropriate_adapter to create_adapter for cla…
kosiew Mar 25, 2025
64a4e3f
feature gate parquet
kosiew Mar 26, 2025
dd9f66d
Trigger CI
kosiew Mar 26, 2025
ca511df
refactor: mod tests, add user_infos
kosiew Mar 26, 2025
54590f4
feat: expose nested schema adapter and source for improved data handl…
kosiew Mar 26, 2025
Fix clippy errors
kosiew committed Mar 18, 2025
commit a914a6bc9c50f5d57756533c340d08db0709651f
105 changes: 51 additions & 54 deletions datafusion/datasource/src/nested_schema_adapter.rs
@@ -102,6 +102,51 @@ pub struct NestedStructSchemaAdapter {
table_schema: SchemaRef,
}

+/// Adapt the source schema fields to match the target schema while preserving
+/// nested struct fields and handling field additions/removals
+fn adapt_fields(source_fields: &Fields, target_fields: &Fields) -> Vec<Field> {
+let mut adapted_fields = Vec::new();
+let source_map: HashMap<_, _> = source_fields
+.iter()
+.map(|f| (f.name().as_str(), f))
+.collect();
+
+for target_field in target_fields {
+match source_map.get(target_field.name().as_str()) {
+Some(source_field) => {
+match (source_field.data_type(), target_field.data_type()) {
+// Recursively adapt nested struct fields
+(
+DataType::Struct(source_children),
+DataType::Struct(target_children),
+) => {
+let adapted_children =
+adapt_fields(source_children, target_children);
+adapted_fields.push(Field::new(
+target_field.name(),
+DataType::Struct(adapted_children.into()),
+target_field.is_nullable(),
+));
+}
+// If types match exactly, keep source field
+_ if source_field.data_type() == target_field.data_type() => {
+adapted_fields.push(source_field.as_ref().clone());
+}
+// Types don't match - use target field definition
+_ => {
+adapted_fields.push(target_field.as_ref().clone());
+}
+}
+}
+// Field doesn't exist in source - add from target
+None => {
+adapted_fields.push(target_field.as_ref().clone());
+}
+}
+}
+
+adapted_fields
+}
impl NestedStructSchemaAdapter {
/// Create a new NestedStructSchemaAdapter with the target schema
pub fn new(projected_table_schema: SchemaRef, table_schema: SchemaRef) -> Self {
@@ -119,56 +164,10 @@ impl NestedStructSchemaAdapter {
self.table_schema.as_ref()
}

-/// Adapt the source schema fields to match the target schema while preserving
-/// nested struct fields and handling field additions/removals
-fn adapt_fields(&self, source_fields: &Fields, target_fields: &Fields) -> Vec<Field> {
-let mut adapted_fields = Vec::new();
-let source_map: HashMap<_, _> = source_fields
-.iter()
-.map(|f| (f.name().as_str(), f))
-.collect();
-
-for target_field in target_fields {
-match source_map.get(target_field.name().as_str()) {
-Some(source_field) => {
-match (source_field.data_type(), target_field.data_type()) {
-// Recursively adapt nested struct fields
-(
-DataType::Struct(source_children),
-DataType::Struct(target_children),
-) => {
-let adapted_children =
-self.adapt_fields(source_children, target_children);
-adapted_fields.push(Field::new(
-target_field.name(),
-DataType::Struct(adapted_children.into()),
-target_field.is_nullable(),
-));
-}
-// If types match exactly, keep source field
-_ if source_field.data_type() == target_field.data_type() => {
-adapted_fields.push(source_field.as_ref().clone());
-}
-// Types don't match - use target field definition
-_ => {
-adapted_fields.push(target_field.as_ref().clone());
-}
-}
-}
-// Field doesn't exist in source - add from target
-None => {
-adapted_fields.push(target_field.as_ref().clone());
-}
-}
-}
-
-adapted_fields
-}
-
// Takes a source schema and transforms it to match the structure of the target schema.
fn adapt_schema(&self, source_schema: SchemaRef) -> Result<SchemaRef> {
let adapted_fields =
-self.adapt_fields(source_schema.fields(), self.table_schema.fields());
+adapt_fields(source_schema.fields(), self.table_schema.fields());

Ok(Arc::new(Schema::new_with_metadata(
adapted_fields,
@@ -487,7 +486,7 @@ mod tests {
assert_eq!(adapted.fields().len(), 2); // Should have id and user_info

// Check that user_info is a struct
-if let Some(idx) = adapted.index_of("user_info").ok() {
+if let Ok(idx) = adapted.index_of("user_info") {
let user_info_field = adapted.field(idx);
assert!(matches!(user_info_field.data_type(), DataType::Struct(_)));

@@ -535,7 +534,7 @@ mod tests {
}

fn create_nested_schema() -> Arc<Schema> {
-let nested_schema = Arc::new(Schema::new(vec![
+Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new(
Review comment from Contributor:

Can you also add another field called user_infos which is a List<user_info>? This is another common evolving struct use case that's good to support.
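
To make the `List<user_info>` case concrete, here is a minimal, dependency-free sketch of how the struct recursion could be extended to look through list element types. It is not the PR's implementation: `DataType`, `Field`, and `adapt_type` below are toy stand-ins invented for illustration rather than the arrow-rs types, and for brevity the sketch always takes the name and nullability from the target field, whereas the PR keeps the source field when the types match exactly.

```rust
use std::collections::HashMap;

// Toy stand-ins for Arrow's `DataType` and `Field` (illustrative assumptions,
// not the arrow-rs definitions used in the PR).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    Struct(Vec<Field>),
    List(Box<Field>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

impl Field {
    fn new(name: &str, data_type: DataType, nullable: bool) -> Self {
        Field { name: name.to_string(), data_type, nullable }
    }
}

/// Adapt a single data type to the target, recursing through structs and
/// through the element field of lists. Anything else takes the target type.
fn adapt_type(source: &DataType, target: &DataType) -> DataType {
    match (source, target) {
        (DataType::Struct(s), DataType::Struct(t)) => DataType::Struct(adapt_fields(s, t)),
        (DataType::List(s), DataType::List(t)) => DataType::List(Box::new(Field::new(
            &t.name,
            adapt_type(&s.data_type, &t.data_type),
            t.nullable,
        ))),
        _ => target.clone(),
    }
}

/// Walk the target fields in order, adapting those present in the source by
/// name and copying the target definition for those that are missing.
fn adapt_fields(source: &[Field], target: &[Field]) -> Vec<Field> {
    let source_map: HashMap<&str, &Field> =
        source.iter().map(|f| (f.name.as_str(), f)).collect();

    target
        .iter()
        .map(|t| match source_map.get(t.name.as_str()) {
            Some(s) => Field::new(&t.name, adapt_type(&s.data_type, &t.data_type), t.nullable),
            None => t.clone(),
        })
        .collect()
}
```

With this shape, a `user_infos` column whose element struct gains a field in the table schema is adapted the same way as a plain `user_info` struct, because the list arm simply delegates to the struct arm for its element type.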

"user_info",
@@ -563,20 +562,18 @@ mod tests {
),
true,
),
-]));
-nested_schema
+]))
}

fn create_flat_schema() -> Arc<Schema> {
-let flat_schema = Arc::new(Schema::new(vec![
+Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("user", DataType::Utf8, true),
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
-]));
-flat_schema
+]))
}
}
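
The three kinds of change in this commit look like the fixes Clippy usually suggests for a receiver that is only used in recursion, for matching on `Result::ok()`, and for a `let` binding that is returned immediately (most likely the `only_used_in_recursion`, `match_result_ok`, and `let_and_return` lints, though the commit does not name them). The sketch below shows the "after" form of each pattern in isolation; `Node`, `max_depth`, `index_of`, `describe`, and `field_names` are hypothetical names made up for this example and do not appear in the PR.

```rust
/// Toy nested type standing in for a struct-typed field (assumption).
enum Node {
    Leaf,
    Struct(Vec<Node>),
}

/// Free function rather than a `&self` method: if the receiver is only ever
/// forwarded to the recursive call, it carries no information and can go.
fn max_depth(node: &Node) -> usize {
    match node {
        Node::Leaf => 0,
        Node::Struct(children) => 1 + children.iter().map(max_depth).max().unwrap_or(0),
    }
}

/// Hypothetical lookup that returns a `Result`, similar in spirit to a
/// schema's name-to-index lookup.
fn index_of(names: &[&str], wanted: &str) -> Result<usize, String> {
    names
        .iter()
        .position(|n| *n == wanted)
        .ok_or_else(|| format!("no field named {}", wanted))
}

/// Match on `Ok(..)` directly instead of `if let Some(idx) = result.ok()`.
fn describe(names: &[&str], wanted: &str) -> String {
    if let Ok(idx) = index_of(names, wanted) {
        format!("{} is column {}", wanted, idx)
    } else {
        format!("{} is missing", wanted)
    }
}

/// Return the expression directly instead of binding it to a local first.
fn field_names() -> Vec<&'static str> {
    vec!["id", "user_info"]
}
```

None of these rewrites changes behavior; they only remove an unused receiver, an intermediate `Option`, and a redundant binding.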