
Perf: Support automatically concat_batches for sort which will improve performance #15375

Open
zhuqi-lucas opened this issue Mar 24, 2025 · 4 comments · May be fixed by #15380
Labels: enhancement (New feature or request)

@zhuqi-lucas
Contributor

Is your feature request related to a problem or challenge?

We should investigate extending the sort code to use concat_batches in more cases than the single case below:

```rust
// If less than sort_in_place_threshold_bytes, concatenate and sort in place
if self.reservation.size() < self.sort_in_place_threshold_bytes {
    // Concatenate memory batches together and sort
    let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
    self.in_mem_batches.clear();
    self.reservation
        .try_resize(get_reserved_byte_for_record_batch(&batch))?;
    let reservation = self.reservation.take();
    return self.sort_batch_stream(batch, metrics, reservation);
}
```
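The intuition behind the snippet above is that sorting one large contiguous run is usually cheaper than sorting many small runs and merging them. A std-only Rust sketch of that decision (illustrative names, with `Vec<i32>` standing in for Arrow's `RecordBatch`; this is not DataFusion's real API):

```rust
// Concatenate several batches into one contiguous batch,
// mimicking arrow's concat_batches for a single i32 column.
fn concat_batches(batches: &[Vec<i32>]) -> Vec<i32> {
    batches.iter().flatten().copied().collect()
}

// Decide between "concat and sort in place" and "sort each batch,
// merge later", based on a byte threshold (i32 = 4 bytes).
fn sort_in_mem_batches(in_mem_batches: Vec<Vec<i32>>, threshold_bytes: usize) -> Vec<Vec<i32>> {
    let total_bytes: usize = in_mem_batches.iter().map(|b| b.len() * 4).sum();
    if total_bytes < threshold_bytes {
        // Below the threshold: concatenate everything and do one big sort.
        let mut batch = concat_batches(&in_mem_batches);
        batch.sort_unstable();
        vec![batch]
    } else {
        // Above the threshold: sort each batch individually; a later
        // merge phase would combine the sorted runs.
        in_mem_batches
            .into_iter()
            .map(|mut b| {
                b.sort_unstable();
                b
            })
            .collect()
    }
}
```

The issue asks when it is profitable to take the first branch more often.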

See details about the performance improvement:

#15348 (comment)

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@zhuqi-lucas
Contributor Author

take

@zhuqi-lucas
Contributor Author

zhuqi-lucas commented Mar 24, 2025

cc @alamb
I did some POC testing; the performance improvement is very promising, with up to a 30% improvement for some queries:

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ concat_batches_for_sort ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1           │  2241.04ms │               1816.69ms │ +1.23x faster │
│ Q2           │  1841.01ms │               1496.73ms │ +1.23x faster │
│ Q3           │ 12755.85ms │              12770.18ms │     no change │
│ Q4           │  4433.49ms │               3278.70ms │ +1.35x faster │
│ Q5           │  4414.15ms │               4409.04ms │     no change │
│ Q6           │  4543.09ms │               4597.32ms │     no change │
│ Q7           │  8012.85ms │               9026.30ms │  1.13x slower │
│ Q8           │  6572.37ms │               6049.51ms │ +1.09x faster │
│ Q9           │  6734.63ms │               6345.69ms │ +1.06x faster │
│ Q10          │  9896.16ms │               9564.17ms │     no change │
└──────────────┴────────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                      ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                      │ 61444.64ms │
│ Total Time (concat_batches_for_sort)   │ 59354.33ms │
│ Average Time (main)                    │  6144.46ms │
│ Average Time (concat_batches_for_sort) │  5935.43ms │
│ Queries Faster                         │          5 │
│ Queries Slower                         │          1 │
│ Queries with No Change                 │          4 │
└────────────────────────────────────────┴────────────┘

@2010YOUY01
Contributor

This is a great observation, and the POC optimization has a high ROI. Here are some additional thoughts:

This is just intuition, but I think the sorting phase should be faster than the sort-preserving merge phase. The sorting implementation is much simpler: it can rely on the existing optimized Arrow row format and on the quicksort implementation from the standard library. The merge phase, by contrast, uses an in-house loser-tree heap implementation, which is complex and therefore harder to optimize manually.
As a result, it may be preferable to put more work into the sorting phase and leave minimal work for the merge phase. (Merging is still needed inside partial sort, however, because it keeps all cores busy while the partial sorts finish.)

Example

Suppose a sort query runs in 4 partitions, and each partition processes 100 input batches:

  • Current implementation:
    • Inside partial sort, each batch will be sorted, and a 100-way merge will be constructed.
    • In the final stage, a 4-way merge will produce the final result.
  • Putting most work in sort strategy: (we can control the partial sort and final sort to have the same merge degree, so they can process data at around the same speed)
    • Inside partial sort, sort 25 batches into a single sorted run at once, and after that do a 4-way merge as the output of partial sort
    • In the final stage, there is the same 4-way merge to produce the final result.
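The "sort larger runs, merge fewer of them" strategy above can be sketched with std-only Rust (plain `Vec<i32>` in place of batches; `sort_in_groups` and `merge_runs` are illustrative names, not DataFusion APIs):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Sort `batches` in groups of `group_size`, producing fewer, larger
// sorted runs: 100 batches with group_size 25 yield 4 runs, so the
// merge degree drops from 100 to 4.
fn sort_in_groups(batches: Vec<Vec<i32>>, group_size: usize) -> Vec<Vec<i32>> {
    batches
        .chunks(group_size)
        .map(|group| {
            let mut run: Vec<i32> = group.iter().flatten().copied().collect();
            run.sort_unstable();
            run
        })
        .collect()
}

// N-way merge of sorted runs using a min-heap of (value, run, position),
// a simple stand-in for the loser-tree merge.
fn merge_runs(runs: &[Vec<i32>]) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}
```

With `group_size` chosen so partial sort and final sort have the same merge degree, both stages process data at roughly the same speed.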

Implementation

The POC copies the batches with concat() and then does one bigger sort. An alternative that avoids the copies is to first sort all elements' indices (a 2-level index consisting of (batch_idx, row_idx)) to obtain a permutation array, and then use the interleave kernel to construct the final result: https://docs.rs/arrow/latest/arrow/compute/kernels/interleave/fn.interleave.html
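A std-only sketch of that permutation idea (a plain gather loop stands in for Arrow's `interleave` kernel, and `Vec<i32>` for a single-column batch):

```rust
// Sort without copying batches up front: sort a permutation of
// (batch_idx, row_idx) pairs by the values they point at, then
// gather rows in that order in a single pass.
fn sort_via_permutation(batches: &[Vec<i32>]) -> Vec<i32> {
    // Build the 2-level index over every row of every batch.
    let mut perm: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(b, batch)| (0..batch.len()).map(move |r| (b, r)))
        .collect();
    // Sort the indices, not the rows themselves.
    perm.sort_by_key(|&(b, r)| batches[b][r]);
    // Interleave-style gather: copy each row exactly once, in sorted order.
    perm.iter().map(|&(b, r)| batches[b][r]).collect()
}
```

The trade-off is one extra indirection per comparison during the index sort, in exchange for touching each row only once during the gather.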

@zhuqi-lucas
Contributor Author

Thank you @2010YOUY01 for the review and the good suggestions. I will improve my POC code and add more testing.
