
WIP: example lexical range handling #63


Draft · wiedld wants to merge 4 commits into main from wiedld/lexical-range-code

Conversation

@wiedld (Collaborator) commented on Apr 1, 2025:

Sharing some code showing how we extract the lexical ranges and use them to reorder (ungrouped) partitions.

It's likely more complex than needed, since we're handling legacy (in-house) bits and we've been constraining ourselves to currently available (as of version 44) APIs. I made some minor tweaks to the code to get it compiling and tests passing.

This is not intended as a proposed solution. Rather, a description of the problem space and how we've been solving it (within our constraints). I think we all want a better upstream solution 🙏🏼 .

```rust
// Presumably a PhysicalOptimizerRule::optimize impl (the diff starts mid-signature):
fn optimize(
    &self,
    plan: Arc<dyn ExecutionPlan>,
    _config: &datafusion_common::config::ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(|plan| {
```
@wiedld (Collaborator, Author):

This is the starting point of the code, providing an overview.
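
Since the snippet above is only a fragment, here is a minimal, self-contained illustration of the `transform_up` pattern it uses, assuming DataFusion's `TreeNode` API; the rule body is a placeholder, not the PR's actual matching logic:

```rust
use std::sync::Arc;

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;

/// Walk the plan bottom-up, rewriting nodes that match some predicate.
fn rewrite_plan(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(|node| {
        // Placeholder: a real rule would match (say) a SortPreservingMergeExec
        // whose input partitions have disjoint lexical ranges, and replace it.
        Ok(Transformed::no(node))
    })
    // `transform_up` returns a `Transformed` wrapper; unwrap the plan itself.
    .map(|t| t.data)
}
```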

Comment on lines +42 to +45
```rust
pub fn split_parquet_files(
    plan: Arc<dyn ExecutionPlan>,
    ordering_req: &LexOrdering,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
```
@wiedld (Collaborator, Author) commented on Apr 1, 2025:

Here is where we ungroup the file source, prior to any calculation of lexical ranges.

We do look at the lexical range at the file source to decide how to ungroup. But the plan then gets transformed in the DAG before it reaches the SortPreservingMergeExec (SPM). Therefore we re-calculate the lexical ranges seen at the SPM (which may get replaced with the ProgressiveEvalExec).
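
As a rough illustration of "ungrouping" (a sketch under assumptions, not the PR's code): assuming DataFusion 44's `FileScanConfig`, where `file_groups: Vec<Vec<PartitionedFile>>` and each inner `Vec` becomes one output partition, ungrouping flattens the groups so every file gets its own partition:

```rust
use datafusion::datasource::physical_plan::FileScanConfig;

/// Give every file its own partition so that each output partition
/// spans a single lexical range.
fn ungroup(mut config: FileScanConfig) -> FileScanConfig {
    config.file_groups = config
        .file_groups
        .drain(..)
        .flatten()
        .map(|file| vec![file]) // one file per output partition
        .collect();
    config
}
```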

Comment on lines +38 to +41
```rust
pub fn extract_disjoint_ranges_from_plan(
    exprs: &LexOrdering,
    input_plan: &Arc<dyn ExecutionPlan>,
) -> Result<Option<NonOverlappingOrderedLexicalRanges>> {
```
@wiedld (Collaborator, Author) commented on Apr 1, 2025:

In this function, the lexical ranges (per sort key) are extracted as plain min/max values per partition, without any notion yet of how we will order them.

It's then the NonOverlappingOrderedLexicalRanges that carries the sense of proper ordering.
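
A small, self-contained sketch of that two-step split (illustrative names, not the PR's API): first a plain (min, max) per partition, then a separate step that orders the partitions and checks that the ranges are disjoint.

```rust
use datafusion_common::ScalarValue;

/// Min/max of the sort key within one partition; no ordering implied yet.
struct LexicalRange {
    min: ScalarValue,
    max: ScalarValue,
}

/// Order partitions by their minimum value and verify the ranges don't
/// overlap; returns the partition permutation if so.
fn order_if_disjoint(ranges: &[LexicalRange]) -> Option<Vec<usize>> {
    let mut order: Vec<usize> = (0..ranges.len()).collect();
    // ScalarValue comparisons are partial; treat incomparable values as
    // equal for this sketch.
    order.sort_by(|&a, &b| {
        ranges[a]
            .min
            .partial_cmp(&ranges[b].min)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    // A boundary-equal max/min pair still concatenates in sorted order,
    // so `<=` suffices here.
    let disjoint = order
        .windows(2)
        .all(|w| ranges[w[0]].max <= ranges[w[1]].min);
    disjoint.then(|| order)
}
```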

Comment on lines 186 to 203
```rust
fn f_down(&mut self, node: &'n Self::Node) -> DatafusionResult<TreeNodeRecursion> {
    if !is_supported(node) {
        // Unsupported node: give up on statistics for this subtree.
        self.statistics = None;
        Ok(TreeNodeRecursion::Stop)
    } else if is_leaf(node) {
        // Leaf (data source): extract per-partition statistics directly.
        self.statistics = self.extract_from_data_source(node)?;
        Ok(TreeNodeRecursion::Stop)
    } else if should_merge_partition_statistics(node) {
        // Node that merges partitions: combine column statistics across them.
        self.statistics = self.merge_column_statistics_across_partitions(node)?;
        Ok(TreeNodeRecursion::Jump)
    } else if should_pass_thru_partition_statistics(node) {
        // Node that preserves partitioning: keep the per-partition statistics
        // gathered from its children.
        self.statistics =
            self.find_stats_per_partition_within_multiple_children(node)?;
        Ok(TreeNodeRecursion::Stop)
    } else {
        self.statistics = None;
        Ok(TreeNodeRecursion::Stop)
    }
```
@wiedld (Collaborator, Author) commented on Apr 1, 2025:

This is where I tried to generalize the rules for extracting partition min/maxes, without special-casing each node type.

I prefer the approach suggested by xudong of having callers use ExecutionPlan::statistics_per_partition.
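
For reference, a hedged sketch of the method shape being referred to (illustrative only; the API that actually lands upstream in apache#15852 may be named and typed differently):

```rust
use datafusion_common::{Result, Statistics};

/// Illustrative trait extension; the real API would live on ExecutionPlan.
pub trait PartitionStatistics {
    /// Statistics for each output partition of this node, in partition order.
    fn statistics_per_partition(&self) -> Result<Vec<Statistics>>;
}
```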

@xudong963:

I'm gonna merge PR apache#15852, then you can update this PR to use it.

@wiedld (Collaborator, Author):

Thanks @xudong963 .

I ended up quickly porting over our own statistics_per_partition code, since it handles a few use cases we need. (Note: it doesn't handle joins and a few other use cases, which you have covered.)

I pushed it all up anyway, for the purposes of sharing code.

@wiedld force-pushed the wiedld/lexical-range-code branch 3 times, most recently from 5e05a0f to 4871fdd on May 8, 2025 at 19:09
@wiedld (Collaborator, Author) commented on May 8, 2025:

The dependency pathing is a bit weird (and wrong), since I'm just quickly porting over code from our internal codebase. An actual implementation should not have this issue.

Test cases are all passing locally. I'm not bothering to debug the one test case that fails only in remote CI; hopefully we can fix that as we turn this into anything real.

```rust
/// Merge a collection of [`Statistics`] into a single stat.
///
/// This takes statistics references, which may or may not be arc'ed.
pub(super) fn merge_stats_collection<
```


DataFusion has similar logic now.
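
For context, the merge rule itself is simple; here is a self-contained sketch with toy types (not DataFusion's `Statistics`): min of mins, max of maxes, sum of row counts.

```rust
/// Toy per-partition column stats for illustration.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ColStats {
    min: i64,
    max: i64,
    rows: u64,
}

/// Merge a collection of per-partition stats into one.
fn merge_stats(stats: impl IntoIterator<Item = ColStats>) -> Option<ColStats> {
    stats.into_iter().reduce(|a, b| ColStats {
        min: a.min.min(b.min),
        max: a.max.max(b.max),
        rows: a.rows + b.rows,
    })
}
```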

@xudong963 left a comment:

Thanks for the excellent work!

Maybe we can split the PR into parts and port them upstream separately, such as the ProgressiveEvalExec node, the InsertProgressiveEval rule, and the statistics features that are needed. (I can help add the statistics features.)
