Skip to content

Commit 0c20874

Browse files
authored
GH-47981: [C++][Parquet] Add compatibility with non-compliant RLE stream (#47992)
### Rationale for this change RLE-bit-packed streams are required by the Parquet spec to have 8-padded bit-packed runs, but some non-compliant encoders (such as Polars versions before pola-rs/polars#13883) might generate a truncated last bit-packed run, which nevertheless contains enough *logical* values. ### What changes are included in this PR? 1. Compatibility code for non-compliant RLE streams as described above 2. Guard against zero-size dictionaries to avoid hitting an assertion in `DictionaryConverter` ### Are these changes tested? Yes, by additional unit tests. ### Are there any user-facing changes? No, except a bugfix. * GitHub Issue: #47981 Authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
1 parent c5bd9c3 commit 0c20874

File tree

3 files changed

+169
-11
lines changed

3 files changed

+169
-11
lines changed

cpp/src/arrow/util/bit_util.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,10 +372,12 @@ void PackBits(const uint32_t* values, uint8_t* out) {
372372
}
373373
}
374374

375-
constexpr int64_t MaxLEB128ByteLen(int64_t n_bits) { return CeilDiv(n_bits, 7); }
375+
constexpr int32_t MaxLEB128ByteLen(int32_t n_bits) {
376+
return static_cast<int32_t>(CeilDiv(n_bits, 7));
377+
}
376378

377379
template <typename Int>
378-
constexpr int64_t kMaxLEB128ByteLenFor = MaxLEB128ByteLen(sizeof(Int) * 8);
380+
constexpr int32_t kMaxLEB128ByteLenFor = MaxLEB128ByteLen(sizeof(Int) * 8);
379381

380382
/// Write a integer as LEB128
381383
///
@@ -441,7 +443,7 @@ constexpr int32_t WriteLEB128(Int value, uint8_t* out, int32_t max_out_size) {
441443
template <typename Int>
442444
constexpr int32_t ParseLeadingLEB128(const uint8_t* data, int32_t max_data_size,
443445
Int* out) {
444-
constexpr auto kMaxBytes = static_cast<int32_t>(kMaxLEB128ByteLenFor<Int>);
446+
constexpr auto kMaxBytes = kMaxLEB128ByteLenFor<Int>;
445447
static_assert(kMaxBytes >= 1);
446448
constexpr uint8_t kLow7Mask = 0x7F;
447449
constexpr uint8_t kContinuationBit = 0x80;

cpp/src/arrow/util/rle_encoding_internal.h

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -659,8 +659,8 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const
659659

660660
constexpr auto kMaxSize = bit_util::kMaxLEB128ByteLenFor<uint32_t>;
661661
uint32_t run_len_type = 0;
662-
const auto header_bytes = bit_util::ParseLeadingLEB128(data_, kMaxSize, &run_len_type);
663-
662+
const auto header_bytes =
663+
bit_util::ParseLeadingLEB128(data_, std::min(kMaxSize, data_size_), &run_len_type);
664664
if (ARROW_PREDICT_FALSE(header_bytes == 0)) {
665665
// Malformed LEB128 data
666666
return {0, ControlFlow::Break};
@@ -670,7 +670,7 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const
670670
const uint32_t count = run_len_type >> 1;
671671
if (is_bit_packed) {
672672
// Bit-packed run
673-
constexpr auto kMaxCount = bit_util::CeilDiv(internal::max_size_for_v<rle_size_t>, 8);
673+
constexpr auto kMaxCount = internal::max_size_for_v<rle_size_t> / 8;
674674
if (ARROW_PREDICT_FALSE(count == 0 || count > kMaxCount)) {
675675
// Illegal number of encoded values
676676
return {0, ControlFlow::Break};
@@ -679,12 +679,24 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const
679679
ARROW_DCHECK_LT(static_cast<uint64_t>(count) * 8,
680680
internal::max_size_for_v<rle_size_t>);
681681
// Count Already divided by 8 for byte size calculations
682-
const auto bytes_read = header_bytes + static_cast<int64_t>(count) * value_bit_width_;
682+
auto bytes_read = header_bytes + static_cast<int64_t>(count) * value_bit_width_;
683+
auto values_count = static_cast<rle_size_t>(count * 8);
683684
if (ARROW_PREDICT_FALSE(bytes_read > data_size_)) {
684-
// Bit-packed run would overflow data buffer
685-
return {0, ControlFlow::Break};
685+
// Bit-packed run would overflow data buffer, but we might still be able
686+
// to return a truncated bit-packed such as generated by some non-compliant
687+
// encoders.
688+
// Example in GH-47981: column contains 25 5-bit values, has a single
689+
// bit-packed run with count=4 (theoretically 32 values), but only 17
690+
// bytes of RLE-bit-packed data (including the one-byte header).
691+
bytes_read = data_size_;
692+
values_count =
693+
static_cast<rle_size_t>((bytes_read - header_bytes) * 8 / value_bit_width_);
694+
// Only allow errors where the bit-packed run is not padded to a multiple
695+
// of 8 values. Larger truncation should not occur.
696+
if (values_count <= static_cast<rle_size_t>((count - 1) * 8)) {
697+
return {0, ControlFlow::Break};
698+
}
686699
}
687-
const auto values_count = static_cast<rle_size_t>(count * 8);
688700

689701
auto control = handler.OnBitPackedRun(
690702
BitPackedRun(data_ + header_bytes, values_count, value_bit_width_));
@@ -1215,7 +1227,8 @@ auto RleBitPackedDecoder<T>::GetBatchWithDict(const V* dictionary,
12151227
rle_size_t batch_size) -> rle_size_t {
12161228
using ControlFlow = RleBitPackedParser::ControlFlow;
12171229

1218-
if (ARROW_PREDICT_FALSE(batch_size <= 0)) {
1230+
if (ARROW_PREDICT_FALSE(batch_size <= 0 || dictionary_length == 0)) {
1231+
// Either empty batch or invalid dictionary
12191232
return 0;
12201233
}
12211234

@@ -1284,6 +1297,17 @@ auto RleBitPackedDecoder<T>::GetBatchWithDictSpaced(
12841297
if (null_count == 0) {
12851298
return GetBatchWithDict<V>(dictionary, dictionary_length, out, batch_size);
12861299
}
1300+
if (null_count == batch_size) {
1301+
// All nulls, avoid instantiating DictionaryConverter as dictionary_length
1302+
// could be 0.
1303+
std::fill(out, out + batch_size, V{});
1304+
return batch_size;
1305+
}
1306+
if (ARROW_PREDICT_FALSE(batch_size <= 0 || dictionary_length == 0)) {
1307+
// Either empty batch or invalid dictionary
1308+
return 0;
1309+
}
1310+
12871311
internal::DictionaryConverter<V, value_type> converter{dictionary, dictionary_length};
12881312

12891313
return GetSpaced(converter, out, batch_size, valid_bits, valid_bits_offset, null_count);

cpp/src/arrow/util/rle_encoding_test.cc

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "arrow/util/bit_util.h"
3636
#include "arrow/util/io_util.h"
3737
#include "arrow/util/rle_encoding_internal.h"
38+
#include "arrow/util/span.h"
3839

3940
namespace arrow::util {
4041

@@ -458,6 +459,29 @@ void TestRleBitPackedParser(std::vector<uint8_t> bytes, rle_size_t bit_width,
458459
EXPECT_EQ(decoded, expected);
459460
}
460461

462+
void TestRleBitPackedParserError(span<const uint8_t> bytes, rle_size_t bit_width) {
463+
auto parser =
464+
RleBitPackedParser(bytes.data(), static_cast<rle_size_t>(bytes.size()), bit_width);
465+
EXPECT_FALSE(parser.exhausted());
466+
467+
struct {
468+
auto OnRleRun(RleRun run) { return RleBitPackedParser::ControlFlow::Continue; }
469+
auto OnBitPackedRun(BitPackedRun run) {
470+
return RleBitPackedParser::ControlFlow::Continue;
471+
}
472+
} handler;
473+
474+
// Iterate over all runs
475+
parser.Parse(handler);
476+
// Non-exhaustion despite ControlFlow::Continue signals an error occurred.
477+
EXPECT_FALSE(parser.exhausted());
478+
}
479+
480+
void TestRleBitPackedParserError(const std::vector<uint8_t>& bytes,
481+
rle_size_t bit_width) {
482+
TestRleBitPackedParserError(span(bytes), bit_width);
483+
}
484+
461485
TEST(RleBitPacked, RleBitPackedParser) {
462486
TestRleBitPackedParser<uint16_t>(
463487
/* bytes= */
@@ -500,6 +524,114 @@ TEST(RleBitPacked, RleBitPackedParser) {
500524
}
501525
}
502526

527+
TEST(RleBitPacked, RleBitPackedParserInvalidNonPadded) {
528+
// GH-47981: a non-padded trailing bit-packed, produced by some non-compliant
529+
// encoders, should still be decoded successfully.
530+
531+
TestRleBitPackedParser<uint16_t>(
532+
/* bytes= */
533+
{/* LEB128 for 8 values bit packed marker */ 0x3,
534+
/* Bitpacked run */ 0x88, 0xc6},
535+
/* bit_width= */ 3,
536+
/* expected= */ {0, 1, 2, 3, 4});
537+
TestRleBitPackedParser<uint16_t>(
538+
/* bytes= */
539+
{/* LEB128 for 8 values bit packed marker */ 0x3,
540+
/* Bitpacked run */ 0x88},
541+
/* bit_width= */ 3,
542+
/* expected= */ {0, 1});
543+
TestRleBitPackedParser<uint16_t>(
544+
/* bytes= */
545+
{/* LEB128 for 8 values bit packed marker */ 0x3,
546+
/* Bitpacked run */ 0x1, 0x2, 0x3},
547+
/* bit_width= */ 8,
548+
/* expected= */ {1, 2, 3});
549+
TestRleBitPackedParser<uint16_t>(
550+
/* bytes= */
551+
{/* LEB128 for 8 values bit packed marker */ 0x3,
552+
/* Bitpacked run */ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
553+
/* bit_width= */ 8,
554+
/* expected= */ {1, 2, 3, 4, 5, 6, 7});
555+
TestRleBitPackedParser<uint16_t>(
556+
/* bytes= */
557+
{/* LEB128 for 16 values bit packed marker */ 0x5,
558+
/* Bitpacked run */ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9},
559+
/* bit_width= */ 8,
560+
/* expected= */ {1, 2, 3, 4, 5, 6, 7, 8, 9});
561+
562+
// If the trailing bit-packed declares more values than padding allows,
563+
// it's an error.
564+
565+
// 2 values encoded, 16 values declared (8 would be ok)
566+
TestRleBitPackedParserError(
567+
/* bytes= */
568+
{/* LEB128 for 16 values bit packed marker */ 0x5,
569+
/* Bitpacked run */ 0x88},
570+
/* bit_width= */ 3);
571+
// 8 values encoded, 16 values declared (8 would be ok)
572+
TestRleBitPackedParserError(
573+
/* bytes= */
574+
{/* LEB128 for 16 values bit packed marker */ 0x5,
575+
/* Bitpacked run */ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8},
576+
/* bit_width= */ 8);
577+
578+
// If the trailing bit-packed run does not have room for at least 1 value,
579+
// it's an error.
580+
581+
TestRleBitPackedParserError(
582+
/* bytes= */
583+
{/* LEB128 for 8 values bit packed marker */ 0x3},
584+
/* bit_width= */ 3);
585+
TestRleBitPackedParserError(
586+
/* bytes= */
587+
{/* LEB128 for 8 values bit packed marker */ 0x3,
588+
/* Bitpacked run */ 0x1},
589+
/* bit_width= */ 9);
590+
}
591+
592+
TEST(RleBitPacked, RleBitPackedParserErrors) {
593+
using V = std::vector<uint8_t>;
594+
595+
// Truncated LEB128 header
596+
TestRleBitPackedParserError(
597+
/* bytes= */
598+
V{0x81},
599+
/* bit_width= */ 3);
600+
601+
// Invalid LEB128 header for a 32-bit value
602+
TestRleBitPackedParserError(
603+
/* bytes= */
604+
V{0xFF, 0xFF, 0xFF, 0xFF, 0x7f},
605+
/* bit_width= */ 3);
606+
607+
// Zero-length repeated run
608+
TestRleBitPackedParserError(
609+
/* bytes= */
610+
V{0x00, 0x00},
611+
/* bit_width= */ 3);
612+
TestRleBitPackedParserError(
613+
/* bytes= */
614+
V{0x80, 0x00, 0x00},
615+
/* bit_width= */ 3);
616+
617+
// Zero-length bit-packed run
618+
TestRleBitPackedParserError(
619+
/* bytes= */
620+
V{0x01},
621+
/* bit_width= */ 3);
622+
TestRleBitPackedParserError(
623+
/* bytes= */
624+
V{0x80, 0x01},
625+
/* bit_width= */ 3);
626+
627+
// Bit-packed run too large
628+
// (we pass a span<> on invalid memory, but only the reachable part should be read)
629+
std::vector<uint8_t> bytes = {0x81, 0x80, 0x80, 0x80, 0x02};
630+
TestRleBitPackedParserError(
631+
/* bytes= */ span(bytes.data(), 1ULL << 30),
632+
/* bit_width= */ 1);
633+
}
634+
503635
// Validates encoding of values by encoding and decoding them. If
504636
// expected_encoding != NULL, also validates that the encoded buffer is
505637
// exactly 'expected_encoding'.

0 commit comments

Comments
 (0)