diff --git a/CHANGELOG.md b/CHANGELOG.md index 363884713131..769798c307c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ - feat: Improve DataType display for `RunEndEncoded` [\#8596](https://github.com/apache/arrow-rs/pull/8596) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([Weijun-H](https://github.com/Weijun-H)) - Add `ArrowError::AvroError`, remaining types and roundtrip tests to `arrow-avro`, [\#8595](https://github.com/apache/arrow-rs/pull/8595) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([jecsand838](https://github.com/jecsand838)) - \[thrift-remodel\] Refactor Thrift encryption and store encodings as bitmask [\#8587](https://github.com/apache/arrow-rs/pull/8587) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([etseidl](https://github.com/etseidl)) +- Change default page encoding stats decoding mode from `Full` to `Mask` in `ParquetMetaDataOptions` [\#8859](https://github.com/apache/arrow-rs/pull/8859) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([user](https://github.com/user)) - feat: Enhance `Map` display formatting in DataType [\#8570](https://github.com/apache/arrow-rs/pull/8570) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([Weijun-H](https://github.com/Weijun-H)) - feat: Enhance DataType display formatting for `ListView` and `LargeListView` variants [\#8569](https://github.com/apache/arrow-rs/pull/8569) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([Weijun-H](https://github.com/Weijun-H)) - Use custom thrift parser for parquet metadata \(phase 1 of Thrift remodel\) [\#8530](https://github.com/apache/arrow-rs/pull/8530) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([etseidl](https://github.com/etseidl)) diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs index 43b08e6b26a4..5cbfe00fae4b 100644 --- a/parquet/benches/metadata.rs +++ b/parquet/benches/metadata.rs @@ -78,7 +78,7 @@ fn encoded_meta() -> Vec { .set_data_page_offset(rng.random_range(4..2000000000)) .set_dictionary_page_offset(Some(rng.random_range(4..2000000000))) .set_statistics(stats.clone()) - .set_page_encoding_stats(vec![ + .set_page_encoding_stats_full(vec![ PageEncodingStats { page_type: PageType::DICTIONARY_PAGE, encoding: Encoding::PLAIN, @@ -165,7 +165,7 @@ fn criterion_benchmark(c: &mut Criterion) { }); let schema = ParquetMetaDataReader::decode_schema(&meta_data).unwrap(); - let options = ParquetMetaDataOptions::new().with_schema(schema); + let options = ParquetMetaDataOptions::new().with_schema(schema.clone()); c.bench_function("decode metadata with schema", |b| { b.iter(|| { ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options)) @@ -173,6 +173,37 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); + // Benchmark different page encoding stats modes + let options_mask = ParquetMetaDataOptions::new() + .with_schema(schema.clone()) + .with_page_encoding_stats_mode(parquet::file::metadata::PageEncodingStatsMode::Mask); + c.bench_function("decode metadata (mask mode)", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options_mask)) + .unwrap(); + }) + }); + + let options_full = ParquetMetaDataOptions::new() + .with_schema(schema.clone()) + .with_page_encoding_stats_mode(parquet::file::metadata::PageEncodingStatsMode::Full); + c.bench_function("decode metadata (full mode)", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options_full)) + .unwrap(); + }) + }); + + let options_skip = ParquetMetaDataOptions::new() + .with_schema(schema.clone()) + .with_page_encoding_stats_mode(parquet::file::metadata::PageEncodingStatsMode::Skip); + c.bench_function("decode metadata (skip mode)", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options_skip)) + .unwrap(); + }) + }); + let buf: Bytes = black_box(encoded_meta()).into(); c.bench_function("decode parquet metadata (wide)", |b| { b.iter(|| { diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index 54be89f23084..42250e3ad278 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -51,7 +51,7 @@ mod test_util; // Note that this crate is public under the `experimental` feature flag. use crate::file::metadata::RowGroupMetaData; -pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder}; +pub use builder::{ArrayReaderBuilder, CacheOptionsBuilder}; pub use byte_array::make_byte_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 3e3c9108d59c..4029bc301981 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -43,7 +43,7 @@ use crate::data_type::{ByteArray, FixedLenByteArray}; #[cfg(feature = "encryption")] use crate::encryption::encrypt::FileEncryptor; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData}; +use crate::file::metadata::{KeyValue, PageEncodingStatsMode, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData}; use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; @@ -4433,7 +4433,10 @@ mod tests { .unwrap(); // check that the read metadata is also correct - let options = ReadOptionsBuilder::new().with_page_index().build(); + let options = ReadOptionsBuilder::new() + .with_page_index() + .with_metadata_options(ParquetMetaDataOptions::new().with_page_encoding_stats_mode(PageEncodingStatsMode::Full)) + .build(); let reader = SerializedFileReader::new_with_options(file, options).unwrap(); let rowgroup = reader.get_row_group(0).expect("row group missing"); diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs index fe3e856a6c38..cfe71713920e 100644 --- a/parquet/src/arrow/schema/extension.rs +++ b/parquet/src/arrow/schema/extension.rs @@ -27,7 +27,6 @@ use crate::basic::LogicalType; use crate::errors::ParquetError; use crate::schema::types::Type; use arrow_schema::Field; -use arrow_schema::extension::ExtensionType; /// Adds extension type metadata, if necessary, based on the Parquet field's /// [`LogicalType`] @@ -36,7 +35,7 @@ use arrow_schema::extension::ExtensionType; /// Arrow DataType, and instead are represented by an Arrow ExtensionType. /// Extension types are attached to Arrow Fields via metadata. pub(crate) fn try_add_extension_type( - mut arrow_field: Field, + arrow_field: Field, parquet_type: &Type, ) -> Result { let Some(parquet_logical_type) = parquet_type.get_basic_info().logical_type_ref() else { diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 417c0112759a..cd63d6614279 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -40,7 +40,7 @@ use crate::encryption::encrypt::get_column_crypto_metadata; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram, - OffsetIndexBuilder, PageEncodingStats, + OffsetIndexBuilder, PageEncodingStats, ParquetPageEncodingStats, }; use crate::file::properties::{ EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion, @@ -1191,7 +1191,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { let mut builder = ColumnChunkMetaData::builder(self.descr.clone()) .set_compression(self.codec) .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter())) - .set_page_encoding_stats(self.encoding_stats.clone()) + .set_page_encoding_stats_full(self.encoding_stats.clone()) .set_total_compressed_size(total_compressed_size) .set_total_uncompressed_size(total_uncompressed_size) .set_num_values(num_values) @@ -2485,21 +2485,24 @@ mod tests { (PageType::DATA_PAGE, 1, 3), ] ); - assert_eq!( - r.metadata.page_encoding_stats(), - Some(&vec![ - PageEncodingStats { - page_type: PageType::DICTIONARY_PAGE, - encoding: Encoding::PLAIN, - count: 1 - }, - PageEncodingStats { - page_type: PageType::DATA_PAGE, - encoding: Encoding::RLE_DICTIONARY, - count: 2, - } - ]) - ); + match r.metadata.page_encoding_stats() { + Some(ParquetPageEncodingStats::Full(stats)) => assert_eq!( + stats, + &vec![ + PageEncodingStats { + page_type: PageType::DICTIONARY_PAGE, + encoding: Encoding::PLAIN, + count: 1 + }, + PageEncodingStats { + page_type: PageType::DATA_PAGE, + encoding: Encoding::RLE_DICTIONARY, + count: 2, + } + ] + ), + _ => panic!("Expected full page encoding stats"), + } } #[test] @@ -4104,7 +4107,10 @@ mod tests { let meta = column_write_and_get_metadata::(props, data); assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset); assert_eq!(meta.encodings().collect::>(), encodings); - assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats); + match meta.page_encoding_stats().unwrap() { + ParquetPageEncodingStats::Full(stats) => assert_eq!(stats, page_encoding_stats), + _ => panic!("Expected full page encoding stats"), + } } /// Returns column writer. diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs index 11536bbbd41e..526ce1a9ea88 100644 --- a/parquet/src/file/metadata/memory.rs +++ b/parquet/src/file/metadata/memory.rs @@ -21,7 +21,8 @@ use crate::basic::{BoundaryOrder, ColumnOrder, Compression, Encoding, PageType}; use crate::data_type::private::ParquetValueType; use crate::file::metadata::{ - ColumnChunkMetaData, FileMetaData, KeyValue, PageEncodingStats, RowGroupMetaData, SortingColumn, + ColumnChunkMetaData, FileMetaData, KeyValue, PageEncodingStats, ParquetPageEncodingStats, + RowGroupMetaData, SortingColumn, }; use crate::file::page_index::column_index::{ ByteArrayColumnIndex, ColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex, @@ -332,3 +333,12 @@ impl HeapSize for ColumnOrder { 0 // no heap allocations in ColumnOrder } } + +impl HeapSize for ParquetPageEncodingStats { + fn heap_size(&self) -> usize { + match self { + ParquetPageEncodingStats::Mask(_) => 0, // EncodingMask has no heap allocations + ParquetPageEncodingStats::Full(vec) => vec.heap_size(), + } + } +} diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 45b69a66799f..62ae5abf9e7f 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -470,6 +470,34 @@ pub struct PageEncodingStats { } ); +/// Mode for decoding page encoding statistics in Parquet metadata. +/// +/// This enum controls how page encoding statistics are represented in memory. +/// The default mode is `Mask`, which uses a compact bitmask representation. +/// `Full` mode retains the full vector of statistics, and `Skip` mode omits them entirely. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum PageEncodingStatsMode { + /// Use a compact bitmask representation for page encoding statistics. + #[default] + Mask, + /// Retain the full vector of page encoding statistics. + Full, + /// Skip decoding page encoding statistics entirely. + Skip, +} + +/// In-memory representation of page encoding statistics for a column chunk. +/// +/// This enum allows storing page encoding stats either as a compact mask +/// or as the full vector of statistics, depending on the decoding mode. +#[derive(Debug, Clone, PartialEq)] +pub enum ParquetPageEncodingStats { + /// Compact representation using a bitmask. + Mask(EncodingMask), + /// Full representation with detailed statistics per page. + Full(Vec), +} + /// Reference counted pointer for [`FileMetaData`]. pub type FileMetaDataPtr = Arc; @@ -812,7 +840,7 @@ pub struct ColumnChunkMetaData { dictionary_page_offset: Option, statistics: Option, geo_statistics: Option>, - encoding_stats: Option>, + encoding_stats: Option, bloom_filter_offset: Option, bloom_filter_length: Option, offset_index_offset: Option, @@ -1050,12 +1078,30 @@ impl ColumnChunkMetaData { self.geo_statistics.as_deref() } - /// Returns the offset for the page encoding stats, + /// Returns the page encoding stats for this column chunk, /// or `None` if no page encoding stats are available. - pub fn page_encoding_stats(&self) -> Option<&Vec> { + pub fn page_encoding_stats(&self) -> Option<&ParquetPageEncodingStats> { self.encoding_stats.as_ref() } + /// Returns the page encoding stats mask if available and stored as a mask, + /// or `None` otherwise. + pub fn page_encoding_stats_mask(&self) -> Option<&EncodingMask> { + match self.encoding_stats.as_ref()? { + ParquetPageEncodingStats::Mask(mask) => Some(mask), + _ => None, + } + } + + /// Returns the full page encoding stats if available and stored as full stats, + /// or `None` otherwise. + pub fn page_encoding_stats_full(&self) -> Option<&Vec> { + match self.encoding_stats.as_ref()? { + ParquetPageEncodingStats::Full(stats) => Some(stats), + _ => None, + } + } + /// Returns the offset for the bloom filter. pub fn bloom_filter_offset(&self) -> Option { self.bloom_filter_offset @@ -1273,11 +1319,17 @@ impl ColumnChunkMetaDataBuilder { } /// Sets page encoding stats for this column chunk. - pub fn set_page_encoding_stats(mut self, value: Vec) -> Self { + pub fn set_page_encoding_stats(mut self, value: ParquetPageEncodingStats) -> Self { self.0.encoding_stats = Some(value); self } + /// Sets page encoding stats as full stats for this column chunk. + pub fn set_page_encoding_stats_full(mut self, value: Vec) -> Self { + self.0.encoding_stats = Some(ParquetPageEncodingStats::Full(value)); + self + } + /// Clears the page encoding stats for this column chunk. pub fn clear_page_encoding_stats(mut self) -> Self { self.0.encoding_stats = None; @@ -1633,7 +1685,7 @@ mod tests { let mut writer = ThriftCompactOutputProtocol::new(&mut buf); row_group_meta.write_thrift(&mut writer).unwrap(); - let row_group_res = read_row_group(&mut buf, schema_descr).unwrap(); + let row_group_res = read_row_group(&mut buf, schema_descr, None).unwrap(); assert_eq!(row_group_res, row_group_meta); } @@ -1715,7 +1767,7 @@ mod tests { let mut writer = ThriftCompactOutputProtocol::new(&mut buf); row_group_meta_2cols.write_thrift(&mut writer).unwrap(); - let err = read_row_group(&mut buf, schema_descr_3cols) + let err = read_row_group(&mut buf, schema_descr_3cols, None) .unwrap_err() .to_string(); assert_eq!( @@ -1738,7 +1790,7 @@ mod tests { .set_total_uncompressed_size(3000) .set_data_page_offset(4000) .set_dictionary_page_offset(Some(5000)) - .set_page_encoding_stats(vec![ + .set_page_encoding_stats_full(vec![ PageEncodingStats { page_type: PageType::DATA_PAGE, encoding: Encoding::PLAIN, @@ -1765,7 +1817,7 @@ mod tests { let mut buf = Vec::new(); let mut writer = ThriftCompactOutputProtocol::new(&mut buf); col_metadata.write_thrift(&mut writer).unwrap(); - let col_chunk_res = read_column_chunk(&mut buf, column_descr).unwrap(); + let col_chunk_res = read_column_chunk(&mut buf, column_descr, None).unwrap(); assert_eq!(col_chunk_res, col_metadata); } @@ -1781,7 +1833,7 @@ mod tests { let mut buf = Vec::new(); let mut writer = ThriftCompactOutputProtocol::new(&mut buf); col_metadata.write_thrift(&mut writer).unwrap(); - let col_chunk_res = read_column_chunk(&mut buf, column_descr).unwrap(); + let col_chunk_res = read_column_chunk(&mut buf, column_descr, None).unwrap(); assert_eq!(col_chunk_res, col_metadata); } @@ -2017,4 +2069,292 @@ mod tests { Arc::new(SchemaDescriptor::new(Arc::new(schema))) } + + #[test] + fn test_page_encoding_stats_mode_mask() { + // Test that default options use Mask mode + let options = ParquetMetaDataOptions::new(); + assert_eq!( + options.page_encoding_stats_mode(), + PageEncodingStatsMode::Mask + ); + + // Test that Mask mode decodes to Mask variant + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .set_page_encoding_stats_full(vec![PageEncodingStats { + page_type: PageType::DATA_PAGE, + encoding: Encoding::PLAIN, + count: 3, + }]) + .build() + .unwrap(); + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Mask); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + match decoded.page_encoding_stats() { + Some(ParquetPageEncodingStats::Mask(mask)) => { + // Should have PLAIN encoding in mask + assert!(mask.is_set(Encoding::PLAIN)); + } + _ => panic!("Expected Mask variant"), + } + } + + #[test] + fn test_page_encoding_stats_mode_full() { + // Test that Full mode decodes to Full variant + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .set_page_encoding_stats_full(vec![PageEncodingStats { + page_type: PageType::DATA_PAGE, + encoding: Encoding::PLAIN, + count: 3, + }]) + .build() + .unwrap(); + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Full); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + match decoded.page_encoding_stats() { + Some(ParquetPageEncodingStats::Full(stats)) => { + assert_eq!(stats.len(), 1); + assert_eq!(stats[0].encoding, Encoding::PLAIN); + assert_eq!(stats[0].count, 3); + } + _ => panic!("Expected Full variant"), + } + } + + #[test] + fn test_page_encoding_stats_mode_skip() { + // Test that Skip mode results in None + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .set_page_encoding_stats_full(vec![PageEncodingStats { + page_type: PageType::DATA_PAGE, + encoding: Encoding::PLAIN, + count: 3, + }]) + .build() + .unwrap(); + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Skip); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + assert!(decoded.page_encoding_stats().is_none()); + } + + #[test] + fn test_mask_mode_handles_missing_encoding_stats_field() { + // Test that Mask mode handles missing encoding_stats field gracefully (returns None) + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .build() + .unwrap(); // No encoding stats set + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Mask); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + assert!(decoded.page_encoding_stats().is_none()); + } + + #[test] + fn test_full_mode_handles_missing_encoding_stats_field() { + // Test that Full mode handles missing encoding_stats field gracefully (returns None) + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .build() + .unwrap(); // No encoding stats set + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Full); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + assert!(decoded.page_encoding_stats().is_none()); + } + + #[test] + fn test_skip_mode_handles_missing_encoding_stats_field() { + // Test that Skip mode handles missing encoding_stats field (returns None, as expected) + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .build() + .unwrap(); // No encoding stats set + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Skip); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + assert!(decoded.page_encoding_stats().is_none()); + } + + #[test] + fn test_mask_mode_with_empty_encoding_stats() { + // Test Mask mode with empty encoding stats list (should produce empty mask) + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .set_page_encoding_stats_full(vec![]) // Empty list + .build() + .unwrap(); + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Mask); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + match decoded.page_encoding_stats().unwrap() { + ParquetPageEncodingStats::Mask(mask) => assert!(mask.as_i32() == 0), // Empty mask + _ => panic!("Expected Mask variant"), + } + } + + #[test] + fn test_full_mode_with_empty_encoding_stats() { + // Test Full mode with empty encoding stats list (should preserve empty vec) + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .set_page_encoding_stats_full(vec![]) // Empty list + .build() + .unwrap(); + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Full); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + match decoded.page_encoding_stats().unwrap() { + ParquetPageEncodingStats::Full(stats) => assert!(stats.is_empty()), + _ => panic!("Expected Full variant"), + } + } + + #[test] + fn test_mask_mode_with_duplicate_encodings() { + // Test Mask mode deduplicates encodings in stats (mask should reflect unique encodings) + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .set_page_encoding_stats_full(vec![ + PageEncodingStats { + page_type: PageType::DATA_PAGE, + encoding: Encoding::PLAIN, + count: 1, + }, + PageEncodingStats { + page_type: PageType::DATA_PAGE, + encoding: Encoding::PLAIN, // Duplicate + count: 2, + }, + PageEncodingStats { + page_type: PageType::DICTIONARY_PAGE, + encoding: Encoding::RLE_DICTIONARY, + count: 1, + }, + ]) + .build() + .unwrap(); + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Mask); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + match decoded.page_encoding_stats().unwrap() { + ParquetPageEncodingStats::Mask(mask) => { + // Mask should contain PLAIN and RLE_DICTIONARY + assert!(mask.is_set(Encoding::PLAIN)); + assert!(mask.is_set(Encoding::RLE_DICTIONARY)); + assert!(!mask.is_set(Encoding::DELTA_BINARY_PACKED)); // Not present + } + _ => panic!("Expected Mask variant"), + } + } + + #[test] + fn test_mask_mode_with_various_encodings() { + // Test Mask mode with a variety of encodings to ensure comprehensive coverage + let column_descr = get_test_schema_descr().column(0); + let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) + .set_page_encoding_stats_full(vec![ + PageEncodingStats { + page_type: PageType::DATA_PAGE, + encoding: Encoding::PLAIN, + count: 1, + }, + PageEncodingStats { + page_type: PageType::DICTIONARY_PAGE, + encoding: Encoding::RLE_DICTIONARY, + count: 1, + }, + PageEncodingStats { + page_type: PageType::DATA_PAGE, + encoding: Encoding::DELTA_BINARY_PACKED, + count: 1, + }, + PageEncodingStats { + page_type: PageType::DATA_PAGE, + encoding: Encoding::BYTE_STREAM_SPLIT, + count: 1, + }, + ]) + .build() + .unwrap(); + + let mut buf = Vec::new(); + let mut writer = ThriftCompactOutputProtocol::new(&mut buf); + col_metadata.write_thrift(&mut writer).unwrap(); + + let options = ParquetMetaDataOptions::new() + .with_page_encoding_stats_mode(PageEncodingStatsMode::Mask); + let decoded = read_column_chunk(&mut buf, column_descr, Some(&options)).unwrap(); + + match decoded.page_encoding_stats().unwrap() { + ParquetPageEncodingStats::Mask(mask) => { + assert!(mask.is_set(Encoding::PLAIN)); + assert!(mask.is_set(Encoding::RLE_DICTIONARY)); + assert!(mask.is_set(Encoding::DELTA_BINARY_PACKED)); + assert!(mask.is_set(Encoding::BYTE_STREAM_SPLIT)); + } + _ => panic!("Expected Mask variant"), + } + } } diff --git a/parquet/src/file/metadata/options.rs b/parquet/src/file/metadata/options.rs index bbc5314d3ac7..edd21093e5af 100644 --- a/parquet/src/file/metadata/options.rs +++ b/parquet/src/file/metadata/options.rs @@ -29,6 +29,8 @@ use crate::schema::types::SchemaDescPtr; #[derive(Default, Debug, Clone)] pub struct ParquetMetaDataOptions { schema_descr: Option, + /// Mode for decoding page encoding statistics. Defaults to `Mask` for compact representation. + page_encoding_stats_mode: super::PageEncodingStatsMode, } impl ParquetMetaDataOptions { @@ -53,6 +55,17 @@ impl ParquetMetaDataOptions { self.schema_descr = Some(val); self } + + /// Returns the mode for decoding page encoding statistics. + pub fn page_encoding_stats_mode(&self) -> super::PageEncodingStatsMode { + self.page_encoding_stats_mode + } + + /// Set the mode for decoding page encoding statistics. Returns `Self` for chaining. + pub fn with_page_encoding_stats_mode(mut self, mode: super::PageEncodingStatsMode) -> Self { + self.page_encoding_stats_mode = mode; + self + } } #[cfg(test)] diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs index 225c4d29d2dc..194415e33b1d 100644 --- a/parquet/src/file/metadata/thrift/mod.rs +++ b/parquet/src/file/metadata/thrift/mod.rs @@ -387,6 +387,7 @@ fn validate_column_metadata(mask: u16) -> Result<()> { fn read_column_metadata<'a>( prot: &mut ThriftSliceInputProtocol<'a>, column: &mut ColumnChunkMetaData, + options: Option<&ParquetMetaDataOptions>, ) -> Result { // mask for seen required fields in ColumnMetaData let mut seen_mask = 0u16; @@ -462,9 +463,26 @@ fn read_column_metadata<'a>( convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?; } 13 => { - let val = - read_thrift_vec::(&mut *prot)?; - column.encoding_stats = Some(val); + use crate::file::metadata::{PageEncodingStatsMode, ParquetPageEncodingStats}; + match options.map(|o| o.page_encoding_stats_mode()) { + Some(PageEncodingStatsMode::Skip) => { + // Skip reading encoding_stats entirely + prot.skip(field_ident.field_type)?; + } + Some(PageEncodingStatsMode::Mask) => { + let val = read_thrift_vec::( + &mut *prot, + )?; + let mask = EncodingMask::new_from_encodings(val.iter().map(|s| &s.encoding)); + column.encoding_stats = Some(ParquetPageEncodingStats::Mask(mask)); + } + Some(PageEncodingStatsMode::Full) | None => { + let val = read_thrift_vec::( + &mut *prot, + )?; + column.encoding_stats = Some(ParquetPageEncodingStats::Full(val)); + } + } } 14 => { column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?); @@ -499,6 +517,7 @@ fn read_column_metadata<'a>( fn read_column_chunk<'a>( prot: &mut ThriftSliceInputProtocol<'a>, column_descr: &Arc, + options: Option<&ParquetMetaDataOptions>, ) -> Result { // create a default initialized ColumnMetaData let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?; @@ -535,7 +554,7 @@ fn read_column_chunk<'a>( has_file_offset = true; } 3 => { - col_meta_mask = read_column_metadata(&mut *prot, &mut col)?; + col_meta_mask = read_column_metadata(&mut *prot, &mut col, options)?; } 4 => { col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?); @@ -585,6 +604,7 @@ fn read_column_chunk<'a>( fn read_row_group( prot: &mut ThriftSliceInputProtocol, schema_descr: &Arc, + options: Option<&ParquetMetaDataOptions>, ) -> Result { // create default initialized RowGroupMetaData let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked(); @@ -623,7 +643,7 @@ fn read_row_group( )); } for i in 0..list_ident.size as usize { - let col = read_column_chunk(prot, &schema_descr.columns()[i])?; + let col = read_column_chunk(prot, &schema_descr.columns()[i], options)?; row_group.columns.push(col); } mask |= RG_COLUMNS; @@ -774,7 +794,7 @@ pub(crate) fn parquet_metadata_from_bytes( "Row group ordinal {ordinal} exceeds i16 max value", )) })?; - let rg = read_row_group(&mut prot, schema_descr)?; + let rg = read_row_group(&mut prot, schema_descr, options)?; rg_vec.push(assigner.ensure(ordinal, rg)?); } row_groups = Some(rg_vec); @@ -1302,7 +1322,7 @@ pub(super) fn serialize_column_meta_data( if let Some(stats) = stats { last_field_id = stats.write_thrift_field(w, 12, last_field_id)?; } - if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() { + if let Some(page_encoding_stats) = column_chunk.page_encoding_stats_full() { last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?; } if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset { @@ -1670,7 +1690,7 @@ write_thrift_field!(RustBoundingBox, FieldType::Struct); pub(crate) mod tests { use crate::errors::Result; use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema}; - use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; + use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData}; use crate::parquet_thrift::tests::test_roundtrip; use crate::parquet_thrift::{ ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec, @@ -1684,17 +1704,19 @@ pub(crate) mod tests { pub(crate) fn read_row_group( buf: &mut [u8], schema_descr: Arc, + options: Option<&ParquetMetaDataOptions>, ) -> Result { let mut reader = ThriftSliceInputProtocol::new(buf); - crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr) + crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, options) } pub(crate) fn read_column_chunk( buf: &mut [u8], column_descr: Arc, + options: Option<&ParquetMetaDataOptions>, ) -> Result { let mut reader = ThriftSliceInputProtocol::new(buf); - crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr) + crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, options) } pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result { diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index ce608c77172b..845e5bc3e4e8 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -160,6 +160,12 @@ impl ReadOptionsBuilder { self } + /// Set the metadata options for reading Parquet metadata. + pub fn with_metadata_options(mut self, options: ParquetMetaDataOptions) -> Self { + self.metadata_options = options; + self + } + /// Seal the builder and return the read options pub fn build(self) -> ReadOptions { let props = self @@ -1842,7 +1848,7 @@ mod tests { assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192); // test page encoding stats - let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0]; + let page_encoding_stats = &col0_metadata.page_encoding_stats_full().unwrap()[0]; assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE); assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN); diff --git a/rustup-init.exe b/rustup-init.exe new file mode 100644 index 000000000000..111a0590eb0f Binary files /dev/null and b/rustup-init.exe differ