Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce load-balanced split_groups_by_statistics method #15473

Merged
merged 10 commits into from
Apr 4, 2025

Conversation

xudong963
Copy link
Member

@xudong963 xudong963 commented Mar 28, 2025

Which issue does this PR close?

Rationale for this change

As @suremarc and @leoyvens said, we should make the method load balanced to keep the parallel.

What changes are included in this PR?

Introduced a new method to split file groups into new groups based on statistics to enable efficient parallel processing

Are these changes tested?

UT

Are there any user-facing changes?

Users need to choose the proper split_groups_by_statistics method by their cases.

@github-actions github-actions bot added the datasource Changes to the datasource crate label Mar 28, 2025
/// # Returns
/// A new set of file groups, where files within each group are non-overlapping with respect to
/// their min/max statistics and maintain the specified sort order.
pub fn split_groups_by_statistics_v2(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is TBD.

I'm wondering if we need to keep the old method because I think both of them have applicable scenarios

Copy link
Contributor

@suremarc suremarc Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could call it split_groups_by_statistics_with_target_partitions?

TBH I am not sure if anyone is using the old code, so I would wager it is safe to replace with the new implementation. But I agree the old one is probably more useful in certain scenarios, e.g. if you are doing a sort merge above it.

If we were to keep it, I would rather unify the implementations, the only thing that differs is the policy for selecting the group to insert. I think we could probably abstract that out into an enum or generic parameter. (Not really sure how common generics are in datafusion though)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep the old and rename the new to split_groups_by_statistics_with_target_partitions or unify the two methods, both of them are okay for me. cc @alamb

@xudong963 xudong963 force-pushed the improve_split_groups_by_statistics branch from 71a3373 to 75df701 Compare March 28, 2025 08:39
@xudong963 xudong963 changed the title Improve split_groups_by_statistics method Introduce load-balanced split_groups_by_statistics method Mar 28, 2025
@xudong963 xudong963 force-pushed the improve_split_groups_by_statistics branch from 75df701 to 36991d7 Compare March 28, 2025 08:47
@xudong963 xudong963 requested a review from Copilot March 28, 2025 08:47
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces a new load-balanced method for splitting file groups based on statistical information to enable efficient parallel processing. Key changes include:

  • Addition of the split_groups_by_statistics_v2 method with load balancing logic.
  • Benchmark updates to compare the performance of the original and new algorithms.
  • Adjustments to Cargo.toml to include the necessary criterion benchmarks.

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

File Description
datafusion/datasource/src/file_scan_config.rs Added a new method (split_groups_by_statistics_v2) to perform load-balanced file grouping
datafusion/datasource/benches/split_groups_by_statistics.rs Introduced benchmarks for both original and new splitting algorithms
datafusion/datasource/Cargo.toml Updated dev-dependencies and benchmark configuration for criterion
Comments suppressed due to low confidence (1)

datafusion/datasource/src/file_scan_config.rs:598

  • [nitpick] The method name 'split_groups_by_statistics_v2' might be made more descriptive (e.g., 'split_groups_by_statistics_load_balanced') to clearly indicate the load balancing behavior.
pub fn split_groups_by_statistics_v2(

vec![FileGroup::new(files)]
}

pub fn compare_split_groups_by_statistics_algorithms(c: &mut Criterion) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spilt_groups/original/files=10,overlap=0.0
                        time:   [3.0516 µs 3.0722 µs 3.0961 µs]
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=4/files=10,overlap=0.0
                        time:   [3.2514 µs 3.2801 µs 3.3107 µs]
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=8/files=10,overlap=0.0
                        time:   [3.5700 µs 3.6058 µs 3.6545 µs]
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=16/files=10,overlap=0.0
                        time:   [4.0533 µs 4.0819 µs 4.1139 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=32/files=10,overlap=0.0
                        time:   [5.3031 µs 5.3521 µs 5.4152 µs]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe
spilt_groups/original/files=10,overlap=0.2
                        time:   [2.9694 µs 2.9908 µs 3.0118 µs]
Found 11 outliers among 100 measurements (11.00%)
  8 (8.00%) low mild
  1 (1.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=4/files=10,overlap=0.2
                        time:   [3.2044 µs 3.2299 µs 3.2578 µs]
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=8/files=10,overlap=0.2
                        time:   [3.5383 µs 3.5644 µs 3.5964 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=10,overlap=0.2
                        time:   [4.0280 µs 4.0516 µs 4.0780 µs]
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=32/files=10,overlap=0.2
                        time:   [5.3014 µs 5.3424 µs 5.3850 µs]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
spilt_groups/original/files=10,overlap=0.5
                        time:   [2.9601 µs 2.9801 µs 3.0023 µs]
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=4/files=10,overlap=0.5
                        time:   [3.1908 µs 3.2223 µs 3.2589 µs]
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe
spilt_groups/v2_partitions=8/files=10,overlap=0.5
                        time:   [3.5209 µs 3.5408 µs 3.5632 µs]
Found 9 outliers among 100 measurements (9.00%)
  6 (6.00%) low mild
  1 (1.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=16/files=10,overlap=0.5
                        time:   [3.9969 µs 4.0221 µs 4.0511 µs]
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) low mild
  3 (3.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=32/files=10,overlap=0.5
                        time:   [5.1889 µs 5.2249 µs 5.2662 µs]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
spilt_groups/original/files=10,overlap=0.8
                        time:   [3.1625 µs 3.1933 µs 3.2307 µs]
Found 14 outliers among 100 measurements (14.00%)
  7 (7.00%) high mild
  7 (7.00%) high severe
spilt_groups/v2_partitions=4/files=10,overlap=0.8
                        time:   [3.2975 µs 3.3533 µs 3.4137 µs]
Found 14 outliers among 100 measurements (14.00%)
  6 (6.00%) high mild
  8 (8.00%) high severe
spilt_groups/v2_partitions=8/files=10,overlap=0.8
                        time:   [3.3652 µs 3.3873 µs 3.4134 µs]
Found 12 outliers among 100 measurements (12.00%)
  4 (4.00%) low mild
  4 (4.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=10,overlap=0.8
                        time:   [3.9061 µs 3.9289 µs 3.9574 µs]
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low mild
  3 (3.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=32/files=10,overlap=0.8
                        time:   [4.9453 µs 4.9941 µs 5.0547 µs]
Found 8 outliers among 100 measurements (8.00%)
  2 (2.00%) low mild
  2 (2.00%) high mild
  4 (4.00%) high severe
spilt_groups/original/files=100,overlap=0.0
                        time:   [14.800 µs 15.014 µs 15.261 µs]
Found 14 outliers among 100 measurements (14.00%)
  6 (6.00%) high mild
  8 (8.00%) high severe
Benchmarking spilt_groups/v2_partitions=4/files=100,overlap=0.0: Collecting 100 samples in estimated 10.081 s (571k iterations)
spilt_groups/v2_partitions=4/files=100,overlap=0.0
                        time:   [16.761 µs 16.913 µs 17.124 µs]
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=8/files=100,overlap=0.0
                        time:   [20.542 µs 20.629 µs 20.747 µs]
Found 8 outliers among 100 measurements (8.00%)
  3 (3.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=16/files=100,overlap=0.0
                        time:   [26.596 µs 26.717 µs 26.884 µs]
Found 11 outliers among 100 measurements (11.00%)
  3 (3.00%) low mild
  5 (5.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=32/files=100,overlap=0.0
                        time:   [40.912 µs 41.096 µs 41.342 µs]
Found 16 outliers among 100 measurements (16.00%)
  5 (5.00%) low mild
  6 (6.00%) high mild
  5 (5.00%) high severe
spilt_groups/original/files=100,overlap=0.2
                        time:   [14.420 µs 14.486 µs 14.573 µs]
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=4/files=100,overlap=0.2
                        time:   [16.644 µs 16.707 µs 16.788 µs]
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=8/files=100,overlap=0.2
                        time:   [20.601 µs 20.702 µs 20.826 µs]
Found 9 outliers among 100 measurements (9.00%)
  6 (6.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=16/files=100,overlap=0.2
                        time:   [26.557 µs 26.738 µs 26.968 µs]
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) high mild
  7 (7.00%) high severe
spilt_groups/v2_partitions=32/files=100,overlap=0.2
                        time:   [41.131 µs 41.329 µs 41.588 µs]
Found 19 outliers among 100 measurements (19.00%)
  3 (3.00%) low severe
  9 (9.00%) low mild
  3 (3.00%) high mild
  4 (4.00%) high severe
spilt_groups/original/files=100,overlap=0.5
                        time:   [14.826 µs 14.897 µs 14.987 µs]
Found 11 outliers among 100 measurements (11.00%)
  1 (1.00%) low mild
  7 (7.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=4/files=100,overlap=0.5
                        time:   [16.583 µs 16.648 µs 16.726 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=8/files=100,overlap=0.5
                        time:   [20.292 µs 20.413 µs 20.559 µs]
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=100,overlap=0.5
                        time:   [26.199 µs 26.295 µs 26.419 µs]
Found 8 outliers among 100 measurements (8.00%)
  4 (4.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=32/files=100,overlap=0.5
                        time:   [40.980 µs 41.346 µs 41.841 µs]
Found 10 outliers among 100 measurements (10.00%)
  3 (3.00%) low mild
  3 (3.00%) high mild
  4 (4.00%) high severe
spilt_groups/original/files=100,overlap=0.8
                        time:   [16.473 µs 16.544 µs 16.640 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=4/files=100,overlap=0.8
                        time:   [20.103 µs 20.190 µs 20.301 µs]
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low mild
  4 (4.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=8/files=100,overlap=0.8
                        time:   [20.099 µs 20.221 µs 20.404 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=100,overlap=0.8
                        time:   [24.757 µs 24.870 µs 25.012 µs]
Found 7 outliers among 100 measurements (7.00%)
  4 (4.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=32/files=100,overlap=0.8
                        time:   [39.388 µs 39.585 µs 39.818 µs]
Found 15 outliers among 100 measurements (15.00%)
  10 (10.00%) low mild
  2 (2.00%) high mild
  3 (3.00%) high severe
spilt_groups/original/files=1000,overlap=0.0
                        time:   [1.2800 ms 1.2860 ms 1.2936 ms]
Found 17 outliers among 100 measurements (17.00%)
  6 (6.00%) high mild
  11 (11.00%) high severe
spilt_groups/v2_partitions=4/files=1000,overlap=0.0
                        time:   [1.1734 ms 1.1810 ms 1.1896 ms]
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) high mild
  6 (6.00%) high severe
spilt_groups/v2_partitions=8/files=1000,overlap=0.0
                        time:   [1.1556 ms 1.1586 ms 1.1623 ms]
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=16/files=1000,overlap=0.0
                        time:   [1.1644 ms 1.1768 ms 1.1913 ms]
Found 14 outliers among 100 measurements (14.00%)
  3 (3.00%) high mild
  11 (11.00%) high severe
spilt_groups/v2_partitions=32/files=1000,overlap=0.0
                        time:   [1.1643 ms 1.1751 ms 1.1883 ms]
Found 17 outliers among 100 measurements (17.00%)
  6 (6.00%) high mild
  11 (11.00%) high severe
spilt_groups/original/files=1000,overlap=0.2
                        time:   [126.31 µs 127.47 µs 128.74 µs]
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=4/files=1000,overlap=0.2
                        time:   [147.86 µs 148.99 µs 150.41 µs]
Found 9 outliers among 100 measurements (9.00%)
  4 (4.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=8/files=1000,overlap=0.2
                        time:   [187.18 µs 188.91 µs 190.86 µs]
Found 9 outliers among 100 measurements (9.00%)
  5 (5.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=1000,overlap=0.2
                        time:   [246.99 µs 247.96 µs 249.06 µs]
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high severe
spilt_groups/v2_partitions=32/files=1000,overlap=0.2
                        time:   [392.19 µs 395.67 µs 399.84 µs]
Found 9 outliers among 100 measurements (9.00%)
  9 (9.00%) high severe
spilt_groups/original/files=1000,overlap=0.5
                        time:   [131.38 µs 132.60 µs 133.98 µs]
Found 15 outliers among 100 measurements (15.00%)
  6 (6.00%) low mild
  3 (3.00%) high mild
  6 (6.00%) high severe
spilt_groups/v2_partitions=4/files=1000,overlap=0.5
                        time:   [146.93 µs 148.18 µs 149.60 µs]
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) low mild
  3 (3.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=8/files=1000,overlap=0.5
                        time:   [185.27 µs 187.78 µs 191.07 µs]
Found 12 outliers among 100 measurements (12.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  8 (8.00%) high severe
spilt_groups/v2_partitions=16/files=1000,overlap=0.5
                        time:   [245.92 µs 247.51 µs 249.31 µs]
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=32/files=1000,overlap=0.5
                        time:   [388.99 µs 392.62 µs 397.14 µs]
Found 9 outliers among 100 measurements (9.00%)
  4 (4.00%) high mild
  5 (5.00%) high severe
spilt_groups/original/files=1000,overlap=0.8
                        time:   [141.78 µs 143.24 µs 144.92 µs]
Found 11 outliers among 100 measurements (11.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  7 (7.00%) high severe
spilt_groups/v2_partitions=4/files=1000,overlap=0.8
                        time:   [181.40 µs 182.83 µs 184.52 µs]
Found 18 outliers among 100 measurements (18.00%)
  11 (11.00%) low mild
  1 (1.00%) high mild
  6 (6.00%) high severe
spilt_groups/v2_partitions=8/files=1000,overlap=0.8
                        time:   [183.39 µs 187.03 µs 191.27 µs]
Found 16 outliers among 100 measurements (16.00%)
  4 (4.00%) high mild
  12 (12.00%) high severe
spilt_groups/v2_partitions=16/files=1000,overlap=0.8
                        time:   [226.57 µs 228.69 µs 231.41 µs]
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) high mild
  7 (7.00%) high severe
spilt_groups/v2_partitions=32/files=1000,overlap=0.8
                        time:   [387.48 µs 395.02 µs 404.59 µs]
Found 12 outliers among 100 measurements (12.00%)
  7 (7.00%) high mild
  5 (5.00%) high severe

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execution Time

  • The original algorithm is the fastest.
  • v2: higher partition targets increase overhead

Planning phase: ~2x slower for the v2 algorithm
Execution phase: Potentially much better parallelism that could improve overall query performance
(I think it's an expected trade-off, v2's plan cost should be acceptable, with thousands of files in time)

Scaling Behavior:

  • Both algorithms scale linearly with file count

Overlap Effects:

  • Higher overlap factors increase execution time slightly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execution phase
Would be great to have some example of this. The impact on planning time seems not super high.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great to have some example of this. The impact on planning time seems not super high.

Yes, fyi: #10336 (comment)

And I think @leoyvens can ouput more

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm prepared to give this a test run on some real data, by installing datafusion-cli from this branch. But I need some way to enable it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @leoyvens , I enabled it in 3666ae8

@github-actions github-actions bot added the core Core DataFusion crate label Mar 31, 2025
@xudong963
Copy link
Member Author

Ci failure is expected because the file groups changed due to the new method.

I'll update the failed tests and add tests for the new method after we make a consistence, especially after receiving the positive feedback from @leoyvens

Copy link
Contributor

@suremarc suremarc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your hard work @xudong963. The planning impact seems less than I expected, so that's great news.

Edit: I left a comment on copilot's suggestion FYI, just make sure not to miss it since it's still marked as Resolved

/// # Returns
/// A new set of file groups, where files within each group are non-overlapping with respect to
/// their min/max statistics and maintain the specified sort order.
pub fn split_groups_by_statistics_v2(
Copy link
Contributor

@suremarc suremarc Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could call it split_groups_by_statistics_with_target_partitions?

TBH I am not sure if anyone is using the old code, so I would wager it is safe to replace with the new implementation. But I agree the old one is probably more useful in certain scenarios, e.g. if you are doing a sort merge above it.

If we were to keep it, I would rather unify the implementations, the only thing that differs is the policy for selecting the group to insert. I think we could probably abstract that out into an enum or generic parameter. (Not really sure how common generics are in datafusion though)

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Mar 31, 2025
Copy link
Contributor

@2010YOUY01 2010YOUY01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, this is awesome.

I suggest to reuse the benchmark utilities also for testing, random file group generation and the later sort order check is a great property test, but I think those micro-benches are not executed in the CI? Additionally, I think we should also test that each partition's file group size is not heavily skewed after splitting, under different overlap factors.

let mut file_groups_indices: Vec<Vec<usize>> =
vec![vec![]; target_partitions.max(1)];

for (idx, min) in indices_sorted_by_min {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this can be maintained in a heap datastructure (https://doc.rust-lang.org/std/collections/struct.BinaryHeap.html)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a BinaryHeap could optimize finding the smallest group, but we'd still need a linear scan to filter eligible groups first and the number of groups (target_partitions) is typically small.

I think the current code is more readable and clear about its intent, wdyt?

@xudong963
Copy link
Member Author

I suggest to reuse the benchmark utilities also for testing, random file group generation and the later sort order check is a great property test

Yes, that's what in my mind, after I receive the positive feedback from @leoyvens , I'll start to do the test stuff.

but I think those micro-benches are not executed in the CI?

Do we need to run benches in CI?

@leoyvens
Copy link
Contributor

I took some time to play with this, so I can provide an anecdotal report.

Conclusion
In my setup, this PR is a clear win to execution times.

Configurations
I compared three datafusion-cli configurations:

  • Main, default: with split_file_groups_by_statistics = false
  • Main, split by stat: with split_file_groups_by_statistics = true
  • Branch, split by stat: This branch with split_file_groups_by_statistics = true

With collect_statistics always set to true.

Data and Query
Dataset contains ~90 files, n_cpus=60. The table is created with:

create external table ordered_table stored as parquet location '<my_data>' with order (block_num);

There is zero overlap across files in the order column.

The query then is:

select * from ordered_table order by block_num limit 5000000;

Plan analysis

  • Main, default: DataSourceExec with 60 groups, then a SortPreservingMergeExec.
    • Groups: [['000000000.parquet', '000439510.parquet'], ['000879020.parquet', '001318530.parquet'], ['001758040.parquet', '002197550.parquet'], ...]
  • Main, split by stat: DataSourceExec with single sorted group, no SortPreservingMergeExec.
  • Branch, split by stat: DataSourceExec with 60 groups, then a SortPreservingMergeExec.
    • Groups: [['000000000.parquet', '021996315.parquet'], ['000439510.parquet', '021998105.parquet'], ['000879020.parquet', '021999879.parquet'] ...]

What is very interesting to observe is that my filenames are lexicographically sorted, so the default file grouping is pessimal, forcing SortPreservingMergeExec to read one group after the other. Meanwhile, the grouping in this branch is optimal and allows the merge exec to reads groups in parallel.

Timings
Not rigorously taken, object store noise is expected:

  • Main, default: 6 to 10 seconds.
  • Main, split by stat: 6 to 10 seconds.
  • Branch: Consistent ~4 seconds.

@leoyvens
Copy link
Contributor

In terms of default behaviour, I see there are planning time concerns relative to this PR. For cases like mine, where files are lexicographically sorted, just changing the way the default grouping algorithm distributes files into groups would suffice (round-robin distribution, rather than group-at-a-time). Sorting by statistics is not actually needed.

@xudong963
Copy link
Member Author

Thank you, @leoyvens!

I plan to add tests for the PR in the next two days, and then we can continue to move it forward. Thanks for all your review!

@xudong963 xudong963 force-pushed the improve_split_groups_by_statistics branch 2 times, most recently from 95000ff to 570e0e8 Compare April 1, 2025 07:16
@xudong963 xudong963 requested review from 2010YOUY01, suremarc, Dandandan and alamb and removed request for suremarc April 1, 2025 07:18
Copy link
Contributor

@suremarc suremarc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing with @alamb and @wiedld in Discord I expect this will be superseded by ProgressiveEval, as implementing this as a global plan optimization will be able to take into account equivalence classes & statistics generated by physical plan nodes in between the SortPreservingMergeExec and DataSourceExec.

However, since @xudong963 went through the effort of making this improvement, I think we should go ahead and merge it anyway, I also think it will be useful to compare performance once ProgressiveEval arrives.

Thanks again for your hard work @xudong963

@@ -313,6 +314,78 @@ async fn find_first_newline(
Ok(index)
}

/// Generates test files with min-max statistics in different overlap patterns
/// Used by tests and benchmarks
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, based on my recent experience working in statistics, this function will be very helpful for testing functionality later

Copy link
Contributor

@2010YOUY01 2010YOUY01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Thank you @xudong963

@xudong963 xudong963 force-pushed the improve_split_groups_by_statistics branch from fdcb075 to ecb60f0 Compare April 3, 2025 02:30
@xudong963
Copy link
Member Author

I'm curious why the PR triggers the security audit CI

@alamb
Copy link
Contributor

alamb commented Apr 4, 2025

I'm curious why the PR triggers the security audit CI

@alamb
Copy link
Contributor

alamb commented Apr 4, 2025

Thanks @xudong963 @2010YOUY01 and @suremarc

@alamb alamb merged commit e08ef97 into apache:main Apr 4, 2025
28 of 29 checks passed
Copy link
Contributor

@berkaysynnada berkaysynnada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could do a late review, thank you @xudong963 for the effort. I have one question:

table_schema: &SchemaRef,
file_groups: &[FileGroup],
sort_order: &LexOrdering,
target_partitions: usize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need target_partitions as a parameter here? I think if this function takes that, it should return Result<Option<...>>, in case of the groups cannot split into the given partition count, being ordered (so we shouldn't check that at the caller side).

The other option is removing that parameter, since the same logic can be written without that (no need to initiate the file_groups_indices based on this count)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate datasource Changes to the datasource crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Enable split_file_groups_by_statistics by default
7 participants