
Enhance Schema adapter to accommodate evolving struct #15295

Draft
wants to merge 29 commits into main

Conversation

@kosiew (Contributor) commented Mar 18, 2025:

Which issue does this PR close?

Rationale for this change

arrow-rs suggests that SchemaAdapter is a better approach for handling evolving structs.
This PR introduces a NestedStructSchemaAdapter to improve schema evolution handling in DataFusion when dealing with nested struct types. Currently, schema evolution primarily supports flat schemas, but evolving nested structures (such as adding new fields to existing structs) requires special handling. This change ensures better compatibility and adaptability for evolving datasets.
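
For concreteness, here is a minimal sketch of the evolution this targets (not the PR's code; field names are borrowed from the example later in this thread, with `reason` simplified to a plain string): a struct column gains a new child field between file versions, and files written with the old shape must still be readable against the new table schema.

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

// Older files were written while `additionalInfo` only had a `location` child.
fn original_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![Field::new(
        "additionalInfo",
        DataType::Struct(vec![Field::new("location", DataType::Utf8, true)].into()),
        true,
    )]))
}

// Newer files add a `reason` child inside the same struct; reading old files
// against this schema is the case the adapter has to handle.
fn evolved_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![Field::new(
        "additionalInfo",
        DataType::Struct(
            vec![
                Field::new("location", DataType::Utf8, true),
                Field::new("reason", DataType::Utf8, true),
            ]
            .into(),
        ),
        true,
    )]))
}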

What changes are included in this PR?

  • Introduces NestedStructSchemaAdapter to handle schema evolution for nested struct fields.
  • Implements NestedStructSchemaAdapterFactory to determine whether the specialized adapter is needed based on schema characteristics (see the sketch after this list).
  • Enhances SchemaMapping with a new constructor for improved usability.
  • Updates schema_adapter.rs and integrates the new adapter into the datafusion_datasource module.
  • Adds comprehensive unit tests to verify the correctness of schema adaptation, including nested struct evolution scenarios.
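
As a rough illustration of the selection idea (the helper below is hypothetical; the actual decision lives in NestedStructSchemaAdapterFactory), the specialized adapter is only needed when a schema actually contains struct columns:

use datafusion::arrow::datatypes::{DataType, Schema};

// Hypothetical helper: true if any top-level column is a struct, which is the
// situation the NestedStructSchemaAdapter is meant to cover.
fn schema_has_struct_columns(schema: &Schema) -> bool {
    schema
        .fields()
        .iter()
        .any(|field| matches!(field.data_type(), DataType::Struct(_)))
}

When neither the file schema nor the table schema contains struct columns, the existing DefaultSchemaAdapter can keep handling the flat case unchanged.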

Are these changes tested?

Yes, extensive unit tests have been added to verify:

  • Proper mapping of fields, including added and missing nested struct fields.
  • Correct adaptation from flat schemas to nested schemas.
  • Validation of the adapter selection logic across different schema characteristics.

Are there any user-facing changes?

No breaking changes.
However, users working with evolving nested struct schemas will benefit from improved support for automatic schema adaptation. This enhances compatibility with sources like Parquet, where schemas may change over time.

kosiew added 12 commits March 18, 2025 10:44
- Refactored adapt_fields method to accept Fields instead of Field arrays for better type handling.
- Added create_schema_mapper method to facilitate mapping between source and target schemas.
- Updated map_column_index and map_schema methods to improve schema adaptation and mapping logic.
- Enhanced test cases to validate nested struct evolution with new schema mappings.
…struct schema evolution

- Added NestedStructSchemaAdapterFactory to create schema adapters that manage nested struct fields.
- Introduced methods for creating appropriate schema adapters based on schema characteristics.
- Implemented checks for nested struct fields to enhance schema evolution handling.
…nsformations

- Added an optional source schema parameter to create_appropriate_adapter for better handling of nested structs.
- Updated logic to return NestedStructSchemaAdapter when adapting between schemas with different structures or when the source schema contains nested structs.
- Improved default case handling for simple schemas.
- Added a new test case to validate the adaptation from a simple schema to a nested schema, ensuring correct field mapping and structure.
This commit eliminates the test for the default adapter's failure with nested schema transformations, streamlining the test suite. The focus is now on validating the functionality of the NestedStructSchemaAdapter, which is designed to handle missing nested fields effectively.
…chema handling

- Updated the `create` method in `NestedStructSchemaAdapterFactory` to accept and utilize the full table schema.
- Modified the `NestedStructSchemaAdapter` to store both projected and full table schemas for improved schema adaptation.
- Refactored the `adapt_schema` method to use the full table schema for field adaptation.
- Added helper functions to create basic and enhanced nested schemas for testing.
- Updated tests to validate the new schema handling logic, ensuring compatibility with nested structures.
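
A rough sketch of the shape this commit describes (field names are inferred from the commit message, not copied from the code): the adapter keeps both the projected schema and the full table schema, mirroring DefaultSchemaAdapter, so nested fields can still be adapted when the projection trims columns.

use datafusion::arrow::datatypes::SchemaRef;

// Rough shape only; the real definition lives in nested_schema_adapter.rs.
pub struct NestedStructSchemaAdapter {
    /// Table schema projected down to the columns requested by the query.
    projected_table_schema: SchemaRef,
    /// Full table schema, consulted when adapting nested struct fields.
    table_schema: SchemaRef,
}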
github-actions bot added the core (Core DataFusion crate) and datasource (Changes to the datasource crate) labels on Mar 18, 2025
assert!(default_result.is_err());
if let Err(e) = default_result {
    assert!(
        format!("{}", e).contains("Cannot cast file schema field metadata"),

@kosiew (Contributor Author) commented Mar 18, 2025:

This is similar to the error mentioned in #14757

Error: Plan("Cannot cast file schema field additionalInfo of type Struct([Field { name: \"location\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_utc\", data_type: Timestamp(Millisecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"reason\", data_type: Struct([Field { name: \"_level\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"details\", data_type: Struct([Field { name: \"rurl\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"s\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"t\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) to table schema field of type Struct([Field { name: \"location\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_utc\", data_type: Timestamp(Millisecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])

@TheBuilderJR (Contributor) commented:

@kosiew any chance you can try running the test case in #14757? It's a real-world example of schema evolution that I hope can be solved with your PR. I managed to solve it with #15259, but it seems quite different from your PR, so I'm curious if your PR also solves it.

kosiew added 12 commits March 21, 2025 12:27
…ructSchemaAdapter

- Introduced a new asynchronous test `test_datafusion_schema_evolution_with_compaction` to validate schema evolution and data compaction functionality.
- Added necessary imports for the new test, including `RecordBatch`, `SessionContext`, and various array types.
- Created two sample schemas and corresponding record batches to simulate data before and after schema evolution.
- Implemented logic to write the record batches to Parquet files and read them back to ensure data integrity.
- Verified that the results from the compacted data match the original data, ensuring the correctness of the schema evolution process.
…NestedStructSchemaAdapter

- Added a new example in nested_struct.rs to demonstrate schema evolution using NestedStructSchemaAdapter.
- Created two parquet files with different schemas: one without the 'reason' field and one with it.
- Implemented logic to read and write these parquet files, showcasing the handling of nested structures.
- Added detailed logging to track the process and results of the schema evolution test.
- Included assertions to verify the correctness of the data and schema in the compacted output.

🎉 This enhances the testing capabilities for nested schemas in DataFusion! 🚀
…ompaction in DataFusion examples 📊✨

- Implemented `test_datafusion_schema_evolution_with_compaction` to demonstrate schema evolution and data compaction using Parquet files.
- Created two schemas and corresponding record batches to simulate data processing.
- Added logic to write and read Parquet files, ensuring data integrity and compactness.
- Registered tables in the session context and executed SQL queries to validate results.
- Cleaned up temporary files after execution to maintain a tidy environment. 🗑️
…evolution in schema compaction example 🚀📊"

This reverts commit f286330.
- Added log statements to indicate the start of the test function and the writing of parquet files.
- Included logs for successful creation of ListingTable and registration of the table.
- Improved visibility into the execution flow by logging SQL query execution and result collection.
…rs 📊🔄✨

- Removed redundant code for creating ListingTableConfig and inferring config.
- Added functionality to read parquet files directly and apply NestedStructSchemaAdapter for schema compatibility.
- Created combined DataFrame from adapted batches for seamless querying.
- Enhanced logging for better traceability of schema adaptation process. 📜
kosiew added 3 commits March 21, 2025 19:16
…r nested structs 🌳✨

- Introduced `NestedStructSchemaMapping` to handle schema evolution for nested struct types.
- Updated `map_batch` and `map_partial_batch` methods to adapt nested structures correctly.
- Added detailed handling for new fields in target schemas, ensuring compatibility with existing data.
- Improved null handling for missing fields in the source schema, allowing for better data integrity.
- Included comprehensive tests to validate the new mapping functionality and ensure robustness. 🧪✅
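
To make the null-handling idea concrete, here is a hedged sketch (the helper name and the simplifications are mine, not the PR's code): when the target struct type has a child the source file never wrote, the mapping can substitute an all-null array of the right length. The real mapping additionally needs to recurse into nested children and cast mismatched types.

use std::sync::Arc;
use datafusion::arrow::array::{new_null_array, Array, ArrayRef, StructArray};
use datafusion::arrow::datatypes::{FieldRef, Fields};

// Hypothetical helper: rebuild a struct column for `target_fields`, reusing
// source children where they exist and filling the rest with nulls.
fn fill_missing_children(source: &StructArray, target_fields: &Fields) -> StructArray {
    let children: Vec<(FieldRef, ArrayRef)> = target_fields
        .iter()
        .map(|field| {
            let column = match source.column_by_name(field.name()) {
                Some(existing) => Arc::clone(existing),
                None => new_null_array(field.data_type(), source.len()),
            };
            (Arc::clone(field), column)
        })
        .collect();
    StructArray::from(children)
}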
kosiew marked this pull request as draft March 21, 2025 12:32
…ode clarity 🛠️✨

- Updated `fields.push(field.as_ref().clone())` to ensure proper field cloning.
- Enhanced error handling in `cast` operations by wrapping them in `Ok(...)` for better result management.
- Cleaned up test imports by consolidating array imports for better readability.
- Removed unnecessary error handling in `StringBuilder.append_value` calls to streamline code.
- Improved variable binding for clarity in test assertions.

These changes enhance the robustness of the schema mapping logic and improve overall code maintainability. 📈🔍
@kosiew (Contributor Author) commented Mar 21, 2025:

hi @TheBuilderJR

I haven't completed the PR yet.

Here's the interim progress: I used the NestedStructSchemaAdapter in test_datafusion_schema_evolution_with_compaction.
My next step is to plug the NestedStructSchemaAdapter into ListingTableConfig somewhere.

use datafusion::arrow::array::{
    Array, Float64Array, StringArray, StructArray, TimestampMillisecondArray,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::nested_schema_adapter::NestedStructSchemaAdapterFactory;
use datafusion::prelude::*;
use std::error::Error;
use std::fs;
use std::sync::Arc;

async fn test_datafusion_schema_evolution_with_compaction() -> Result<(), Box<dyn Error>>
{
    let ctx = SessionContext::new();

    let schema1 = create_schema1();
    let schema2 = create_schema2();

    let batch1 = create_batch1(&schema1)?;

    // adapter start
    let adapter = NestedStructSchemaAdapterFactory::create_appropriate_adapter(
        schema2.clone(),
        schema2.clone(),
    );

    // Map batch1 (written with schema1) onto the evolved schema2 before writing.
    let (mapping, _) = adapter
        .map_schema(&schema1.clone())
        .expect("map schema failed");
    let mapped_batch = mapping.map_batch(batch1)?;
    // adapter end

    let path1 = "test_data1.parquet";
    let _ = fs::remove_file(path1);

    let df1 = ctx.read_batch(mapped_batch)?;
    df1.write_parquet(
        path1,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;

    let batch2 = create_batch2(&schema2)?;

    let path2 = "test_data2.parquet";
    let _ = fs::remove_file(path2);

    let df2 = ctx.read_batch(batch2)?;
    df2.write_parquet(
        path2,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;

    let paths_str = vec![path1.to_string(), path2.to_string()];

    let config = ListingTableConfig::new_with_multi_paths(
        paths_str
            .into_iter()
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?,
    )
    .with_schema(schema2.as_ref().clone().into());

    let config = config.infer(&ctx.state()).await?;

    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![col("timestamp_utc").sort(true, true)]],
            ..config.options.unwrap_or_else(|| {
                ListingOptions::new(Arc::new(ParquetFormat::default()))
            })
        }),
        ..config
    };

    let listing_table = ListingTable::try_new(config)?;

    ctx.register_table("events", Arc::new(listing_table))?;

    let df = ctx
        .sql("SELECT * FROM events ORDER BY timestamp_utc")
        .await?;

    let results = df.clone().collect().await?;

    assert_eq!(results[0].num_rows(), 2);

    let compacted_path = "test_data_compacted.parquet";
    let _ = fs::remove_file(compacted_path);

    df.write_parquet(
        compacted_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;

    let new_ctx = SessionContext::new();
    let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(
        compacted_path,
    )?])
    .with_schema(schema2.as_ref().clone().into())
    .infer(&new_ctx.state())
    .await?;

    let listing_table = ListingTable::try_new(config)?;
    new_ctx.register_table("events", Arc::new(listing_table))?;

    let df = new_ctx
        .sql("SELECT * FROM events ORDER BY timestamp_utc")
        .await?;
    let compacted_results = df.collect().await?;

    assert_eq!(compacted_results[0].num_rows(), 2);
    assert_eq!(results, compacted_results);

    let _ = fs::remove_file(path1);
    let _ = fs::remove_file(path2);
    let _ = fs::remove_file(compacted_path);

    Ok(())
}

fn create_schema2() -> Arc<Schema> {
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    ),
                    Field::new(
                        "reason",
                        DataType::Struct(
                            vec![
                                Field::new("_level", DataType::Float64, true),
                                Field::new(
                                    "details",
                                    DataType::Struct(
                                        vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ),
                            ]
                            .into(),
                        ),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        ),
    ]));
    schema2
}

fn create_batch1(schema1: &Arc<Schema>) -> Result<RecordBatch, Box<dyn Error>> {
    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
            ])),
        ],
    )?;
    Ok(batch1)
}

fn create_schema1() -> Arc<Schema> {
    let schema1 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        ),
    ]));
    schema1
}

fn create_batch2(schema2: &Arc<Schema>) -> Result<RecordBatch, Box<dyn Error>> {
    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
                (
                    Arc::new(Field::new(
                        "reason",
                        DataType::Struct(
                            vec![
                                Field::new("_level", DataType::Float64, true),
                                Field::new(
                                    "details",
                                    DataType::Struct(
                                        vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ),
                            ]
                            .into(),
                        ),
                        true,
                    )),
                    Arc::new(StructArray::from(vec![
                        (
                            Arc::new(Field::new("_level", DataType::Float64, true)),
                            Arc::new(Float64Array::from(vec![Some(1.5)]))
                                as Arc<dyn Array>,
                        ),
                        (
                            Arc::new(Field::new(
                                "details",
                                DataType::Struct(
                                    vec![
                                        Field::new("rurl", DataType::Utf8, true),
                                        Field::new("s", DataType::Float64, true),
                                        Field::new("t", DataType::Utf8, true),
                                    ]
                                    .into(),
                                ),
                                true,
                            )),
                            Arc::new(StructArray::from(vec![
                                (
                                    Arc::new(Field::new("rurl", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some(
                                        "https://example.com",
                                    )]))
                                        as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("s", DataType::Float64, true)),
                                    Arc::new(Float64Array::from(vec![Some(3.14)]))
                                        as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("t", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("data")]))
                                        as Arc<dyn Array>,
                                ),
                            ])),
                        ),
                    ])),
                ),
            ])),
        ],
    )?;
    Ok(batch2)
}

fn main() -> Result<(), Box<dyn Error>> {
    // Create a Tokio runtime for running our async function
    let rt = tokio::runtime::Runtime::new()?;

    // Run the function in the runtime
    rt.block_on(async { test_datafusion_schema_evolution_with_compaction().await })?;

    println!("Example completed successfully!");
    Ok(())
}

@TheBuilderJR (Contributor) commented:

Nice! FWIW, another edge case I found recently that's probably worth testing is a List where the inner Struct evolves. I ended up solving it by updating list_coercion, but I'm curious if you have a better way: https://github.com/apache/datafusion/pull/15259/files
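
For reference, a minimal illustration of that edge case (column and field names are made up): the struct item type inside a List column gains a field between file versions.

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

// Older files: List<Struct<kind>>; newer files: List<Struct<kind, severity>>.
fn list_of_struct_schemas() -> (Schema, Schema) {
    let old_item = Field::new(
        "item",
        DataType::Struct(vec![Field::new("kind", DataType::Utf8, true)].into()),
        true,
    );
    let new_item = Field::new(
        "item",
        DataType::Struct(
            vec![
                Field::new("kind", DataType::Utf8, true),
                Field::new("severity", DataType::Float64, true),
            ]
            .into(),
        ),
        true,
    );
    let old = Schema::new(vec![Field::new("events", DataType::List(Arc::new(old_item)), true)]);
    let new = Schema::new(vec![Field::new("events", DataType::List(Arc::new(new_item)), true)]);
    (old, new)
}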

Successfully merging this pull request may close these issues.

Datafusion can't seem to cast evolving structs