
Datafusion can't seem to cast evolving structs #14757

Open · TheBuilderJR opened this issue Feb 19, 2025 · 14 comments · May be fixed by #15295
Labels: bug

Comments

TheBuilderJR (Contributor) commented Feb 19, 2025:

Describe the bug

I'd expect that as I add fields to structs, I should be able to cast the old struct into the new one. The repro below shows this isn't allowed:

To Reproduce

use std::fs;
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Array, StringArray, StructArray, TimestampMillisecondArray, Float64Array};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::test]
async fn test_datafusion_schema_evolution_with_compaction() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

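    // First Parquet file: `additionalInfo` has only `location` and `timestamp_utc`.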
    let schema1 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(vec![
                Field::new("location", DataType::Utf8, true),
                Field::new(
                    "timestamp_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
            ].into()),
            true,
        ),
    ]));
    
    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
            ])),
        ],
    )?;

    let path1 = "test_data1.parquet";
    let _ = fs::remove_file(path1);
    
    let df1 = ctx.read_batch(batch1)?;
    df1.write_parquet(
        path1,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

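    // Second Parquet file: same top-level fields, but `additionalInfo` now has
    // an extra nested `reason` struct (the schema evolution).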
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(vec![
                Field::new("location", DataType::Utf8, true),
                Field::new(
                    "timestamp_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "reason",
                    DataType::Struct(vec![
                        Field::new("_level", DataType::Float64, true),
                        Field::new(
                            "details",
                            DataType::Struct(vec![
                                Field::new("rurl", DataType::Utf8, true),
                                Field::new("s", DataType::Float64, true),
                                Field::new("t", DataType::Utf8, true),
                            ].into()),
                            true,
                        ),
                    ].into()),
                    true,
                ),
            ].into()),
            true,
        ),
    ]));

    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
                (
                    Arc::new(Field::new(
                        "reason",
                        DataType::Struct(vec![
                            Field::new("_level", DataType::Float64, true),
                            Field::new(
                                "details",
                                DataType::Struct(vec![
                                    Field::new("rurl", DataType::Utf8, true),
                                    Field::new("s", DataType::Float64, true),
                                    Field::new("t", DataType::Utf8, true),
                                ].into()),
                                true,
                            ),
                        ].into()),
                        true,
                    )),
                    Arc::new(StructArray::from(vec![
                        (
                            Arc::new(Field::new("_level", DataType::Float64, true)),
                            Arc::new(Float64Array::from(vec![Some(1.5)])) as Arc<dyn Array>,
                        ),
                        (
                            Arc::new(Field::new(
                                "details",
                                DataType::Struct(vec![
                                    Field::new("rurl", DataType::Utf8, true),
                                    Field::new("s", DataType::Float64, true),
                                    Field::new("t", DataType::Utf8, true),
                                ].into()),
                                true,
                            )),
                            Arc::new(StructArray::from(vec![
                                (
                                    Arc::new(Field::new("rurl", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("https://example.com")])) as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("s", DataType::Float64, true)),
                                    Arc::new(Float64Array::from(vec![Some(3.14)])) as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("t", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("data")])) as Arc<dyn Array>,
                                ),
                            ])),
                        ),
                    ])),
                ),
            ])),
        ],
    )?;

    let path2 = "test_data2.parquet";
    let _ = fs::remove_file(path2);
    
    let df2 = ctx.read_batch(batch2)?;
    df2.write_parquet(
        path2,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

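    // Register both files as a single listing table using the wider schema2.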
    let paths_str = vec![path1.to_string(), path2.to_string()];
    let config = ListingTableConfig::new_with_multi_paths(
        paths_str
            .into_iter()
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?
    )
        .with_schema(schema2.as_ref().clone().into())
        .infer(&ctx.state()).await?;

    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![
                col("timestamp_utc").sort(true, true),
            ]],
            ..config.options.unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
        }),
        ..config
    };

    let listing_table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(listing_table))?;

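    // Scanning both files requires casting the first file's narrower
    // `additionalInfo` struct to the table schema; this is where the error occurs.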
    let df = ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let results = df.clone().collect().await?;

    assert_eq!(results[0].num_rows(), 2);

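    // Compact both files into one Parquet file, then re-read and compare.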
    let compacted_path = "test_data_compacted.parquet";
    let _ = fs::remove_file(compacted_path);

    df.write_parquet(
        compacted_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

    let new_ctx = SessionContext::new();
    let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
        .with_schema(schema2.as_ref().clone().into())
        .infer(&new_ctx.state()).await?;
    
    let listing_table = ListingTable::try_new(config)?;
    new_ctx.register_table("events", Arc::new(listing_table))?;

    let df = new_ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let compacted_results = df.collect().await?;
    
    assert_eq!(compacted_results[0].num_rows(), 2);
    assert_eq!(results, compacted_results);

    let _ = fs::remove_file(path1);
    let _ = fs::remove_file(path2);
    let _ = fs::remove_file(compacted_path);

    Ok(())
}

This produces:

Error: Plan("Cannot cast file schema field additionalInfo of type Struct([Field { name: \"location\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_utc\", data_type: Timestamp(Millisecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"reason\", data_type: Struct([Field { name: \"_level\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"details\", data_type: Struct([Field { name: \"rurl\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"s\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"t\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) to table schema field of type Struct([Field { name: \"location\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_utc\", data_type: Timestamp(Millisecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])

Expected behavior

I expected the test to pass.


TheBuilderJR added the bug label on Feb 19, 2025
TheBuilderJR (Contributor, Author) commented Feb 19, 2025:

cc @alamb @zhuqi-lucas. Many of my users can't query their data because of this evolution. Any chance you can take a look to see if there's any workaround I can use for now?

alamb (Contributor) commented Feb 19, 2025:

> cc @alamb @zhuqi-lucas. Many of my users can't query their data because of this evolution. Any chance you can take a look to see if there's any workaround I can use for now?

I don't know of a workaround here.

This seems like a feature we would have to implement (likely in arrow-rs's cast kernel first).

We had some discussion on the semantics of struct casting recently.
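
For reference, a minimal sketch of what such a widening cast could look like (this is not an existing arrow-rs or DataFusion API; cast_struct_to_wider is a hypothetical helper): match the target struct's children by name and null-fill anything the source doesn't have.

use std::sync::Arc;
use datafusion::arrow::array::{new_null_array, ArrayRef, StructArray};
use datafusion::arrow::datatypes::{Field, Fields};

// Hypothetical helper (not an existing arrow-rs kernel): widen `source` to
// `target_fields` by matching children by name and null-filling any field
// the source lacks. A production version would recurse into nested structs
// and verify that matching children have compatible types.
fn cast_struct_to_wider(source: &StructArray, target_fields: &Fields) -> StructArray {
    let children: Vec<(Arc<Field>, ArrayRef)> = target_fields
        .iter()
        .map(|field| {
            let child = source
                .column_by_name(field.name())
                .cloned()
                .unwrap_or_else(|| new_null_array(field.data_type(), source.len()));
            (Arc::clone(field), child)
        })
        .collect();
    StructArray::from(children)
}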

TheBuilderJR (Contributor, Author) commented:

@alamb perhaps a subtask is making Schema::try_merge consistent with DataFusion's ability to query the merged schemas. Schema::try_merge currently happily merges these structs, and then, as an end user, I'm in a danger zone where data has been written under the assumption that it can be queried when it can't.
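
To make the inconsistency concrete, here is a small self-contained sketch (the field names are made up): Schema::try_merge happily widens the struct field, so a writer can reasonably conclude the merged schema is queryable.

use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Two schemas where the struct field gains a child, mirroring the repro.
    let old = Schema::new(vec![Field::new(
        "info",
        DataType::Struct(vec![Field::new("a", DataType::Utf8, true)].into()),
        true,
    )]);
    let new = Schema::new(vec![Field::new(
        "info",
        DataType::Struct(
            vec![
                Field::new("a", DataType::Utf8, true),
                Field::new("b", DataType::Float64, true),
            ]
            .into(),
        ),
        true,
    )]);

    // try_merge succeeds and yields the wider struct, even though scanning
    // files written with `old` against the merged schema fails to cast.
    let merged = Schema::try_merge(vec![old, new]).expect("structs merge");
    println!("{merged:?}");
}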

zhuqi-lucas (Contributor) commented:

I can't find a workaround for this either, and I think Schema::try_merge passed before this error.

So when we map_schema at the end, we should still check why the cast is rejected. I am not familiar with the checking logic; need help from other folks.

TheBuilderJR (Contributor, Author) commented:

@alamb how do y'all handle this at influx? This one comes as quite a shocker to me. Does no one else using datafusion support struct evolution?

alamb (Contributor) commented Feb 22, 2025:

> @alamb how do y'all handle this at influx? This one comes as quite a shocker to me. Does no one else using datafusion support struct evolution?

InfluxData doesn't have a struct datatype, so it doesn't come up.

Basically, I think extending the schema merging and casting logic to handle struct evolution sounds like a good idea to me. Eventually the code probably belongs in arrow-rs, but we could try to implement it first in DataFusion and then port it upstream.

Can someone file or find a ticket upstream in arrow-rs to get the conversation started there?

TheBuilderJR (Contributor, Author) commented:

@alamb given that the arrow folks don't seem super motivated to fix this in a timely manner, can we do a fix on the DataFusion side? Maybe the fix is that we try an arrow cast, but if it fails we fall back to a DataFusion cast specialized to this struct use case?
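
A rough sketch of that shape (hypothetical names; cast_evolved_struct stands in for the specialized struct logic, e.g. by-name child matching with null-fill): try arrow's cast kernel first and only take the fallback when it rejects a cast to a Struct.

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::error::ArrowError;

// Hypothetical stand-in for the specialized struct cast; a real version
// would match children by name, null-fill missing ones, and recurse.
fn cast_evolved_struct(_array: &ArrayRef, to: &DataType) -> Result<ArrayRef, ArrowError> {
    Err(ArrowError::CastError(format!(
        "struct evolution cast not implemented for {to:?}"
    )))
}

// Proposed shape: use arrow's cast kernel first, and only take the
// struct-aware fallback path when arrow rejects a cast to a Struct.
fn cast_with_struct_fallback(array: &ArrayRef, to: &DataType) -> Result<ArrayRef, ArrowError> {
    match cast(array, to) {
        Ok(casted) => Ok(casted),
        Err(_) if matches!(to, DataType::Struct(_)) => cast_evolved_struct(array, to),
        Err(e) => Err(e),
    }
}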

alamb (Contributor) commented Feb 24, 2025:

> @alamb given that the arrow folks don't seem super motivated to fix this in a timely manner, can we do a fix on the DataFusion side?

Yes, of course!

> Maybe the fix is that we try an arrow cast, but if it fails we fall back to a DataFusion cast specialized to this struct use case?

Yes, I think that would be a good idea! Can you perhaps work on a PR?

I think it will be related to #14396 as well, so it might be good to give that a look and help @Lordworms with it.

Lordworms (Contributor) commented:

I forgot to rebase the main branch, but I can refactor it to use arrow-cast plus a specialized case.

TheBuilderJR (Contributor, Author) commented:

@Lordworms can you share your branch? I'm happy to take a look as well if you don't have the bandwidth.

TheBuilderJR (Contributor, Author) commented:

@alamb I should have some free cycles soon to do this. Any chance you can give me some code pointers or reference PRs that would help with the implementation? Thanks in advance!

alamb (Contributor) commented Mar 5, 2025:

I think if you look at the code in these two PRs you'll find the relevant casting:

TheBuilderJR (Contributor, Author) commented:

This PR fixes it, but I'm not sure it's the "right way" to do it: #15259

If anyone wants to take this to the finish line, please do, I won't be offended :)

kosiew linked a pull request on Mar 18, 2025 that will close this issue.
alamb (Contributor) commented Mar 18, 2025:
