diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc index 431f2d26042..c69b3b9ccdf 100644 --- a/cpp/src/parquet/decoder.cc +++ b/cpp/src/parquet/decoder.cc @@ -41,6 +41,7 @@ #include "arrow/util/bitmap_ops.h" #include "arrow/util/byte_stream_split_internal.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/endian.h" #include "arrow/util/int_util_overflow.h" #include "arrow/util/logging_internal.h" #include "arrow/util/rle_encoding_internal.h" @@ -406,9 +407,20 @@ int PlainDecoder::DecodeArrow( VisitBitRuns(valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t run_length, bool is_valid) { if (is_valid) { +#if ARROW_LITTLE_ENDIAN RETURN_NOT_OK(builder->AppendValues( reinterpret_cast(data), run_length)); data += run_length * sizeof(value_type); +#else + // On big-endian systems, we need to byte-swap each value + // since Parquet data is stored in little-endian format + for (int64_t i = 0; i < run_length; ++i) { + value_type value = ::arrow::bit_util::FromLittleEndian( + SafeLoadAs(data)); + RETURN_NOT_OK(builder->Append(value)); + data += sizeof(value_type); + } +#endif } else { RETURN_NOT_OK(builder->AppendNulls(run_length)); } @@ -458,7 +470,24 @@ inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, } // If bytes_to_decode == 0, data could be null if (bytes_to_decode > 0) { +#if ARROW_LITTLE_ENDIAN memcpy(out, data, static_cast(bytes_to_decode)); +#else + // On big-endian systems, we need to byte-swap each value + // since Parquet data is stored in little-endian format. + // Only apply to integer and floating-point types that have FromLittleEndian support. + if constexpr (std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v) { + for (int i = 0; i < num_values; ++i) { + out[i] = ::arrow::bit_util::FromLittleEndian(SafeLoadAs(data)); + data += sizeof(T); + } + } else { + // For other types (bool, Int96, etc.), just do memcpy + memcpy(out, data, static_cast(bytes_to_decode)); + } +#endif } return static_cast(bytes_to_decode); } @@ -471,7 +500,7 @@ static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size, if (ARROW_PREDICT_FALSE(data_size < 4)) { ParquetException::EofException(); } - const int32_t len = SafeLoadAs(data); + const int32_t len = ::arrow::bit_util::FromLittleEndian(SafeLoadAs(data)); if (len < 0) { throw ParquetException("Invalid BYTE_ARRAY value"); } @@ -772,7 +801,8 @@ class PlainByteArrayDecoder : public PlainDecoder { return Status::Invalid( "Invalid or truncated PLAIN-encoded BYTE_ARRAY data"); } - auto value_len = SafeLoadAs(data_); + auto value_len = + ::arrow::bit_util::FromLittleEndian(SafeLoadAs(data_)); if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > len_ - 4)) { return Status::Invalid( "Invalid or truncated PLAIN-encoded BYTE_ARRAY data"); @@ -817,7 +847,8 @@ class PlainByteArrayDecoder : public PlainDecoder { return Status::Invalid( "Invalid or truncated PLAIN-encoded BYTE_ARRAY data"); } - auto value_len = SafeLoadAs(data_); + auto value_len = + ::arrow::bit_util::FromLittleEndian(SafeLoadAs(data_)); if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > len_ - 4)) { return Status::Invalid( "Invalid or truncated PLAIN-encoded BYTE_ARRAY data"); @@ -1616,9 +1647,17 @@ class DeltaBitPackDecoder : public TypedDecoderImpl { for (int j = 0; j < values_decode; ++j) { // Addition between min_delta, packed int and last_value should be treated as // unsigned addition. Overflow is as expected. +#if ARROW_LITTLE_ENDIAN buffer[i + j] = static_cast(min_delta_) + static_cast(buffer[i + j]) + static_cast(last_value_); last_value_ = buffer[i + j]; +#else + UT temp = static_cast(min_delta_) + + static_cast(static_cast(buffer[i + j])) + + static_cast(last_value_); + buffer[i + j] = static_cast(temp); + last_value_ = static_cast(temp); +#endif } values_remaining_current_mini_block_ -= values_decode; i += values_decode; @@ -2306,6 +2345,17 @@ class ByteStreamSplitDecoder : public ByteStreamSplitDecoderBaseDecodeRaw(decode_out, max_values); DCHECK_EQ(num_decoded, max_values); +#if !ARROW_LITTLE_ENDIAN + // On big-endian, ByteStreamSplitDecode (DoMergeStreams) reverses stream positions + // to produce numeric values in native byte order. For FLBA (opaque byte arrays), + // we need to undo this reversal to preserve the original byte sequence. + const int type_length = this->type_length_; + for (int i = 0; i < num_decoded; ++i) { + uint8_t* value_ptr = decode_out + static_cast(type_length) * i; + std::reverse(value_ptr, value_ptr + type_length); + } +#endif + for (int i = 0; i < num_decoded; ++i) { buffer[i] = FixedLenByteArray(decode_out + static_cast(this->type_length_) * i); diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc index 04f079ce70c..268de50019e 100644 --- a/cpp/src/parquet/encoder.cc +++ b/cpp/src/parquet/encoder.cc @@ -162,7 +162,8 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder { void UnsafePutByteArray(const void* data, uint32_t length) { DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL"; - sink_.UnsafeAppend(&length, sizeof(uint32_t)); + uint32_t length_le = ::arrow::bit_util::ToLittleEndian(length); + sink_.UnsafeAppend(&length_le, sizeof(uint32_t)); sink_.UnsafeAppend(data, static_cast(length)); unencoded_byte_array_data_bytes_ += length; } @@ -201,7 +202,37 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder { template void PlainEncoder::Put(const T* buffer, int num_values) { if (num_values > 0) { +#if ARROW_LITTLE_ENDIAN PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T))); +#else + // On big-endian systems, except for bool type, we need to byte-swap each value, + // since Parquet data must be stored in little-endian format. + if constexpr (std::is_arithmetic_v && !(std::is_same_v)) { + constexpr int kSmallBufferSize = 128; + T* temp_data = nullptr; + std::array small_buffer; + std::unique_ptr<::arrow::Buffer> heap_buffer; + + // Use stack memory for smaller buffer sizes + if (num_values <= kSmallBufferSize) { + temp_data = small_buffer.data(); + } else { + // Use heap memory for larger sizes + PARQUET_ASSIGN_OR_THROW( + heap_buffer, + ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); + temp_data = reinterpret_cast(heap_buffer->mutable_data()); + } + + for (int i = 0; i < num_values; ++i) { + temp_data[i] = ::arrow::bit_util::ToLittleEndian(buffer[i]); + } + PARQUET_THROW_NOT_OK(sink_.Append(temp_data, num_values * sizeof(T))); + } else { + // For other types (Int96, etc.), just do memcpy + PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T))); + } +#endif } } @@ -224,18 +255,38 @@ void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) { constexpr auto value_size = sizeof(value_type); auto raw_values = checked_cast(values).raw_values(); - if (values.null_count() == 0) { - // no nulls, just dump the data - PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size)); - } else { - PARQUET_THROW_NOT_OK( - sink->Reserve((values.length() - values.null_count()) * value_size)); + const int64_t len = values.length(); + const int64_t nulls = values.null_count(); + const int64_t valid_count = len - nulls; - for (int64_t i = 0; i < values.length(); i++) { - if (values.IsValid(i)) { - sink->UnsafeAppend(&raw_values[i], value_size); - } +#if ARROW_LITTLE_ENDIAN + // Fast path: no nulls → bulk append + if (nulls == 0) { + PARQUET_THROW_NOT_OK(sink->Append(raw_values, len * value_size)); + return; + } +#endif + + // Reserve only once + PARQUET_THROW_NOT_OK(sink->Reserve(valid_count * value_size)); + + // Fallback path: need to check nulls OR endian conversion + for (int64_t i = 0; i < len; ++i) { + if (!values.IsValid(i)) continue; + +#if ARROW_LITTLE_ENDIAN + // Little-endian, nulls exist → per-element append + sink->UnsafeAppend(&raw_values[i], value_size); +#else + // Big-endian logic + if constexpr (std::is_arithmetic_v && + !(std::is_same_v)) { + auto le_value = ::arrow::bit_util::ToLittleEndian(raw_values[i]); + sink->UnsafeAppend(&le_value, value_size); + } else { + sink->UnsafeAppend(&raw_values[i], value_size); } +#endif } } @@ -649,9 +700,27 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder { template void DictEncoderImpl::WriteDict(uint8_t* buffer) const { - // For primitive types, only a memcpy + // For primitive types, copy values with endianness conversion DCHECK_EQ(static_cast(dict_encoded_size_), sizeof(T) * memo_table_.size()); +#if ARROW_LITTLE_ENDIAN memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast(buffer)); +#else + // On big-endian systems, we need to byte-swap each value + // since Parquet data must be stored in little-endian format. + if constexpr (std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v) { + std::vector temp(memo_table_.size()); + memo_table_.CopyValues(0 /* start_pos */, temp.data()); + T* out = reinterpret_cast(buffer); + for (size_t i = 0; i < temp.size(); ++i) { + out[i] = ::arrow::bit_util::ToLittleEndian(temp[i]); + } + } else { + // For other types (Int96, etc.), just do memcpy + memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast(buffer)); + } +#endif } // ByteArray and FLBA already have the dictionary encoded in their data heaps @@ -659,7 +728,8 @@ template <> void DictEncoderImpl::WriteDict(uint8_t* buffer) const { memo_table_.VisitValues(0, [&buffer](::std::string_view v) { uint32_t len = static_cast(v.length()); - memcpy(buffer, &len, sizeof(len)); + uint32_t len_le = ::arrow::bit_util::ToLittleEndian(len); + memcpy(buffer, &len_le, sizeof(len_le)); buffer += sizeof(len); memcpy(buffer, v.data(), len); buffer += len; @@ -924,6 +994,8 @@ class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase { void Put(const T* buffer, int num_values) override { if (num_values > 0) { + // ByteStreamSplitEncode (DoSplitStreams) handles endianness correctly, + // so we can directly append the native byte representation PARQUET_THROW_NOT_OK( this->sink_.Append(reinterpret_cast(buffer), num_values * static_cast(sizeof(T)))); @@ -964,10 +1036,22 @@ class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase 0) { const int64_t total_bytes = static_cast(num_values) * byte_width_; PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes)); +#if !ARROW_LITTLE_ENDIAN + // On big-endian, reverse bytes before encoding to compensate for + // DoSplitStreams reversal, ensuring FLBA bytes are preserved as-is + std::vector temp_buffer(byte_width_); +#endif for (int i = 0; i < num_values; ++i) { // Write the result to the output stream DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL"; +#if !ARROW_LITTLE_ENDIAN + // Reverse bytes before appending + std::reverse_copy(buffer[i].ptr, buffer[i].ptr + byte_width_, + temp_buffer.begin()); + sink_.UnsafeAppend(temp_buffer.data(), byte_width_); +#else sink_.UnsafeAppend(buffer[i].ptr, byte_width_); +#endif } } this->num_values_in_buffer_ += num_values;