diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 70d3ce7cf9a9..1d5dd7e9a13e 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -791,6 +791,24 @@ impl ArrowReaderOptions { }) } + /// Returns the configured virtual columns, if any. + pub fn virtual_columns(&self) -> &[FieldRef] { + &self.virtual_columns + } + + /// Returns the supplied arrow [`Schema`], if one was provided via + /// [`Self::with_schema`]. + pub fn supplied_schema(&self) -> Option<&SchemaRef> { + self.supplied_schema.as_ref() + } + + /// Returns whether the reader will skip the embedded arrow schema + /// metadata stored in the parquet file (set via + /// [`Self::with_skip_arrow_metadata`]). + pub fn skip_arrow_metadata(&self) -> bool { + self.skip_arrow_metadata + } + #[deprecated( since = "57.2.0", note = "Use `column_index_policy` or `offset_index_policy` instead" @@ -885,6 +903,35 @@ impl ArrowReaderMetadata { Self::try_new(Arc::new(metadata), options) } + /// Construct an [`ArrowReaderMetadata`] from precomputed + /// [`FieldLevels`]. + /// + /// Use this when the caller has already computed the arrow [`Schema`] + /// and [`FieldLevels`] for `metadata` (e.g. by calling + /// [`parquet_to_arrow_field_levels`]) and wants to avoid recomputing + /// them. For wide schemas the field-levels conversion walks every leaf + /// in the parquet schema, so reusing it across reader builds for the + /// same metadata is significantly cheaper than calling [`Self::try_new`] + /// repeatedly. + /// + /// `arrow_schema_metadata` is used for the [`Schema`]'s key/value + /// metadata and is otherwise opaque; pass an empty map when you do not + /// have one. 
+ /// + /// [`parquet_to_arrow_field_levels`]: crate::arrow::schema::parquet_to_arrow_field_levels + pub fn from_field_levels( + metadata: Arc<ParquetMetaData>, + arrow_schema: SchemaRef, + field_levels: FieldLevels, + ) -> Self { + let FieldLevels { fields: _, levels } = field_levels; + Self { + metadata, + schema: arrow_schema, + fields: levels.map(Arc::new), + } + } + /// Create a new [`ArrowReaderMetadata`] from a pre-existing /// [`ParquetMetaData`] and [`ArrowReaderOptions`]. /// diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 19d3e34f5243..98180ff3faa1 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1495,6 +1495,32 @@ impl<'a> StatisticsConverter<'a> { }) } + /// Construct a [`StatisticsConverter`] from a precomputed + /// `(arrow_field, parquet_leaf_index)` pair. + /// + /// This is a low-overhead alternative to [`Self::try_new`] for callers + /// that need to build many converters against the same schemas — for + /// example when extracting per-column statistics for a wide schema. + /// [`Self::try_new`] performs O(N) name lookups against both the arrow + /// and parquet schemas; this constructor avoids those lookups by taking + /// the resolved field and column index directly. + /// + /// `parquet_leaf_index` should be `None` if the column does not exist in + /// the parquet file (schema evolution case). 
+ pub fn from_arrow_field( + arrow_field: &'a Field, + parquet_schema: &'a SchemaDescriptor, + parquet_leaf_index: Option<usize>, + ) -> Self { + Self { + parquet_column_index: parquet_leaf_index, + arrow_field, + missing_null_counts_as_zero: true, + physical_type: parquet_leaf_index + .map(|idx| parquet_schema.column(idx).physical_type()), + } + } + /// Extract the minimum values from row group statistics in [`RowGroupMetaData`] /// /// # Return Value diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 6c7890b75c45..b64fb807a6e8 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -110,6 +110,31 @@ pub trait AsyncFileReader: Send { &'a mut self, options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>; + + /// Return a future resolving to a fully-built [`ArrowReaderMetadata`]. + /// + /// The default implementation calls [`Self::get_metadata`] and then + /// constructs the [`ArrowReaderMetadata`] via + /// [`ArrowReaderMetadata::try_new`], which walks every leaf in the + /// parquet schema to derive the matching arrow [`Schema`] and field + /// levels. + /// + /// Implementations that already cache the per-file metadata (e.g. + /// because the same file is opened many times across queries) can + /// override this method to also cache the derived arrow schema view. + /// For wide schemas this can dominate per-file CPU overhead, since the + /// per-leaf walk is `O(N_columns)` and is otherwise repeated on every + /// reader build for the same file. 
+ fn get_arrow_reader_metadata<'a>( + &'a mut self, + options: ArrowReaderOptions, + ) -> BoxFuture<'a, Result<ArrowReaderMetadata>> { + async move { + let metadata = self.get_metadata(Some(&options)).await?; + ArrowReaderMetadata::try_new(metadata, options) + } + .boxed() + } } /// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader, @@ -128,6 +153,13 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> { ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> { self.as_mut().get_metadata(options) } + + fn get_arrow_reader_metadata<'a>( + &'a mut self, + options: ArrowReaderOptions, + ) -> BoxFuture<'a, Result<ArrowReaderMetadata>> { + self.as_mut().get_arrow_reader_metadata(options) + } } impl<T: AsyncRead + AsyncSeek + Unpin + Send> MetadataSuffixFetch for T { @@ -194,8 +226,11 @@ impl ArrowReaderMetadata { input: &mut T, options: ArrowReaderOptions, ) -> Result<Self> { - let metadata = input.get_metadata(Some(&options)).await?; - Self::try_new(metadata, options) + // Delegate to AsyncFileReader::get_arrow_reader_metadata so + // implementations that cache derived state (e.g. an arrow Schema + // and field-levels view) can short-circuit the per-leaf walk + // performed by Self::try_new. 
+ input.get_arrow_reader_metadata(options).await } } diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 52152988166f..728362ab600e 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -210,7 +210,8 @@ use arrow_schema::{FieldRef, Schema}; pub use self::schema::{ ArrowSchemaConverter, FieldLevels, add_encoded_arrow_schema_to_metadata, encode_arrow_schema, parquet_to_arrow_field_levels, parquet_to_arrow_field_levels_with_virtual, - parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, virtual_type::*, + parquet_to_arrow_schema, parquet_to_arrow_schema_and_field_levels, + parquet_to_arrow_schema_by_columns, virtual_type::*, }; /// Schema metadata key used to store serialized Arrow schema @@ -485,9 +486,11 @@ pub fn parquet_column<'a>( return None; } - // This could be made more efficient (#TBD) - let parquet_idx = (0..parquet_schema.columns().len()) - .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?; + // For non-nested fields the parquet column ordering matches the arrow + // root field ordering, so the first leaf descended from `root_idx` is + // exactly the parquet column we want. SchemaDescriptor caches this + // mapping for O(1) lookup. + let parquet_idx = parquet_schema.root_first_leaf_index(root_idx)?; Some((parquet_idx, field)) } diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index b2b93687ba89..e9224c6092ad 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -67,6 +67,39 @@ pub fn parquet_to_arrow_schema_by_columns( Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0) } +/// Convert a parquet [`SchemaDescriptor`] into both the arrow [`Schema`] +/// and matching [`FieldLevels`] in a single walk. +/// +/// This is a convenience around [`parquet_to_arrow_field_levels`] that +/// also produces a [`Schema`] (with the parquet file's key/value metadata +/// attached). 
Useful for callers that want to pre-compute and reuse both +/// for [`ArrowReaderMetadata::from_field_levels`]. +/// +/// [`ArrowReaderMetadata::from_field_levels`]: crate::arrow::arrow_reader::ArrowReaderMetadata::from_field_levels +pub fn parquet_to_arrow_schema_and_field_levels( + parquet_schema: &SchemaDescriptor, + mask: ProjectionMask, + key_value_metadata: Option<&Vec<KeyValue>>, +) -> Result<(Schema, FieldLevels)> { + let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); + let maybe_schema = metadata + .remove(super::ARROW_SCHEMA_META_KEY) + .map(|value| get_arrow_schema_from_metadata(&value)) + .transpose()?; + + if let Some(arrow_schema) = &maybe_schema { + arrow_schema.metadata().iter().for_each(|(k, v)| { + metadata.entry(k.clone()).or_insert_with(|| v.clone()); + }); + } + + let hint = maybe_schema.as_ref().map(|s| s.fields()); + let field_levels = + parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, &[])?; + let schema = Schema::new_with_metadata(field_levels.fields.clone(), metadata); + Ok((schema, field_levels)) +} + /// Determines the Arrow Schema from a Parquet schema /// /// Looks for an Arrow schema metadata "hint" (see diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 156385ddaa70..ffa6da7f2ee1 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -2033,10 +2033,12 @@ mod tests { .set_row_groups(row_group_meta_with_stats) .build(); + // Sizes include the `root_to_first_leaf: Vec<usize>` cache that + // SchemaDescriptor adds to make `parquet_column` lookups O(1). 
#[cfg(not(feature = "encryption"))] - let base_expected_size = 2766; + let base_expected_size = 2806; #[cfg(feature = "encryption")] - let base_expected_size = 2934; + let base_expected_size = 2974; assert_eq!(parquet_meta.memory_size(), base_expected_size); @@ -2065,9 +2067,9 @@ mod tests { .build(); #[cfg(not(feature = "encryption"))] - let bigger_expected_size = 3192; + let bigger_expected_size = 3232; #[cfg(feature = "encryption")] - let bigger_expected_size = 3360; + let bigger_expected_size = 3400; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); @@ -2114,7 +2116,7 @@ mod tests { .set_row_groups(row_group_meta.clone()) .build(); - let base_expected_size = 2058; + let base_expected_size = 2098; assert_eq!(parquet_meta_data.memory_size(), base_expected_size); let footer_key = "0123456789012345".as_bytes(); @@ -2140,7 +2142,7 @@ mod tests { .set_file_decryptor(Some(decryptor)) .build(); - let expected_size_with_decryptor = 3072; + let expected_size_with_decryptor = 3112; assert!(expected_size_with_decryptor > base_expected_size); assert_eq!( diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 2c63da74df47..e5b6abd5cb33 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -1059,6 +1059,12 @@ pub struct SchemaDescriptor { /// -- -- -- -- d /// ``` leaf_to_base: Vec<usize>, + + /// For each root field index, the index of its first leaf column in + /// [`Self::leaves`]. This is the inverse of (the first occurrence in) + /// [`Self::leaf_to_base`] and is used to look up a leaf column from a + /// root field index in O(1) time. 
+ root_to_first_leaf: Vec<usize>, } impl fmt::Debug for SchemaDescriptor { @@ -1073,7 +1079,10 @@ impl SchemaDescriptor { // Need to implement HeapSize in this module as the fields are private impl HeapSize for SchemaDescriptor { fn heap_size(&self) -> usize { - self.schema.heap_size() + self.leaves.heap_size() + self.leaf_to_base.heap_size() + self.schema.heap_size() + + self.leaves.heap_size() + + self.leaf_to_base.heap_size() + + self.root_to_first_leaf.heap_size() } } @@ -1101,13 +1110,39 @@ impl SchemaDescriptor { ); } + // Build root_to_first_leaf inverse map: for each root field index, + // record the index of its first leaf column. Leaves are emitted in + // root order by build_tree, so the first occurrence of each root_idx + // in leaf_to_base is the first leaf for that root. + let n_roots = tp.get_fields().len(); + let mut root_to_first_leaf = vec![usize::MAX; n_roots]; + for (leaf_idx, &root_idx) in leaf_to_base.iter().enumerate() { + if root_to_first_leaf[root_idx] == usize::MAX { + root_to_first_leaf[root_idx] = leaf_idx; + } + } + Self { schema: tp, leaves, leaf_to_base, + root_to_first_leaf, } } + /// Returns the leaf column index of the first leaf descended from the + /// given root field. Returns `None` if `root_idx` is out of range or the + /// root has no leaves. + pub fn root_first_leaf_index(&self, root_idx: usize) -> Option<usize> { + self.root_to_first_leaf.get(root_idx).copied().and_then(|i| { + if i == usize::MAX { + None + } else { + Some(i) + } + }) + } + /// Returns [`ColumnDescriptor`] for a field position. pub fn column(&self, i: usize) -> ColumnDescPtr { assert!(