feat: Implement custom RecordBatch serde for shuffle for improved performance #1190

Draft
wants to merge 75 commits into
base: main

Member

@andygrove andygrove commented Dec 20, 2024

Which issue does this PR close?

Closes #1189

Builds on #1192

Rationale for this change

Arrow IPC is a good general-purpose serde framework, but we can get better performance by implementing specialized code optimized for Comet, which encodes single batches to shuffle blocks.

This PR implements a new BatchWriter and BatchReader and updates shuffle writer to use them when possible (when all data types are supported), falling back to Arrow IPC for other cases.
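The fallback rule described above can be sketched as a simple check over the schema. This is an illustrative sketch only: `ColumnType`, `Codec`, `is_supported`, and `choose_codec` are hypothetical names, and the set of types treated as supported here is an assumption, since the description does not list which data types the new writer handles.

```rust
/// Hypothetical stand-in for the column types a shuffle schema may contain.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnType {
    Int32,
    Int64,
    Float64,
    Utf8,
    /// A nested type that this sketch assumes the fast path cannot encode.
    List(Box<ColumnType>),
}

/// Which encoder the shuffle writer would use for a given schema.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Codec {
    /// The specialized single-batch encoding.
    FastBatch,
    /// The general-purpose Arrow IPC fallback.
    ArrowIpc,
}

/// Assumption: only flat primitive and string columns are supported.
fn is_supported(t: &ColumnType) -> bool {
    matches!(
        t,
        ColumnType::Int32 | ColumnType::Int64 | ColumnType::Float64 | ColumnType::Utf8
    )
}

/// Use the fast path only when every column type is supported;
/// a single unsupported column sends the whole batch to Arrow IPC.
pub fn choose_codec(schema: &[ColumnType]) -> Codec {
    if schema.iter().all(is_supported) {
        Codec::FastBatch
    } else {
        Codec::ArrowIpc
    }
}
```

Making the decision per schema rather than per column keeps each shuffle block in exactly one format, so the reader never has to mix decoders within a block.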

Specializations include:

  • The schema gets encoded to bytes just once per shuffle operation rather than once per batch. The encoded schema bytes are then written out directly with each shuffle block, avoiding the schema serde cost per batch.
  • Raw data, offset, and null buffers are written out directly with no flatbuffer encoding, no alignment, and no metadata.
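A minimal sketch of the two specializations, using only the standard library. The block layout below (length-prefixed buffers, a one-byte null-buffer flag) is an assumption made for illustration and is not the format this PR actually writes; `Column`, `write_block`, and `read_block` are hypothetical names. The schema is encoded once by the caller and the same bytes are copied into every block.

```rust
use std::io::{self, Read, Write};

/// Hypothetical string-like column: raw value bytes, raw offset bytes,
/// and an optional validity (null) bitmap.
pub struct Column {
    pub data: Vec<u8>,
    pub offsets: Vec<u8>,
    pub nulls: Option<Vec<u8>>,
}

/// Write a buffer as an 8-byte little-endian length followed by the raw bytes.
/// No padding or alignment is added.
fn write_buf<W: Write>(out: &mut W, buf: &[u8]) -> io::Result<()> {
    out.write_all(&(buf.len() as u64).to_le_bytes())?;
    out.write_all(buf)
}

/// Write one shuffle block. `schema_bytes` was encoded once per shuffle
/// operation and is copied verbatim into each block.
pub fn write_block<W: Write>(
    out: &mut W,
    schema_bytes: &[u8],
    num_rows: u64,
    columns: &[Column],
) -> io::Result<()> {
    write_buf(out, schema_bytes)?;
    out.write_all(&num_rows.to_le_bytes())?;
    for col in columns {
        write_buf(out, &col.data)?;
        write_buf(out, &col.offsets)?;
        match &col.nulls {
            Some(bits) => {
                out.write_all(&[1u8])?;
                write_buf(out, bits)?;
            }
            None => out.write_all(&[0u8])?,
        }
    }
    Ok(())
}

/// Read a length-prefixed buffer written by `write_buf`.
fn read_buf<R: Read>(input: &mut R) -> io::Result<Vec<u8>> {
    let mut len = [0u8; 8];
    input.read_exact(&mut len)?;
    let mut buf = vec![0u8; u64::from_le_bytes(len) as usize];
    input.read_exact(&mut buf)?;
    Ok(buf)
}

/// Read one block back. The column count is passed in here; a real reader
/// would derive it from the decoded schema.
pub fn read_block<R: Read>(
    input: &mut R,
    num_columns: usize,
) -> io::Result<(Vec<u8>, u64, Vec<Column>)> {
    let schema_bytes = read_buf(input)?;
    let mut rows = [0u8; 8];
    input.read_exact(&mut rows)?;
    let mut columns = Vec::with_capacity(num_columns);
    for _ in 0..num_columns {
        let data = read_buf(input)?;
        let offsets = read_buf(input)?;
        let mut flag = [0u8; 1];
        input.read_exact(&mut flag)?;
        let nulls = if flag[0] == 1 { Some(read_buf(input)?) } else { None };
        columns.push(Column { data, offsets, nulls });
    }
    Ok((schema_bytes, u64::from_le_bytes(rows), columns))
}
```

In this sketch the per-batch cost is a handful of `write_all` calls over existing buffers, which is the property the bullets above are aiming for.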

Microbenchmarks (encoding only, no compression)

Without compression, we see an almost 3x speedup in writes.

shuffle_writer/shuffle_writer: write encoded (enable_fast_encoding=true, compression=None)
                        time:   [6.2751 µs 6.2906 µs 6.2906 µs]
shuffle_writer/shuffle_writer: write encoded (enable_fast_encoding=false, compression=None)
                        time:   [18.591 µs 18.599 µs 18.599 µs]

Note that the time saved is tiny compared to compression costs, but it still helps. With this PR I am seeing a TPC-H time of 329s compared to 336s in #1192, which this PR builds on.

Spark takes 644s, so with this PR, we are 1.96x faster than Spark. We need to shave off another 7 seconds now to get to 2x (we may get this with the new ParquetExec work).
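The ratios quoted above can be reproduced from the reported timings. A small sketch; the helper names `speedup` and `seconds_to_reach` are hypothetical and exist only for this illustration.

```rust
/// Speedup of `candidate` relative to `baseline` (both in the same time unit).
pub fn speedup(baseline: f64, candidate: f64) -> f64 {
    baseline / candidate
}

/// Seconds that still have to be removed from `current_secs` to reach
/// `target_speedup` over `baseline_secs`.
pub fn seconds_to_reach(baseline_secs: f64, current_secs: f64, target_speedup: f64) -> f64 {
    current_secs - baseline_secs / target_speedup
}

// Microbenchmark: 18.599 µs (Arrow IPC) vs 6.2906 µs (fast encoding)
//   speedup(18.599, 6.2906)  is roughly 2.96, i.e. "almost 3x"
// TPC-H: 644 s (Spark) vs 329 s (this PR)
//   speedup(644.0, 329.0)    is roughly 1.96
//   seconds_to_reach(644.0, 329.0, 2.0) == 7.0, since 644 / 2 = 322 and 329 - 322 = 7
```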

Benchmark Results

Single node TPC-H.

[Image: tpch_allqueries]

Single node TPC-DS with optimized version of q72 (better join order).

[Image: tpcds_allqueries]

TPC-H q3

Encoding + compression is now much closer to Gluten + Velox for the lineitem exchange (8.6s versus 6.2s).

Comet:

[Screenshot: 2024-12-29_12-08]

Gluten + Velox:

[Screenshot: 2024-12-17_16-15]

What changes are included in this PR?

How are these changes tested?

@andygrove andygrove force-pushed the experimental-fast-batch-serde branch from 49d0c27 to f7d8cce Compare December 20, 2024 17:07
use std::io::Write;
use std::sync::Arc;

pub fn write_batch_fast(
Contributor

Are you going to end up implementing a form of arrow (stream) IPC?

Member Author

I discovered that we may be able to just use https://docs.rs/arrow-ipc/latest/arrow_ipc/writer/struct.IpcDataGenerator.html#method.encoded_batch and am going to look into that next

Member Author

> Are you going to end up implementing a form of arrow (stream) IPC?

Yes, but without using flatbuffers to align and encode anything, just the raw bytes, and without the metadata messages.


fn create_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Utf8, true),
Contributor

Thanks @andygrove. It would be interesting to see whether other data types keep the same performance benefit.

@andygrove andygrove changed the title feat: Implement fast serde for single record batches [do not review] feat: Implement fast serde for single record batches Dec 21, 2024
@andygrove andygrove changed the title feat: Implement fast serde for single record batches feat: Implement custom RecordBatch serde for shuffle for improved performance Dec 30, 2024
@codecov-commenter
codecov-commenter commented Dec 30, 2024

Codecov Report

Attention: Patch coverage is 79.50820% with 25 lines in your changes missing coverage. Please review.

Project coverage is 34.83%. Comparing base (2e0f00a) to head (ab95a9b).
Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...execution/shuffle/NativeBatchDecoderIterator.scala | 71.95% | 12 Missing and 11 partials ⚠️ |
| ...t/execution/shuffle/CometShuffleExchangeExec.scala | 80.00% | 1 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@              Coverage Diff              @@
##               main    #1190       +/-   ##
=============================================
- Coverage     56.94%   34.83%   -22.11%     
- Complexity      929      990       +61     
=============================================
  Files           112      116        +4     
  Lines         10985    43844    +32859     
  Branches       2119     9564     +7445     
=============================================
+ Hits           6255    15274     +9019     
- Misses         3617    25599    +21982     
- Partials       1113     2971     +1858     


Successfully merging this pull request may close these issues.

Implement faster single batch encoding/decoding for use in shuffle
5 participants