Skip to content

Commit 8c99313

Browse files
GH-48202: [C++][Parquet] Fix encoder & decoder logic to enable Parquet DB support on s390x
1 parent 2fb2f79 commit 8c99313

File tree

2 files changed

+137
-6
lines changed

2 files changed

+137
-6
lines changed

cpp/src/parquet/decoder.cc

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "arrow/util/bitmap_ops.h"
4242
#include "arrow/util/byte_stream_split_internal.h"
4343
#include "arrow/util/checked_cast.h"
44+
#include "arrow/util/endian.h"
4445
#include "arrow/util/int_util_overflow.h"
4546
#include "arrow/util/logging_internal.h"
4647
#include "arrow/util/rle_encoding_internal.h"
@@ -406,9 +407,20 @@ int PlainDecoder<DType>::DecodeArrow(
406407
VisitBitRuns(valid_bits, valid_bits_offset, num_values,
407408
[&](int64_t position, int64_t run_length, bool is_valid) {
408409
if (is_valid) {
410+
#if ARROW_LITTLE_ENDIAN
409411
RETURN_NOT_OK(builder->AppendValues(
410412
reinterpret_cast<const value_type*>(data), run_length));
411413
data += run_length * sizeof(value_type);
414+
#else
415+
// On big-endian systems, we need to byte-swap each value
416+
// since Parquet data is stored in little-endian format
417+
for (int64_t i = 0; i < run_length; ++i) {
418+
value_type value = ::arrow::bit_util::FromLittleEndian(
419+
SafeLoadAs<value_type>(data));
420+
RETURN_NOT_OK(builder->Append(value));
421+
data += sizeof(value_type);
422+
}
423+
#endif
412424
} else {
413425
RETURN_NOT_OK(builder->AppendNulls(run_length));
414426
}
@@ -458,7 +470,24 @@ inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
458470
}
459471
// If bytes_to_decode == 0, data could be null
460472
if (bytes_to_decode > 0) {
473+
#if ARROW_LITTLE_ENDIAN
461474
memcpy(out, data, static_cast<size_t>(bytes_to_decode));
475+
#else
476+
// On big-endian systems, we need to byte-swap each value
477+
// since Parquet data is stored in little-endian format.
478+
// Only apply to integer and floating-point types that have FromLittleEndian support.
479+
if constexpr (std::is_same_v<T, int32_t> || std::is_same_v<T, uint32_t> ||
480+
std::is_same_v<T, int64_t> || std::is_same_v<T, uint64_t> ||
481+
std::is_same_v<T, float> || std::is_same_v<T, double>) {
482+
for (int i = 0; i < num_values; ++i) {
483+
out[i] = ::arrow::bit_util::FromLittleEndian(SafeLoadAs<T>(data));
484+
data += sizeof(T);
485+
}
486+
} else {
487+
// For other types (bool, Int96, etc.), just do memcpy
488+
memcpy(out, data, static_cast<size_t>(bytes_to_decode));
489+
}
490+
#endif
462491
}
463492
return static_cast<int>(bytes_to_decode);
464493
}
@@ -471,7 +500,7 @@ static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size,
471500
if (ARROW_PREDICT_FALSE(data_size < 4)) {
472501
ParquetException::EofException();
473502
}
474-
const int32_t len = SafeLoadAs<int32_t>(data);
503+
const int32_t len = ::arrow::bit_util::FromLittleEndian(SafeLoadAs<int32_t>(data));
475504
if (len < 0) {
476505
throw ParquetException("Invalid BYTE_ARRAY value");
477506
}
@@ -772,7 +801,8 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType> {
772801
return Status::Invalid(
773802
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
774803
}
775-
auto value_len = SafeLoadAs<int32_t>(data_);
804+
auto value_len =
805+
::arrow::bit_util::FromLittleEndian(SafeLoadAs<int32_t>(data_));
776806
if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > len_ - 4)) {
777807
return Status::Invalid(
778808
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
@@ -817,7 +847,8 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType> {
817847
return Status::Invalid(
818848
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
819849
}
820-
auto value_len = SafeLoadAs<int32_t>(data_);
850+
auto value_len =
851+
::arrow::bit_util::FromLittleEndian(SafeLoadAs<int32_t>(data_));
821852
if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > len_ - 4)) {
822853
return Status::Invalid(
823854
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
@@ -1616,9 +1647,17 @@ class DeltaBitPackDecoder : public TypedDecoderImpl<DType> {
16161647
for (int j = 0; j < values_decode; ++j) {
16171648
// Addition between min_delta, packed int and last_value should be treated as
16181649
// unsigned addition. Overflow is as expected.
1650+
#if ARROW_LITTLE_ENDIAN
16191651
buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i + j]) +
16201652
static_cast<UT>(last_value_);
16211653
last_value_ = buffer[i + j];
1654+
#else
1655+
UT temp = static_cast<UT>(min_delta_) +
1656+
static_cast<UT>(static_cast<uint64_t>(buffer[i + j])) +
1657+
static_cast<UT>(last_value_);
1658+
buffer[i + j] = static_cast<T>(temp);
1659+
last_value_ = static_cast<T>(temp);
1660+
#endif
16221661
}
16231662
values_remaining_current_mini_block_ -= values_decode;
16241663
i += values_decode;
@@ -2306,6 +2345,17 @@ class ByteStreamSplitDecoder<FLBAType> : public ByteStreamSplitDecoderBase<FLBAT
23062345
const int num_decoded = this->DecodeRaw(decode_out, max_values);
23072346
DCHECK_EQ(num_decoded, max_values);
23082347

2348+
#if !ARROW_LITTLE_ENDIAN
2349+
// On big-endian, ByteStreamSplitDecode (DoMergeStreams) reverses stream positions
2350+
// to produce numeric values in native byte order. For FLBA (opaque byte arrays),
2351+
// we need to undo this reversal to preserve the original byte sequence.
2352+
const int type_length = this->type_length_;
2353+
for (int i = 0; i < num_decoded; ++i) {
2354+
uint8_t* value_ptr = decode_out + static_cast<int64_t>(type_length) * i;
2355+
std::reverse(value_ptr, value_ptr + type_length);
2356+
}
2357+
#endif
2358+
23092359
for (int i = 0; i < num_decoded; ++i) {
23102360
buffer[i] =
23112361
FixedLenByteArray(decode_out + static_cast<int64_t>(this->type_length_) * i);

cpp/src/parquet/encoder.cc

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
162162

163163
void UnsafePutByteArray(const void* data, uint32_t length) {
164164
DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
165-
sink_.UnsafeAppend(&length, sizeof(uint32_t));
165+
uint32_t length_le = ::arrow::bit_util::ToLittleEndian(length);
166+
sink_.UnsafeAppend(&length_le, sizeof(uint32_t));
166167
sink_.UnsafeAppend(data, static_cast<int64_t>(length));
167168
unencoded_byte_array_data_bytes_ += length;
168169
}
@@ -201,7 +202,27 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
201202
template <typename DType>
202203
void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
203204
if (num_values > 0) {
205+
#if ARROW_LITTLE_ENDIAN
204206
PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
207+
#else
208+
// On big-endian systems, we need to byte-swap each value
209+
// since Parquet data must be stored in little-endian format.
210+
if constexpr (std::is_same_v<T, int32_t> || std::is_same_v<T, uint32_t> ||
211+
std::is_same_v<T, int64_t> || std::is_same_v<T, uint64_t> ||
212+
std::is_same_v<T, float> || std::is_same_v<T, double>) {
213+
PARQUET_ASSIGN_OR_THROW(
214+
auto temp_buffer,
215+
::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool()));
216+
T* temp_data = temp_buffer->template mutable_data_as<T>();
217+
for (int i = 0; i < num_values; ++i) {
218+
temp_data[i] = ::arrow::bit_util::ToLittleEndian(buffer[i]);
219+
}
220+
PARQUET_THROW_NOT_OK(sink_.Append(temp_data, num_values * sizeof(T)));
221+
} else {
222+
// For other types (Int96, etc.), just do memcpy
223+
PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
224+
}
225+
#endif
205226
}
206227
}
207228

@@ -224,6 +245,7 @@ void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) {
224245
constexpr auto value_size = sizeof(value_type);
225246
auto raw_values = checked_cast<const ArrayType&>(values).raw_values();
226247

248+
#if ARROW_LITTLE_ENDIAN
227249
if (values.null_count() == 0) {
228250
// no nulls, just dump the data
229251
PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size));
@@ -237,6 +259,32 @@ void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) {
237259
}
238260
}
239261
}
262+
#else
263+
// On big-endian systems, we need to byte-swap each value
264+
// since Parquet data must be stored in little-endian format.
265+
if constexpr (std::is_same_v<value_type, int32_t> ||
266+
std::is_same_v<value_type, uint32_t> ||
267+
std::is_same_v<value_type, int64_t> ||
268+
std::is_same_v<value_type, uint64_t> ||
269+
std::is_same_v<value_type, float> || std::is_same_v<value_type, double>) {
270+
PARQUET_THROW_NOT_OK(
271+
sink->Reserve((values.length() - values.null_count()) * value_size));
272+
for (int64_t i = 0; i < values.length(); i++) {
273+
if (values.IsValid(i)) {
274+
auto le_value = ::arrow::bit_util::ToLittleEndian(raw_values[i]);
275+
sink->UnsafeAppend(&le_value, value_size);
276+
}
277+
}
278+
} else {
279+
PARQUET_THROW_NOT_OK(
280+
sink->Reserve((values.length() - values.null_count()) * value_size));
281+
for (int64_t i = 0; i < values.length(); i++) {
282+
if (values.IsValid(i)) {
283+
sink->UnsafeAppend(&raw_values[i], value_size);
284+
}
285+
}
286+
}
287+
#endif
240288
}
241289

242290
template <>
@@ -649,17 +697,36 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
649697

650698
template <typename DType>
651699
void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) const {
652-
// For primitive types, only a memcpy
700+
// For primitive types, copy values with endianness conversion
653701
DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
702+
#if ARROW_LITTLE_ENDIAN
654703
memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
704+
#else
705+
// On big-endian systems, we need to byte-swap each value
706+
// since Parquet data must be stored in little-endian format.
707+
if constexpr (std::is_same_v<T, int32_t> || std::is_same_v<T, uint32_t> ||
708+
std::is_same_v<T, int64_t> || std::is_same_v<T, uint64_t> ||
709+
std::is_same_v<T, float> || std::is_same_v<T, double>) {
710+
std::vector<T> temp(memo_table_.size());
711+
memo_table_.CopyValues(0 /* start_pos */, temp.data());
712+
T* out = reinterpret_cast<T*>(buffer);
713+
for (size_t i = 0; i < temp.size(); ++i) {
714+
out[i] = ::arrow::bit_util::ToLittleEndian(temp[i]);
715+
}
716+
} else {
717+
// For other types (Int96, etc.), just do memcpy
718+
memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
719+
}
720+
#endif
655721
}
656722

657723
// ByteArray and FLBA already have the dictionary encoded in their data heaps
658724
template <>
659725
void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) const {
660726
memo_table_.VisitValues(0, [&buffer](::std::string_view v) {
661727
uint32_t len = static_cast<uint32_t>(v.length());
662-
memcpy(buffer, &len, sizeof(len));
728+
uint32_t len_le = ::arrow::bit_util::ToLittleEndian(len);
729+
memcpy(buffer, &len_le, sizeof(len_le));
663730
buffer += sizeof(len);
664731
memcpy(buffer, v.data(), len);
665732
buffer += len;
@@ -924,6 +991,8 @@ class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase<DType> {
924991

925992
void Put(const T* buffer, int num_values) override {
926993
if (num_values > 0) {
994+
// ByteStreamSplitEncode (DoSplitStreams) handles endianness correctly,
995+
// so we can directly append the native byte representation
927996
PARQUET_THROW_NOT_OK(
928997
this->sink_.Append(reinterpret_cast<const uint8_t*>(buffer),
929998
num_values * static_cast<int64_t>(sizeof(T))));
@@ -964,10 +1033,22 @@ class ByteStreamSplitEncoder<FLBAType> : public ByteStreamSplitEncoderBase<FLBAT
9641033
if (byte_width_ > 0) {
9651034
const int64_t total_bytes = static_cast<int64_t>(num_values) * byte_width_;
9661035
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
1036+
#if !ARROW_LITTLE_ENDIAN
1037+
// On big-endian, reverse bytes before encoding to compensate for
1038+
// DoSplitStreams reversal, ensuring FLBA bytes are preserved as-is
1039+
std::vector<uint8_t> temp_buffer(byte_width_);
1040+
#endif
9671041
for (int i = 0; i < num_values; ++i) {
9681042
// Write the result to the output stream
9691043
DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL";
1044+
#if !ARROW_LITTLE_ENDIAN
1045+
// Reverse bytes before appending
1046+
std::reverse_copy(buffer[i].ptr, buffer[i].ptr + byte_width_,
1047+
temp_buffer.begin());
1048+
sink_.UnsafeAppend(temp_buffer.data(), byte_width_);
1049+
#else
9701050
sink_.UnsafeAppend(buffer[i].ptr, byte_width_);
1051+
#endif
9711052
}
9721053
}
9731054
this->num_values_in_buffer_ += num_values;

0 commit comments

Comments
 (0)