Skip to content

Commit 3e0b644

Browse files
GH-48204 Fix Column Reader & Writer logic to enable Parquet DB support on s390x
1 parent 55587ef commit 3e0b644

File tree

3 files changed

+54
-4
lines changed

3 files changed

+54
-4
lines changed

cpp/src/parquet/column_reader.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "arrow/util/checked_cast.h"
4242
#include "arrow/util/compression.h"
4343
#include "arrow/util/crc32.h"
44+
#include "arrow/util/endian.h"
4445
#include "arrow/util/int_util_overflow.h"
4546
#include "arrow/util/logging.h"
4647
#include "arrow/util/rle_encoding_internal.h"
@@ -112,7 +113,8 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
112113
if (data_size < 4) {
113114
throw ParquetException("Received invalid levels (corrupt data page?)");
114115
}
115-
num_bytes = ::arrow::util::SafeLoadAs<int32_t>(data);
116+
num_bytes =
117+
::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs<int32_t>(data));
116118
if (num_bytes < 0 || num_bytes > data_size - 4) {
117119
throw ParquetException("Received invalid number of bytes (corrupt data page?)");
118120
}
@@ -132,7 +134,11 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
132134
"Number of buffered values too large (corrupt data page?)");
133135
}
134136
num_bytes = static_cast<int32_t>(bit_util::BytesForBits(num_bits));
137+
#if ARROW_LITTLE_ENDIAN
135138
if (num_bytes < 0 || num_bytes > data_size - 4) {
139+
#else
140+
if (num_bytes < 0 || num_bytes > data_size) {
141+
#endif
136142
throw ParquetException("Received invalid number of bytes (corrupt data page?)");
137143
}
138144
if (!bit_packed_decoder_) {

cpp/src/parquet/column_writer.cc

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -953,7 +953,8 @@ int64_t ColumnWriterImpl::RleEncodeLevels(const void* src_buffer,
953953
DCHECK_EQ(encoded, num_buffered_values_);
954954

955955
if (include_length_prefix) {
956-
reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
956+
::arrow::util::SafeStore(dest_buffer->mutable_data(),
957+
::arrow::bit_util::ToLittleEndian(level_encoder_.len()));
957958
}
958959

959960
return level_encoder_.len() + prefix_size;
@@ -2578,13 +2579,31 @@ struct SerializeFunctor<
25782579
if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal64Type>) {
25792580
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
25802581
} else if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal128Type>) {
2582+
#if ARROW_LITTLE_ENDIAN
2583+
// On little-endian: u64_in[0] = low, u64_in[1] = high
2584+
// Write high first for big-endian output
25812585
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
25822586
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
2587+
#else
2588+
// On big-endian: u64_in[0] = high, u64_in[1] = low
2589+
// Write high first for big-endian output
2590+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
2591+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
2592+
#endif
25832593
} else if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal256Type>) {
2594+
#if ARROW_LITTLE_ENDIAN
2595+
// On little-endian: write words in reverse order (high to low)
25842596
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[3]);
25852597
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[2]);
25862598
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
25872599
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
2600+
#else
2601+
// On big-endian: write words in natural order (high to low)
2602+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
2603+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
2604+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[2]);
2605+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[3]);
2606+
#endif
25882607
}
25892608
scratch = reinterpret_cast<uint8_t*>(p);
25902609
}
@@ -2603,7 +2622,24 @@ struct SerializeFunctor<
26032622
template <>
26042623
struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
26052624
Status Serialize(const ::arrow::HalfFloatArray& array, ArrowWriteContext*, FLBA* out) {
2625+
#if ARROW_LITTLE_ENDIAN
2626+
return SerializeLittleEndianValues(array, array.raw_values(), out);
2627+
#else
26062628
const uint16_t* values = array.raw_values();
2629+
const int64_t length = array.length();
2630+
converted_values_.resize(length);
2631+
for (int64_t i = 0; i < length; ++i) {
2632+
// We don't need IsValid() here. Invalid (null) values are just ignored in
2633+
// SerializeLittleEndianValues().
2634+
converted_values_[i] = ::arrow::bit_util::ToLittleEndian(values[i]);
2635+
}
2636+
return SerializeLittleEndianValues(array, converted_values_.data(), out);
2637+
#endif
2638+
}
2639+
2640+
private:
2641+
Status SerializeLittleEndianValues(const ::arrow::HalfFloatArray& array,
2642+
const uint16_t* values, FLBA* out) {
26072643
if (array.null_count() == 0) {
26082644
for (int64_t i = 0; i < array.length(); ++i) {
26092645
out[i] = ToFLBA(&values[i]);
@@ -2616,10 +2652,13 @@ struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
26162652
return Status::OK();
26172653
}
26182654

2619-
private:
26202655
FLBA ToFLBA(const uint16_t* value_ptr) const {
26212656
return FLBA{reinterpret_cast<const uint8_t*>(value_ptr)};
26222657
}
2658+
2659+
#if !ARROW_LITTLE_ENDIAN
2660+
std::vector<uint16_t> converted_values_;
2661+
#endif
26232662
};
26242663

26252664
template <>

cpp/src/parquet/column_writer.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,13 +260,18 @@ constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
260260
template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
261261
inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
262262
int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
263-
(*impala_timestamp).value[2] = (uint32_t)julian_days;
263+
(*impala_timestamp).value[2] = static_cast<uint32_t>(julian_days);
264264

265265
int64_t last_day_units = time % UnitPerDay;
266266
auto last_day_nanos = last_day_units * NanosecondsPerUnit;
267+
#if ARROW_LITTLE_ENDIAN
267268
// impala_timestamp is unaligned for every other entry, so use memcpy instead
268269
// of assigning through a reinterpret_cast, which would be undefined behavior.
269270
std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
271+
#else
272+
(*impala_timestamp).value[0] = static_cast<uint32_t>(last_day_nanos);
273+
(*impala_timestamp).value[1] = static_cast<uint32_t>(last_day_nanos >> 32);
274+
#endif
270275
}
271276

272277
constexpr int64_t kSecondsInNanos = INT64_C(1000000000);

0 commit comments

Comments
 (0)