Skip to content

Commit 82d9390

Browse files
GH-48204 Fix Column Reader & Writer logic to enable Parquet DB support on s390x
1 parent 2fb2f79 commit 82d9390

File tree

4 files changed

+70
-3
lines changed

4 files changed

+70
-3
lines changed

cpp/src/parquet/column_reader.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "arrow/util/checked_cast.h"
4242
#include "arrow/util/compression.h"
4343
#include "arrow/util/crc32.h"
44+
#include "arrow/util/endian.h"
4445
#include "arrow/util/int_util_overflow.h"
4546
#include "arrow/util/logging.h"
4647
#include "arrow/util/rle_encoding_internal.h"
@@ -112,7 +113,8 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
112113
if (data_size < 4) {
113114
throw ParquetException("Received invalid levels (corrupt data page?)");
114115
}
115-
num_bytes = ::arrow::util::SafeLoadAs<int32_t>(data);
116+
num_bytes =
117+
::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs<int32_t>(data));
116118
if (num_bytes < 0 || num_bytes > data_size - 4) {
117119
throw ParquetException("Received invalid number of bytes (corrupt data page?)");
118120
}
@@ -132,7 +134,11 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
132134
"Number of buffered values too large (corrupt data page?)");
133135
}
134136
num_bytes = static_cast<int32_t>(bit_util::BytesForBits(num_bits));
137+
#if ARROW_LITTLE_ENDIAN
135138
if (num_bytes < 0 || num_bytes > data_size - 4) {
139+
#else
140+
if (num_bytes < 0 || num_bytes > data_size) {
141+
#endif
136142
throw ParquetException("Received invalid number of bytes (corrupt data page?)");
137143
}
138144
if (!bit_packed_decoder_) {

cpp/src/parquet/column_writer.cc

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -953,7 +953,8 @@ int64_t ColumnWriterImpl::RleEncodeLevels(const void* src_buffer,
953953
DCHECK_EQ(encoded, num_buffered_values_);
954954

955955
if (include_length_prefix) {
956-
reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
956+
::arrow::util::SafeStore(dest_buffer->mutable_data(),
957+
::arrow::bit_util::ToLittleEndian(level_encoder_.len()));
957958
}
958959

959960
return level_encoder_.len() + prefix_size;
@@ -2578,13 +2579,31 @@ struct SerializeFunctor<
25782579
if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal64Type>) {
25792580
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
25802581
} else if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal128Type>) {
2582+
#if ARROW_LITTLE_ENDIAN
2583+
// On little-endian: u64_in[0] = low, u64_in[1] = high
2584+
// Write high first for big-endian output
25812585
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
25822586
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
2587+
#else
2588+
// On big-endian: u64_in[0] = high, u64_in[1] = low
2589+
// Write high first for big-endian output
2590+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
2591+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
2592+
#endif
25832593
} else if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal256Type>) {
2594+
#if ARROW_LITTLE_ENDIAN
2595+
// On little-endian: write words in reverse order (high to low)
25842596
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[3]);
25852597
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[2]);
25862598
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
25872599
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
2600+
#else
2601+
// On big-endian: write words in natural order (high to low)
2602+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
2603+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
2604+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[2]);
2605+
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[3]);
2606+
#endif
25882607
}
25892608
scratch = reinterpret_cast<uint8_t*>(p);
25902609
}
@@ -2600,6 +2619,7 @@ struct SerializeFunctor<
26002619

26012620
// Requires a custom serializer because Float16s in Parquet are stored as a 2-byte
26022621
// (little-endian) FLBA, whereas in Arrow they're a native `uint16_t`.
2622+
#if ARROW_LITTLE_ENDIAN
26032623
template <>
26042624
struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
26052625
Status Serialize(const ::arrow::HalfFloatArray& array, ArrowWriteContext*, FLBA* out) {
@@ -2621,6 +2641,38 @@ struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
26212641
return FLBA{reinterpret_cast<const uint8_t*>(value_ptr)};
26222642
}
26232643
};
2644+
#else
2645+
template <>
2646+
struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
2647+
Status Serialize(const ::arrow::HalfFloatArray& array, ArrowWriteContext*, FLBA* out) {
2648+
const uint16_t* values = array.raw_values();
2649+
const int64_t length = array.length();
2650+
2651+
// Allocate buffer for little-endian converted values
2652+
converted_values_.resize(length);
2653+
2654+
if (array.null_count() == 0) {
2655+
for (int64_t i = 0; i < length; ++i) {
2656+
converted_values_[i] = ::arrow::bit_util::ToLittleEndian(values[i]);
2657+
out[i] = FLBA{reinterpret_cast<const uint8_t*>(&converted_values_[i])};
2658+
}
2659+
} else {
2660+
for (int64_t i = 0; i < length; ++i) {
2661+
if (array.IsValid(i)) {
2662+
converted_values_[i] = ::arrow::bit_util::ToLittleEndian(values[i]);
2663+
out[i] = FLBA{reinterpret_cast<const uint8_t*>(&converted_values_[i])};
2664+
} else {
2665+
out[i] = FLBA{};
2666+
}
2667+
}
2668+
}
2669+
return Status::OK();
2670+
}
2671+
2672+
private:
2673+
std::vector<uint16_t> converted_values_;
2674+
};
2675+
#endif
26242676

26252677
template <>
26262678
Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(

cpp/src/parquet/column_writer.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include "arrow/type_fwd.h"
2525
#include "arrow/util/compression.h"
26+
#include "arrow/util/endian.h"
2627
#include "parquet/exception.h"
2728
#include "parquet/platform.h"
2829
#include "parquet/types.h"
@@ -260,13 +261,21 @@ constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
260261
template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
261262
inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
262263
int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
264+
#if ARROW_LITTLE_ENDIAN
263265
(*impala_timestamp).value[2] = (uint32_t)julian_days;
266+
#endif
264267

265268
int64_t last_day_units = time % UnitPerDay;
266269
auto last_day_nanos = last_day_units * NanosecondsPerUnit;
270+
#if ARROW_LITTLE_ENDIAN
267271
// impala_timestamp will be unaligned every other entry so do memcpy instead
268272
// of assign and reinterpret cast to avoid undefined behavior.
269273
std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
274+
#else
275+
(*impala_timestamp).value[0] = static_cast<uint32_t>(last_day_nanos);
276+
(*impala_timestamp).value[1] = static_cast<uint32_t>(last_day_nanos >> 32);
277+
(*impala_timestamp).value[2] = static_cast<uint32_t>(julian_days);
278+
#endif
270279
}
271280

272281
constexpr int64_t kSecondsInNanos = INT64_C(1000000000);

0 commit comments

Comments
 (0)