From 15b433c4883b3c037d278bcc6615f6a9374b3133 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 24 Dec 2024 19:45:23 +0800 Subject: [PATCH 1/3] fix parse var len of decimal for parquet statistic --- crates/iceberg/src/arrow/schema.rs | 27 ++- .../src/writer/file_writer/parquet_writer.rs | 188 +++++++++++++++++- 2 files changed, 213 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 91dfe85e9..c182ec079 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -30,6 +30,7 @@ use arrow_array::{ use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; use bitvec::macros::internal::funty::Fundamental; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use parquet::data_type::ByteArray; use parquet::file::statistics::Statistics; use rust_decimal::prelude::ToPrimitive; use uuid::Uuid; @@ -680,6 +681,30 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result Result<[u8; 16]> { + if array.len() > 16 { + return Err(Error::new( + ErrorKind::DataInvalid, + "fail to extend array with len > 16 to array with 16", + )); + } + + // Check the sign bit: if the first byte's MSB is 1, it's negative + let is_negative = array.data().first().map_or(false, |&b| b & 0x80 != 0); + + // Create a buffer of 16 bytes filled with the sign extension value + let mut extended = if is_negative { + [0xFF; 16] // Fill with 0xFF for negative numbers + } else { + [0x00; 16] // Fill with 0x00 for positive numbers + }; + + let start = 16 - array.len(); + extended[start..].copy_from_slice(array.data()); + + Ok(extended) +} + macro_rules! get_parquet_stat_as_datum { ($limit_type:tt) => { paste::paste! { @@ -741,7 +766,7 @@ macro_rules! get_parquet_stat_as_datum { }; Some(Datum::new( primitive_type.clone(), - PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)), + PrimitiveLiteral::Int128(i128::from_be_bytes(extend_to_i128_big_endian(bytes.into())?)), )) } ( diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 596228f7c..7710a45b7 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -478,15 +478,18 @@ mod tests { use anyhow::Result; use arrow_array::types::Int64Type; use arrow_array::{ - Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray, + Array, ArrayRef, BooleanArray, Decimal128Array, Int32Array, Int64Array, ListArray, + RecordBatch, StructArray, }; use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef}; use arrow_select::concat::concat_batches; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use rust_decimal::Decimal; use tempfile::TempDir; use uuid::Uuid; use super::*; + use crate::arrow::schema_to_arrow_schema; use crate::io::FileIOBuilder; use crate::spec::{PrimitiveLiteral, Struct, *}; use crate::writer::file_writer::location_generator::test::MockLocationGenerator; @@ -1169,4 +1172,187 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_decimal_bound() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let loccation_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // test 1.1 and 2.2 + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + loccation_gen.clone(), + file_name_gen.clone(), + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![Some(22000000000), Some(11000000000)]) + .with_data_type(DataType::Decimal128(28, 10)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + + // test -1.1 and -2.2 + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + loccation_gen.clone(), + file_name_gen.clone(), + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)]) + .with_data_type(DataType::Decimal128(28, 10)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + + // test max and min for scale 38 + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: 0, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema, + file_io.clone(), + loccation_gen, + file_name_gen, + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![ + Some(99999999999999999999999999999999999999_i128), + Some(-99999999999999999999999999999999999999_i128), + ]) + .with_data_type(DataType::Decimal128(38, 0)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::new( + PrimitiveType::Decimal { + precision: 38, + scale: 0 + }, + PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128) + )) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::new( + PrimitiveType::Decimal { + precision: 38, + scale: 0 + }, + PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128) + )) + .as_ref() + ); + + Ok(()) + } } From 34c134cb1ce247d1b23b6ea9c595b066db08c29c Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 2 Jan 2025 15:53:49 +0800 Subject: [PATCH 2/3] fix test --- .../src/writer/file_writer/parquet_writer.rs | 98 +++++++++++++++---- 1 file changed, 78 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 7710a45b7..5561b1913 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -1286,7 +1286,10 @@ mod tests { .as_ref() ); - // test max and min for scale 38 + // test max and min of rust_decimal + let decimal_max = Decimal::MAX; + let decimal_min = Decimal::MIN; + assert_eq!(decimal_max.scale(), decimal_min.scale()); let schema = Arc::new( Schema::builder() .with_fields(vec![NestedField::optional( @@ -1294,7 +1297,7 @@ mod tests { "decimal", Type::Primitive(PrimitiveType::Decimal { precision: 38, - scale: 0, + scale: decimal_max.scale(), }), ) .into()]) @@ -1313,8 +1316,8 @@ mod tests { .await?; let col0 = Arc::new( Decimal128Array::from(vec![ - Some(99999999999999999999999999999999999999_i128), - Some(-99999999999999999999999999999999999999_i128), + Some(decimal_max.mantissa()), + Some(decimal_min.mantissa()), ]) .with_data_type(DataType::Decimal128(38, 0)), ) as ArrayRef; @@ -1332,27 +1335,82 @@ mod tests { .unwrap(); assert_eq!( data_file.upper_bounds().get(&0), - Some(Datum::new( - PrimitiveType::Decimal { - precision: 38, - scale: 0 - }, - PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128) - )) - .as_ref() + Some(Datum::decimal(decimal_max).unwrap()).as_ref() ); assert_eq!( data_file.lower_bounds().get(&0), - Some(Datum::new( - PrimitiveType::Decimal { - precision: 38, - scale: 0 - }, - PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128) - )) - .as_ref() + Some(Datum::decimal(decimal_min).unwrap()).as_ref() ); + // test max and min for scale 38 + // # TODO + // Readd this case after resolve https://github.com/apache/iceberg-rust/issues/669 + // let schema = Arc::new( + // Schema::builder() + // .with_fields(vec![NestedField::optional( + // 0, + // "decimal", + // Type::Primitive(PrimitiveType::Decimal { + // precision: 38, + // scale: 0, + // }), + // ) + // .into()]) + // .build() + // .unwrap(), + // ); + // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + // let mut pw = ParquetWriterBuilder::new( + // WriterProperties::builder().build(), + // schema, + // file_io.clone(), + // loccation_gen, + // file_name_gen, + // ) + // .build() + // .await?; + // let col0 = Arc::new( + // Decimal128Array::from(vec![ + // Some(99999999999999999999999999999999999999_i128), + // Some(-99999999999999999999999999999999999999_i128), + // ]) + // .with_data_type(DataType::Decimal128(38, 0)), + // ) as ArrayRef; + // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + // pw.write(&to_write).await?; + // let res = pw.close().await?; + // assert_eq!(res.len(), 1); + // let data_file = res + // .into_iter() + // .next() + // .unwrap() + // .content(crate::spec::DataContentType::Data) + // .partition(Struct::empty()) + // .build() + // .unwrap(); + // assert_eq!( + // data_file.upper_bounds().get(&0), + // Some(Datum::new( + // PrimitiveType::Decimal { + // precision: 38, + // scale: 0 + // }, + // PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128) + // )) + // .as_ref() + // ); + // assert_eq!( + // data_file.lower_bounds().get(&0), + // Some(Datum::new( + // PrimitiveType::Decimal { + // precision: 38, + // scale: 0 + // }, + // PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128) + // )) + // .as_ref() + // ); + Ok(()) } } From 77a6cbe41fd01779cde73d84a8b8192aa6e20175 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 2 Jan 2025 16:30:01 +0800 Subject: [PATCH 3/3] reuse BigInt logic to decode --- crates/iceberg/src/arrow/schema.rs | 34 +++++++----------------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index c182ec079..b590c8bc8 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -29,8 +29,8 @@ use arrow_array::{ }; use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; use bitvec::macros::internal::funty::Fundamental; +use num_bigint::BigInt; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; -use parquet::data_type::ByteArray; use parquet::file::statistics::Statistics; use rust_decimal::prelude::ToPrimitive; use uuid::Uuid; @@ -681,30 +681,6 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result Result<[u8; 16]> { - if array.len() > 16 { - return Err(Error::new( - ErrorKind::DataInvalid, - "fail to extend array with len > 16 to array with 16", - )); - } - - // Check the sign bit: if the first byte's MSB is 1, it's negative - let is_negative = array.data().first().map_or(false, |&b| b & 0x80 != 0); - - // Create a buffer of 16 bytes filled with the sign extension value - let mut extended = if is_negative { - [0xFF; 16] // Fill with 0xFF for negative numbers - } else { - [0x00; 16] // Fill with 0x00 for positive numbers - }; - - let start = 16 - array.len(); - extended[start..].copy_from_slice(array.data()); - - Ok(extended) -} - macro_rules! get_parquet_stat_as_datum { ($limit_type:tt) => { paste::paste! { @@ -764,9 +740,15 @@ macro_rules! get_parquet_stat_as_datum { let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else { return Ok(None); }; + let unscaled_value = BigInt::from_signed_bytes_be(bytes); Some(Datum::new( primitive_type.clone(), - PrimitiveLiteral::Int128(i128::from_be_bytes(extend_to_i128_big_endian(bytes.into())?)), + PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Can't convert bytes to i128: {:?}", bytes), + ) + })?), )) } (