Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,24 @@ impl ArrowReaderOptions {
})
}

/// Returns the virtual columns configured on these options, if any.
///
/// The returned slice is empty when no virtual columns were configured.
pub fn virtual_columns(&self) -> &[FieldRef] {
    self.virtual_columns.as_slice()
}

/// Returns the arrow [`Schema`] supplied via [`Self::with_schema`],
/// if one was provided.
///
/// Returns `None` when no explicit schema was configured on these
/// options.
pub fn supplied_schema(&self) -> Option<&SchemaRef> {
    self.supplied_schema.as_ref()
}

/// Returns whether the reader will skip the embedded arrow schema
/// metadata stored in the parquet file.
///
/// This flag is set via [`Self::with_skip_arrow_metadata`]; when `true`
/// the arrow schema is derived from the parquet schema alone.
pub fn skip_arrow_metadata(&self) -> bool {
    self.skip_arrow_metadata
}

#[deprecated(
since = "57.2.0",
note = "Use `column_index_policy` or `offset_index_policy` instead"
Expand Down Expand Up @@ -885,6 +903,35 @@ impl ArrowReaderMetadata {
Self::try_new(Arc::new(metadata), options)
}

/// Construct an [`ArrowReaderMetadata`] from a precomputed arrow
/// [`Schema`] and [`FieldLevels`].
///
/// Use this when the caller has already derived the arrow schema and
/// [`FieldLevels`] for `metadata` (e.g. via
/// [`parquet_to_arrow_field_levels`]) and wants to avoid recomputing
/// them. The field-levels conversion walks every leaf in the parquet
/// schema, so for wide schemas reusing a precomputed result across
/// reader builds is significantly cheaper than calling
/// [`Self::try_new`] each time.
///
/// `arrow_schema` is used as-is, including its key/value metadata.
///
/// [`parquet_to_arrow_field_levels`]: crate::arrow::schema::parquet_to_arrow_field_levels
pub fn from_field_levels(
    metadata: Arc<ParquetMetaData>,
    arrow_schema: SchemaRef,
    field_levels: FieldLevels,
) -> Self {
    // Only the `levels` half of `FieldLevels` is needed here; the
    // caller-provided `arrow_schema` supplies the field information.
    let levels = field_levels.levels;
    Self {
        metadata,
        schema: arrow_schema,
        fields: levels.map(Arc::new),
    }
}

/// Create a new [`ArrowReaderMetadata`] from a pre-existing
/// [`ParquetMetaData`] and [`ArrowReaderOptions`].
///
Expand Down
26 changes: 26 additions & 0 deletions parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,32 @@ impl<'a> StatisticsConverter<'a> {
})
}

/// Construct a [`StatisticsConverter`] from an already-resolved
/// `(arrow_field, parquet_leaf_index)` pair.
///
/// This is a low-overhead alternative to [`Self::try_new`] for callers
/// that build many converters against the same schemas — for example
/// when extracting per-column statistics over a wide schema.
/// [`Self::try_new`] performs name lookups against both the arrow and
/// parquet schemas; this constructor skips those lookups by accepting
/// the resolved field and leaf column index directly.
///
/// Pass `parquet_leaf_index = None` when the column does not exist in
/// the parquet file (schema evolution case).
pub fn from_arrow_field(
    arrow_field: &'a Field,
    parquet_schema: &'a SchemaDescriptor,
    parquet_leaf_index: Option<usize>,
) -> Self {
    // Resolve the physical type up front; `None` means the column has
    // no physical representation in this file.
    let physical_type =
        parquet_leaf_index.map(|leaf| parquet_schema.column(leaf).physical_type());
    Self {
        parquet_column_index: parquet_leaf_index,
        arrow_field,
        missing_null_counts_as_zero: true,
        physical_type,
    }
}

/// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
///
/// # Return Value
Expand Down
39 changes: 37 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,31 @@ pub trait AsyncFileReader: Send {
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;

/// Return a future resolving to a fully-built [`ArrowReaderMetadata`].
///
/// The default implementation fetches the parquet metadata via
/// [`Self::get_metadata`] and then derives the arrow view with
/// [`ArrowReaderMetadata::try_new`], which walks every leaf in the
/// parquet schema to compute the matching arrow [`Schema`] and field
/// levels.
///
/// Implementations that already cache per-file metadata (e.g. because
/// the same file is opened many times across queries) can override
/// this to also cache the derived arrow schema view. For wide schemas
/// that per-leaf walk is `O(N_columns)` and would otherwise be
/// repeated on every reader build for the same file.
fn get_arrow_reader_metadata<'a>(
    &'a mut self,
    options: ArrowReaderOptions,
) -> BoxFuture<'a, Result<ArrowReaderMetadata>> {
    async move {
        let parquet_metadata = self.get_metadata(Some(&options)).await?;
        ArrowReaderMetadata::try_new(parquet_metadata, options)
    }
    .boxed()
}
}

/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
Expand All @@ -128,6 +153,13 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
self.as_mut().get_metadata(options)
}

fn get_arrow_reader_metadata<'a>(
    &'a mut self,
    options: ArrowReaderOptions,
) -> BoxFuture<'a, Result<ArrowReaderMetadata>> {
    // Delegate to the boxed inner reader's implementation.
    (**self).get_arrow_reader_metadata(options)
}
}

impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
Expand Down Expand Up @@ -194,8 +226,11 @@ impl ArrowReaderMetadata {
input: &mut T,
options: ArrowReaderOptions,
) -> Result<Self> {
let metadata = input.get_metadata(Some(&options)).await?;
Self::try_new(metadata, options)
// Delegate to AsyncFileReader::get_arrow_reader_metadata so
// implementations that cache derived state (e.g. an arrow Schema
// and field-levels view) can short-circuit the per-leaf walk
// performed by Self::try_new.
input.get_arrow_reader_metadata(options).await
}
}

Expand Down
11 changes: 7 additions & 4 deletions parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ use arrow_schema::{FieldRef, Schema};
pub use self::schema::{
ArrowSchemaConverter, FieldLevels, add_encoded_arrow_schema_to_metadata, encode_arrow_schema,
parquet_to_arrow_field_levels, parquet_to_arrow_field_levels_with_virtual,
parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, virtual_type::*,
parquet_to_arrow_schema, parquet_to_arrow_schema_and_field_levels,
parquet_to_arrow_schema_by_columns, virtual_type::*,
};

/// Schema metadata key used to store serialized Arrow schema
Expand Down Expand Up @@ -485,9 +486,11 @@ pub fn parquet_column<'a>(
return None;
}

// This could be made more efficient (#TBD)
let parquet_idx = (0..parquet_schema.columns().len())
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
// For non-nested fields the parquet column ordering matches the arrow
// root field ordering, so the first leaf descended from `root_idx` is
// exactly the parquet column we want. SchemaDescriptor caches this
// mapping for O(1) lookup.
let parquet_idx = parquet_schema.root_first_leaf_index(root_idx)?;
Some((parquet_idx, field))
}

Expand Down
33 changes: 33 additions & 0 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,39 @@ pub fn parquet_to_arrow_schema_by_columns(
Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
}

/// Convert a parquet [`SchemaDescriptor`] into both the arrow [`Schema`]
/// and matching [`FieldLevels`] with a single schema walk.
///
/// This is a convenience around [`parquet_to_arrow_field_levels`] that
/// also produces a [`Schema`] carrying the parquet file's key/value
/// metadata. Useful for callers that want to pre-compute both and reuse
/// them via [`ArrowReaderMetadata::from_field_levels`].
///
/// [`ArrowReaderMetadata::from_field_levels`]: crate::arrow::arrow_reader::ArrowReaderMetadata::from_field_levels
pub fn parquet_to_arrow_schema_and_field_levels(
    parquet_schema: &SchemaDescriptor,
    mask: ProjectionMask,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<(Schema, FieldLevels)> {
    let mut schema_metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();

    // An embedded arrow schema, if present, is removed from the
    // key/value metadata and decoded to serve as a field hint below.
    let embedded_schema = match schema_metadata.remove(super::ARROW_SCHEMA_META_KEY) {
        Some(encoded) => Some(get_arrow_schema_from_metadata(&encoded)?),
        None => None,
    };

    // Merge the embedded schema's metadata without overwriting any
    // entries already present from the file's key/value metadata.
    if let Some(arrow_schema) = &embedded_schema {
        for (key, value) in arrow_schema.metadata() {
            schema_metadata
                .entry(key.clone())
                .or_insert_with(|| value.clone());
        }
    }

    let field_hint = embedded_schema.as_ref().map(|schema| schema.fields());
    let field_levels =
        parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, field_hint, &[])?;
    let schema = Schema::new_with_metadata(field_levels.fields.clone(), schema_metadata);
    Ok((schema, field_levels))
}

/// Determines the Arrow Schema from a Parquet schema
///
/// Looks for an Arrow schema metadata "hint" (see
Expand Down
14 changes: 8 additions & 6 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2033,10 +2033,12 @@ mod tests {
.set_row_groups(row_group_meta_with_stats)
.build();

// Sizes include the `root_to_first_leaf: Vec<usize>` cache that
// SchemaDescriptor adds to make `parquet_column` lookups O(1).
#[cfg(not(feature = "encryption"))]
let base_expected_size = 2766;
let base_expected_size = 2806;
#[cfg(feature = "encryption")]
let base_expected_size = 2934;
let base_expected_size = 2974;

assert_eq!(parquet_meta.memory_size(), base_expected_size);

Expand Down Expand Up @@ -2065,9 +2067,9 @@ mod tests {
.build();

#[cfg(not(feature = "encryption"))]
let bigger_expected_size = 3192;
let bigger_expected_size = 3232;
#[cfg(feature = "encryption")]
let bigger_expected_size = 3360;
let bigger_expected_size = 3400;

// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
Expand Down Expand Up @@ -2114,7 +2116,7 @@ mod tests {
.set_row_groups(row_group_meta.clone())
.build();

let base_expected_size = 2058;
let base_expected_size = 2098;
assert_eq!(parquet_meta_data.memory_size(), base_expected_size);

let footer_key = "0123456789012345".as_bytes();
Expand All @@ -2140,7 +2142,7 @@ mod tests {
.set_file_decryptor(Some(decryptor))
.build();

let expected_size_with_decryptor = 3072;
let expected_size_with_decryptor = 3112;
assert!(expected_size_with_decryptor > base_expected_size);

assert_eq!(
Expand Down
37 changes: 36 additions & 1 deletion parquet/src/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,12 @@ pub struct SchemaDescriptor {
/// -- -- -- -- d
/// ```
leaf_to_base: Vec<usize>,

/// For each root field index, the index of its first leaf column in
/// [`Self::leaves`]. This is the inverse of (the first occurrence in)
/// [`Self::leaf_to_base`] and is used to look up a leaf column from a
/// root field index in O(1) time.
root_to_first_leaf: Vec<usize>,
}

impl fmt::Debug for SchemaDescriptor {
Expand All @@ -1073,7 +1079,10 @@ impl fmt::Debug for SchemaDescriptor {
// Need to implement HeapSize in this module as the fields are private
impl HeapSize for SchemaDescriptor {
fn heap_size(&self) -> usize {
self.schema.heap_size() + self.leaves.heap_size() + self.leaf_to_base.heap_size()
self.schema.heap_size()
+ self.leaves.heap_size()
+ self.leaf_to_base.heap_size()
+ self.root_to_first_leaf.heap_size()
}
}

Expand Down Expand Up @@ -1101,13 +1110,39 @@ impl SchemaDescriptor {
);
}

// Build root_to_first_leaf inverse map: for each root field index,
// record the index of its first leaf column. Leaves are emitted in
// root order by build_tree, so the first occurrence of each root_idx
// in leaf_to_base is the first leaf for that root.
let n_roots = tp.get_fields().len();
let mut root_to_first_leaf = vec![usize::MAX; n_roots];
for (leaf_idx, &root_idx) in leaf_to_base.iter().enumerate() {
if root_to_first_leaf[root_idx] == usize::MAX {
root_to_first_leaf[root_idx] = leaf_idx;
}
}

Self {
schema: tp,
leaves,
leaf_to_base,
root_to_first_leaf,
}
}

/// Returns the leaf column index of the first leaf descended from the
/// given root field.
///
/// Returns `None` if `root_idx` is out of range or the root has no
/// leaves (stored internally as the `usize::MAX` sentinel).
pub fn root_first_leaf_index(&self, root_idx: usize) -> Option<usize> {
    // `usize::MAX` marks "root has no leaves"; filter it out so callers
    // only ever observe valid leaf indices.
    self.root_to_first_leaf
        .get(root_idx)
        .copied()
        .filter(|&leaf| leaf != usize::MAX)
}

/// Returns [`ColumnDescriptor`] for a field position.
pub fn column(&self, i: usize) -> ColumnDescPtr {
assert!(
Expand Down
Loading