-
Notifications
You must be signed in to change notification settings - Fork 972
[Variant] Define shredding schema for VariantArrayBuilder
#7921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,349 @@ | ||||||||||||
// Licensed to the Apache Software Foundation (ASF) under one | ||||||||||||
// or more contributor license agreements. See the NOTICE file | ||||||||||||
// distributed with this work for additional information | ||||||||||||
// regarding copyright ownership. The ASF licenses this file | ||||||||||||
// to you under the Apache License, Version 2.0 (the | ||||||||||||
// "License"); you may not use this file except in compliance | ||||||||||||
// with the License. You may obtain a copy of the License at | ||||||||||||
// | ||||||||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||
// | ||||||||||||
// Unless required by applicable law or agreed to in writing, | ||||||||||||
// software distributed under the License is distributed on an | ||||||||||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||||||
// KIND, either express or implied. See the License for the | ||||||||||||
// specific language governing permissions and limitations | ||||||||||||
// under the License. | ||||||||||||
|
||||||||||||
use arrow_schema::{ArrowError, DataType, Fields}; | ||||||||||||
|
||||||||||||
// Keywords defined by the shredding spec | ||||||||||||
pub const METADATA: &str = "metadata"; | ||||||||||||
pub const VALUE: &str = "value"; | ||||||||||||
pub const TYPED_VALUE: &str = "typed_value"; | ||||||||||||
|
||||||||||||
pub fn validate_value_and_typed_value( | ||||||||||||
fields: &Fields, | ||||||||||||
allow_both_null: bool, | ||||||||||||
) -> Result<(), ArrowError> { | ||||||||||||
let value_field_res = fields.iter().find(|f| f.name() == VALUE); | ||||||||||||
let typed_value_field_res = fields.iter().find(|f| f.name() == TYPED_VALUE); | ||||||||||||
|
||||||||||||
if !allow_both_null { | ||||||||||||
if let (None, None) = (value_field_res, typed_value_field_res) { | ||||||||||||
return Err(ArrowError::InvalidArgumentError( | ||||||||||||
"Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both.".to_string() | ||||||||||||
)); | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
if let Some(value_field) = value_field_res { | ||||||||||||
// if !value_field.is_nullable() { | ||||||||||||
// return Err(ArrowError::InvalidArgumentError( | ||||||||||||
// "Expected value field to be nullable".to_string(), | ||||||||||||
// )); | ||||||||||||
// } | ||||||||||||
Comment on lines
+41
to
+45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did not see anything in the shredding spec that explicitly states There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Spec says that both can be nullable:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What I don't know, is what should happen with
The spec says that
I'm pretty sure that means Even if one tried to store path names of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure the spec is 100% clear on this one, unfortunately. Maybe we need to ask the parquet-variant folks for clarification and/or refinement of the spec's wording? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should only be one Also, the immediately There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The relevant paragraph from the spec for my second comment:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One more slightly pedantic clarification: the type of
Comment on lines
+41
to
+45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we delete this check?
Suggested change
|
||||||||||||
|
||||||||||||
if value_field.data_type() != &DataType::BinaryView { | ||||||||||||
return Err(ArrowError::NotYetImplemented(format!( | ||||||||||||
"VariantArray 'value' field must be BinaryView, got {}", | ||||||||||||
value_field.data_type() | ||||||||||||
))); | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
if let Some(typed_value_field) = fields.iter().find(|f| f.name() == TYPED_VALUE) { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
// if !typed_value_field.is_nullable() { | ||||||||||||
// return Err(ArrowError::InvalidArgumentError( | ||||||||||||
// "Expected value field to be nullable".to_string(), | ||||||||||||
// )); | ||||||||||||
// } | ||||||||||||
|
||||||||||||
// this is directly mapped from the spec's parquet physical types | ||||||||||||
// note, there are more data types we can support | ||||||||||||
// but for the sake of simplicity, I chose the smallest subset | ||||||||||||
match typed_value_field.data_type() { | ||||||||||||
DataType::Boolean | ||||||||||||
| DataType::Int32 | ||||||||||||
| DataType::Int64 | ||||||||||||
| DataType::Float32 | ||||||||||||
| DataType::Float64 | ||||||||||||
| DataType::BinaryView => {} | ||||||||||||
DataType::Union(union_fields, _) => { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My initial reaction was that I don't think variant data can represent a union type? I guess this is a way to slightly relax strongly typed data, as long as the union members themselves are all valid variant types? And whichever union member is active becomes the only field+value of a variant object? But how would a reader of that shredded data know to read it back as a union, instead of the (sparse) struct it appears to be? Would it be better to just require a struct from the start? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I recommend we start without support for union type and then add it as we implement additional functionality |
||||||||||||
union_fields | ||||||||||||
.iter() | ||||||||||||
.map(|(_, f)| f.clone()) | ||||||||||||
.try_for_each(|f| { | ||||||||||||
let DataType::Struct(fields) = f.data_type().clone() else { | ||||||||||||
return Err(ArrowError::InvalidArgumentError( | ||||||||||||
"Expected struct".to_string(), | ||||||||||||
)); | ||||||||||||
}; | ||||||||||||
|
||||||||||||
validate_value_and_typed_value(&fields, false) | ||||||||||||
})?; | ||||||||||||
} | ||||||||||||
|
||||||||||||
foreign => { | ||||||||||||
return Err(ArrowError::NotYetImplemented(format!( | ||||||||||||
"Unsupported VariantArray 'typed_value' field, got {foreign}" | ||||||||||||
))) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
Comment on lines
+62
to
+93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't love this, but I treat the field I'm curious to get your thoughts, maybe we should stick with the Variant type mapping? One reason why the current logic isn't the best is because when we go to reconstruct variants, certain variant types like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need to store (logical) int32 in memory just because parquet physically encodes them that way? When reading an int8 column from normal parquet, doesn't it come back as an int8 PrimitiveArray? |
||||||||||||
|
||||||||||||
Ok(()) | ||||||||||||
} | ||||||||||||
|
||||||||||||
/// Validates that the provided [`Fields`] conform to the Variant shredding specification. | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One thing I thought of while reviewing this PR was maybe we could potentially wrap this into its own Structure, like struct VariantSchema {
inner: Fields
} And then all this validation logic could be part of the constructor impl VariantSchema {
fn try_new(fields: Fields) -> Result<Self> {... }
...
} The benefits of this would be
|
||||||||||||
/// | ||||||||||||
/// # Requirements | ||||||||||||
/// - Must contain a "metadata" field of type BinaryView | ||||||||||||
/// - Must contain at least one of "value" (optional BinaryView) or "typed_value" (optional with valid Parquet type) | ||||||||||||
/// - Both "value" and "typed_value" can only be null simultaneously for shredded object fields | ||||||||||||
pub fn validate_shredded_schema(fields: &Fields) -> Result<(), ArrowError> { | ||||||||||||
let metadata_field = fields | ||||||||||||
.iter() | ||||||||||||
.find(|f| f.name() == METADATA) | ||||||||||||
.ok_or_else(|| { | ||||||||||||
ArrowError::InvalidArgumentError( | ||||||||||||
"Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(), | ||||||||||||
) | ||||||||||||
})?; | ||||||||||||
|
||||||||||||
if metadata_field.is_nullable() { | ||||||||||||
return Err(ArrowError::InvalidArgumentError( | ||||||||||||
"Invalid VariantArray: metadata field can not be nullable".to_string(), | ||||||||||||
)); | ||||||||||||
} | ||||||||||||
Comment on lines
+114
to
+118
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I make sure to check metadata is not nullable. But I wonder if we should remove this. You could imagine a user wanting to use the same metadata throughout the entire building process? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think for variant columns nested inside a shredded variant, we must not have a |
||||||||||||
|
||||||||||||
if metadata_field.data_type() != &DataType::BinaryView { | ||||||||||||
return Err(ArrowError::NotYetImplemented(format!( | ||||||||||||
"VariantArray 'metadata' field must be BinaryView, got {}", | ||||||||||||
metadata_field.data_type() | ||||||||||||
))); | ||||||||||||
} | ||||||||||||
|
||||||||||||
validate_value_and_typed_value(fields, false)?; | ||||||||||||
|
||||||||||||
Ok(()) | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[cfg(test)] | ||||||||||||
mod tests { | ||||||||||||
use super::*; | ||||||||||||
|
||||||||||||
use arrow_schema::{Field, UnionFields, UnionMode}; | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_regular_variant_schema() { | ||||||||||||
// a regular variant schema | ||||||||||||
let metadata_field = Field::new("metadata", DataType::BinaryView, false); | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
|
||||||||||||
let schema = Fields::from(vec![metadata_field, value_field]); | ||||||||||||
|
||||||||||||
validate_shredded_schema(&schema).unwrap(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_regular_variant_schema_order_agnostic() { | ||||||||||||
// a regular variant schema | ||||||||||||
let metadata_field = Field::new("metadata", DataType::BinaryView, false); | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
|
||||||||||||
let schema = Fields::from(vec![value_field, metadata_field]); // note the order switch | ||||||||||||
|
||||||||||||
validate_shredded_schema(&schema).unwrap(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_regular_variant_schema_allow_other_columns() { | ||||||||||||
// a regular variant schema | ||||||||||||
let metadata_field = Field::new("metadata", DataType::BinaryView, false); | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
|
||||||||||||
let trace_field = Field::new("trace_id", DataType::Utf8View, false); | ||||||||||||
let created_at_field = Field::new("created_at", DataType::Date64, false); | ||||||||||||
|
||||||||||||
let schema = Fields::from(vec![ | ||||||||||||
metadata_field, | ||||||||||||
trace_field, | ||||||||||||
created_at_field, | ||||||||||||
value_field, | ||||||||||||
]); | ||||||||||||
|
||||||||||||
validate_shredded_schema(&schema).unwrap(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_regular_variant_schema_missing_metadata() { | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
let schema = Fields::from(vec![value_field]); | ||||||||||||
|
||||||||||||
let err = validate_shredded_schema(&schema).unwrap_err(); | ||||||||||||
|
||||||||||||
assert_eq!( | ||||||||||||
err.to_string(), | ||||||||||||
"Invalid argument error: Invalid VariantArray: StructArray must contain a 'metadata' field" | ||||||||||||
); | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_regular_variant_schema_nullable_metadata() { | ||||||||||||
let metadata_field = Field::new("metadata", DataType::BinaryView, true); | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
|
||||||||||||
let schema = Fields::from(vec![metadata_field, value_field]); | ||||||||||||
|
||||||||||||
let err = validate_shredded_schema(&schema).unwrap_err(); | ||||||||||||
|
||||||||||||
assert_eq!( | ||||||||||||
err.to_string(), | ||||||||||||
"Invalid argument error: Invalid VariantArray: metadata field can not be nullable" | ||||||||||||
); | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_regular_variant_schema_allow_nullable_value() { | ||||||||||||
// a regular variant schema | ||||||||||||
let metadata_field = Field::new("metadata", DataType::BinaryView, false); | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, true); | ||||||||||||
|
||||||||||||
let schema = Fields::from(vec![metadata_field, value_field]); | ||||||||||||
|
||||||||||||
validate_shredded_schema(&schema).unwrap(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_shredded_variant_schema() { | ||||||||||||
let metadata_field = Field::new("metadata", DataType::BinaryView, false); | ||||||||||||
let typed_value_field = Field::new("typed_value", DataType::Int64, false); | ||||||||||||
let schema = Fields::from(vec![metadata_field, typed_value_field]); | ||||||||||||
|
||||||||||||
validate_shredded_schema(&schema).unwrap(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_partially_shredded_variant_schema() { | ||||||||||||
let metadata_field = Field::new("metadata", DataType::BinaryView, false); | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
let typed_value_field = Field::new("typed_value", DataType::Int64, false); | ||||||||||||
let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]); | ||||||||||||
|
||||||||||||
validate_shredded_schema(&schema).unwrap(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_partially_shredded_variant_list_schema() { | ||||||||||||
/* | ||||||||||||
optional group tags (VARIANT) { | ||||||||||||
required binary metadata; | ||||||||||||
optional binary value; | ||||||||||||
optional group typed_value (LIST) { # must be optional to allow a null list | ||||||||||||
repeated group list { | ||||||||||||
required group element { # shredded element | ||||||||||||
optional binary value; | ||||||||||||
optional binary typed_value (STRING); | ||||||||||||
} | ||||||||||||
required group element { # shredded element | ||||||||||||
optional binary value; | ||||||||||||
optional int64 typed_value ; | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
*/ | ||||||||||||
|
||||||||||||
let metadata_field = Field::new("metadata", DataType::BinaryView, false); | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
|
||||||||||||
// Define union fields for different element types | ||||||||||||
let string_element = { | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, true); | ||||||||||||
let typed_value = Field::new("typed_value", DataType::BinaryView, true); | ||||||||||||
DataType::Struct(Fields::from(vec![value_field, typed_value])) | ||||||||||||
}; | ||||||||||||
|
||||||||||||
let int_element = { | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, true); | ||||||||||||
let typed_value = Field::new("typed_value", DataType::Int64, true); | ||||||||||||
DataType::Struct(Fields::from(vec![value_field, typed_value])) | ||||||||||||
}; | ||||||||||||
|
||||||||||||
// Create union of different element types | ||||||||||||
let union_fields = UnionFields::new( | ||||||||||||
vec![0, 1], | ||||||||||||
vec![ | ||||||||||||
Field::new("string_element", string_element, true), | ||||||||||||
Field::new("int_element", int_element, true), | ||||||||||||
], | ||||||||||||
); | ||||||||||||
Comment on lines
+274
to
+281
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't love this, the field names are weird. However, we need a way to support a heterogenous list of I'm curious if there is a nicer way to represent a group. |
||||||||||||
|
||||||||||||
let typed_value_field = Field::new( | ||||||||||||
"typed_value", | ||||||||||||
DataType::Union(union_fields, UnionMode::Sparse), | ||||||||||||
false, | ||||||||||||
); | ||||||||||||
let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]); | ||||||||||||
|
||||||||||||
validate_shredded_schema(&schema).unwrap(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[test] | ||||||||||||
fn test_partially_shredded_variant_object_schema() { | ||||||||||||
/* | ||||||||||||
optional group event (VARIANT) { | ||||||||||||
required binary metadata; | ||||||||||||
optional binary value; # a variant, expected to be an object | ||||||||||||
optional group typed_value { # shredded fields for the variant object | ||||||||||||
required group event_type { # shredded field for event_type | ||||||||||||
optional binary value; | ||||||||||||
optional binary typed_value (STRING); | ||||||||||||
} | ||||||||||||
required group event_ts { # shredded field for event_ts | ||||||||||||
optional binary value; | ||||||||||||
optional int64 typed_value (TIMESTAMP(true, MICROS)); | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
*/ | ||||||||||||
|
||||||||||||
let metadata_field = Field::new("metadata", DataType::BinaryView, false); | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
|
||||||||||||
// event_type | ||||||||||||
let element_group_1 = { | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
let typed_value = Field::new("typed_value", DataType::BinaryView, false); // this is the string case | ||||||||||||
|
||||||||||||
Fields::from(vec![value_field, typed_value]) | ||||||||||||
}; | ||||||||||||
|
||||||||||||
// event_ts | ||||||||||||
let element_group_2 = { | ||||||||||||
let value_field = Field::new("value", DataType::BinaryView, false); | ||||||||||||
let typed_value = Field::new("typed_value", DataType::Int64, false); | ||||||||||||
|
||||||||||||
Fields::from(vec![value_field, typed_value]) | ||||||||||||
}; | ||||||||||||
|
||||||||||||
let typed_value_field = Field::new( | ||||||||||||
"typed_value", | ||||||||||||
DataType::Union( | ||||||||||||
UnionFields::new( | ||||||||||||
vec![0, 1], | ||||||||||||
vec![ | ||||||||||||
Field::new("event_type", DataType::Struct(element_group_1), true), | ||||||||||||
Field::new("event_ts", DataType::Struct(element_group_2), true), | ||||||||||||
], | ||||||||||||
), | ||||||||||||
UnionMode::Sparse, | ||||||||||||
), | ||||||||||||
false, | ||||||||||||
); | ||||||||||||
Comment on lines
+331
to
+344
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to https://github.com/apache/arrow-rs/pull/7921/files#r2203048613, but this is nicer, since we can treat field names as key names. |
||||||||||||
let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]); | ||||||||||||
|
||||||||||||
validate_shredded_schema(&schema).unwrap(); | ||||||||||||
} | ||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When would we allow (or forbid) both fields being null?