Skip to content

Commit 2b5b918

Browse files
authored
Fix problem writing sliced BooleanBuffers as fast-encoding format (#1522)
## Which issue does this PR close? Closes #1520. ## Rationale for this change This is a problem I found when working on #1511, the null bits were not correctly written and caused test failures. This patch is an attempt to fix it. This patch is only aiming for fixing correctness problems. As #1190 (comment) pointed out, the fast BatchWriter may write full data buffer for sliced `Utf8` arrays, so there's still some performance implications when working with sliced arrays. ## What changes are included in this PR? Correctly take slicing indices and length into account when writing BooleanBuffers. This applies to null bits of all arrays, and the values of boolean arrays. ## How are these changes tested? Added a new round-trip test for sliced record batches.
1 parent a31ece9 commit 2b5b918

File tree

1 file changed

+38
-3
lines changed
  • native/core/src/execution/shuffle

1 file changed

+38
-3
lines changed

native/core/src/execution/shuffle/codec.rs

+38-3
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ impl<W: Write> BatchWriter<W> {
181181
// be determined from the data buffer size (length is in bits rather than bytes)
182182
self.write_all(&arr.len().to_le_bytes())?;
183183
// write data buffer
184-
self.write_buffer(arr.values().inner())?;
184+
self.write_boolean_buffer(arr.values())?;
185185
// write null buffer
186186
self.write_null_buffer(arr.nulls())?;
187187
}
@@ -300,8 +300,7 @@ impl<W: Write> BatchWriter<W> {
300300
// write null buffer length in bits
301301
self.write_all(&buffer.len().to_le_bytes())?;
302302
// write null buffer
303-
let buffer = buffer.inner();
304-
self.write_buffer(buffer)?;
303+
self.write_boolean_buffer(buffer)?;
305304
} else {
306305
self.inner.write_all(&0_usize.to_le_bytes())?;
307306
}
@@ -315,6 +314,19 @@ impl<W: Write> BatchWriter<W> {
315314
self.inner.write_all(buffer.as_slice())
316315
}
317316

317+
fn write_boolean_buffer(&mut self, buffer: &BooleanBuffer) -> std::io::Result<()> {
318+
let inner_buffer = buffer.inner();
319+
if buffer.offset() == 0 && buffer.len() == inner_buffer.len() {
320+
// Not a sliced buffer, write the inner buffer directly
321+
self.write_buffer(inner_buffer)?;
322+
} else {
323+
// Sliced buffer, create and write the sliced buffer
324+
let buffer = buffer.sliced();
325+
self.write_buffer(&buffer)?;
326+
}
327+
Ok(())
328+
}
329+
318330
pub fn inner(self) -> W {
319331
self.inner
320332
}
@@ -621,6 +633,29 @@ mod test {
621633
assert_eq!(batch, batch2);
622634
}
623635

636+
#[test]
637+
fn roundtrip_sliced() {
638+
let batch = create_batch(8192, true);
639+
640+
let mut start = 0;
641+
let batch_size = 128;
642+
while start < batch.num_rows() {
643+
let end = (start + batch_size).min(batch.num_rows());
644+
let sliced_batch = batch.slice(start, end - start);
645+
let buffer = Vec::new();
646+
let mut writer = BatchWriter::new(buffer);
647+
writer.write_partial_schema(&sliced_batch.schema()).unwrap();
648+
writer.write_batch(&sliced_batch).unwrap();
649+
let buffer = writer.inner();
650+
651+
let mut reader = BatchReader::new(&buffer);
652+
let batch2 = reader.read_batch().unwrap();
653+
assert_eq!(sliced_batch, batch2);
654+
655+
start = end;
656+
}
657+
}
658+
624659
fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
625660
let schema = Arc::new(Schema::new(vec![
626661
Field::new("bool", DataType::Boolean, true),

0 commit comments

Comments
 (0)