diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp index afa8a1453864aa4..d73863ff97cf824 100644 --- a/be/src/util/hash_util.hpp +++ b/be/src/util/hash_util.hpp @@ -133,6 +133,9 @@ class HashUtil { // refer to https://github.com/apache/commons-codec/blob/master/src/main/java/org/apache/commons/codec/digest/MurmurHash3.java static const uint32_t MURMUR3_32_SEED = 104729; + // refer https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L615 + static const uint32_t SPARK_MURMUR_32_SEED = 42; + // modify from https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp static uint32_t murmur_hash3_32(const void* key, int32_t len, uint32_t seed) { uint32_t out = 0; @@ -140,6 +143,11 @@ class HashUtil { return out; } + static uint32_t murmur_hash3_32_null(uint32_t seed) { + static const int INT_VALUE = 0; + return murmur_hash3_32((const unsigned char*)(&INT_VALUE), 4, seed); + } + static const int MURMUR_R = 47; // Murmur2 hash implementation returning 64-bit hashes. diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index b2cf72b016ff323..17ea9a0c94fafde 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -71,6 +71,18 @@ class SipHash; } \ } +#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(SEED) \ + if (null_data == nullptr) { \ + for (size_t i = 0; i < s; i++) { \ + hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \ + } \ + } else { \ + for (size_t i = 0; i < s; i++) { \ + if (null_data[i] == 0) \ + hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \ + } \ + } + namespace doris::vectorized { class Arena; @@ -401,6 +413,21 @@ class IColumn : public COW { LOG(FATAL) << get_name() << " update_crc_with_value not supported"; } + /// Update state of murmur3 hash function (spark files) with value of n elements to avoid the virtual + /// function call null_data to mark whether need to do hash compute, null_data == nullptr + /// means all element need to do hash function, else only *null_data != 0 need to do hash func + virtual void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const { + LOG(FATAL) << get_name() << "update_murmurs_with_value not supported"; + } + + // use range for one hash value to avoid virtual function call in loop + virtual void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + LOG(FATAL) << get_name() << " update_murmur_with_value not supported"; + } + /** Removes elements that don't match the filter. * Is used in WHERE and HAVING operations. * If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column; diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index c936c28b885edd6..1e0de0d964ba42c 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -374,6 +374,60 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp } } +// for every array row calculate murmurHash +void ColumnArray::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + auto& offsets_column = get_offsets(); + if (hash == 0) { + hash = HashUtil::SPARK_MURMUR_32_SEED; + } + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + size_t elem_size = offsets_column[i] - offsets_column[i - 1]; + if (elem_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&elem_size), + sizeof(elem_size), hash); + } else { + get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i], + hash, nullptr); + } + } + } + } else { + for (size_t i = start; i < end; ++i) { + size_t elem_size = offsets_column[i] - offsets_column[i - 1]; + if (elem_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&elem_size), + sizeof(elem_size), hash); + } else { + get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i], hash, + nullptr); + } + } + } +} + +void ColumnArray::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } +} + void ColumnArray::insert(const Field& x) { const Array& array = doris::vectorized::get(x); size_t size = array.size(); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 01ce3bcfe28511b..9719f10b5c7fc0f 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -141,6 +141,8 @@ class ColumnArray final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(std::vector& hashes, const uint8_t* __restrict null_data) const override; @@ -152,6 +154,10 @@ class ColumnArray final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert(const Field& x) override; void insert_from(const IColumn& src_, size_t n) override; diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index 3fb851b2a9c8dd6..58f30a949bcf0b4 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -148,6 +148,24 @@ void ColumnConst::update_hashes_with_value(uint64_t* __restrict hashes, } } +void ColumnConst::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + DCHECK(rows == size()); + auto real_data = data->get_data_at(0); + if (real_data.data == nullptr) { + for (int i = 0; i < rows; ++i) { + hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + } + } else { + for (int i = 0; i < rows; ++i) { + hashes[i] = HashUtil::murmur_hash3_32(real_data.data, real_data.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } +} + MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector& selector) const { if (s != selector.size()) { LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of column ({})", diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 8d03087cc3d9890..da27786016d7018 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -166,6 +166,11 @@ class ColumnConst final : public COWHelper { get_data_column_ptr()->update_crc_with_value(start, end, hash, nullptr); } + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override { + get_data_column_ptr()->update_murmur_with_value(start, end, hash, nullptr); + } + void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, const uint8_t* null_map) const override { data->serialize_vec_with_null_map(keys, num_rows, null_map); @@ -186,6 +191,10 @@ class ColumnConst final : public COWHelper { void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; size_t filter(const Filter& filter) override; diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 07508f8c6a895e6..57bdb8ac477eb4b 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -183,6 +183,56 @@ void ColumnDecimal::update_crcs_with_value(uint32_t* __restrict hashes, Primi } } +template +void ColumnDecimal::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + if (null_data == nullptr) { + for (size_t i = start; i < end; i++) { + if constexpr (!IsDecimalV2) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + decimalv2_do_murmur(i, hash); + } + } + } else { + for (size_t i = start; i < end; i++) { + if (null_data[i] == 0) { + if constexpr (!IsDecimalV2) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + decimalv2_do_murmur(i, hash); + } + } + } + } +} + +template +void ColumnDecimal::update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if constexpr (!IsDecimalV2) { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } else { + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + decimalv2_do_murmur(i, hashes[i]); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + decimalv2_do_murmur(i, hashes[i]); + } + } + } + } +} + template void ColumnDecimal::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash, const uint8_t* __restrict null_data) const { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index dfdfbb0d6b9dd83..36f0096adfb2566 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -178,12 +178,16 @@ class ColumnDecimal final : public COWHelper diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index d4b64f8c163d0ba..6cfa205ab69175a 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -308,6 +308,40 @@ void ColumnMap::update_crc_with_value(size_t start, size_t end, uint32_t& hash, } } +void ColumnMap::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + auto& offsets = get_offsets(); + if (hash == 0) { + hash = HashUtil::SPARK_MURMUR_32_SEED; + } + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + size_t kv_size = offsets[i] - offsets[i - 1]; + if (kv_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&kv_size), + sizeof(kv_size), hash); + } else { + get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr); + get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash, + nullptr); + } + } + } + } else { + for (size_t i = start; i < end; ++i) { + size_t kv_size = offsets[i] - offsets[i - 1]; + if (kv_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&kv_size), + sizeof(kv_size), hash); + } else { + get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr); + get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr); + } + } + } +} + void ColumnMap::update_hashes_with_value(uint64_t* hashes, const uint8_t* null_data) const { size_t s = size(); if (null_data) { @@ -343,6 +377,26 @@ void ColumnMap::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType } } +void ColumnMap::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } +} + void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) { if (length == 0) { return; diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 752de2e10c76171..60b9e4ae8a29023 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -182,6 +182,8 @@ class ColumnMap final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(std::vector& hashes, const uint8_t* __restrict null_data) const override; @@ -193,6 +195,10 @@ class ColumnMap final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + /******************** keys and values ***************/ const ColumnPtr& get_keys_ptr() const { return keys_column; } ColumnPtr& get_keys_ptr() { return keys_column; } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index ecf330bead3ca69..a1ccda525c3a116 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -87,6 +87,23 @@ void ColumnNullable::update_crc_with_value(size_t start, size_t end, uint32_t& h } } +void ColumnNullable::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + if (!has_null()) { + nested_column->update_murmur_with_value(start, end, hash, nullptr); + } else { + const auto* __restrict real_null_data = + assert_cast(*null_map).get_data().data(); + hash = HashUtil::SPARK_MURMUR_32_SEED; + for (int i = start; i < end; ++i) { + if (real_null_data[i] != 0) { + hash = HashUtil::murmur_hash3_32_null(hash); + } + } + nested_column->update_murmur_with_value(start, end, hash, real_null_data); + } +} + void ColumnNullable::update_hash_with_value(size_t n, SipHash& hash) const { if (is_null_at(n)) { hash.update(0); @@ -134,6 +151,27 @@ void ColumnNullable::update_crcs_with_value(uint32_t* __restrict hashes, doris:: } } +void ColumnNullable::update_murmurs_with_value(int32_t* __restrict hashes, + doris::PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + auto s = rows; + DCHECK(s == size()); + const auto* __restrict real_null_data = + assert_cast(*null_map).get_data().data(); + if (!has_null()) { + nested_column->update_murmurs_with_value(hashes, type, rows, offset, nullptr); + } else { + for (int i = 0; i < s; ++i) { + if (real_null_data[i] != 0) { + hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + } + } + nested_column->update_murmurs_with_value(hashes, type, rows, offset, real_null_data); + } +} + void ColumnNullable::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { DCHECK(null_data == nullptr); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 10b0951ab8b96af..504edb6706ade50 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -218,6 +218,8 @@ class ColumnNullable final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hash_with_value(size_t n, SipHash& hash) const override; void update_hashes_with_value(std::vector& hashes, @@ -227,6 +229,9 @@ class ColumnNullable final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return scatter_impl(num_columns, selector); diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 424a8717e1498c8..746a6db846c0b14 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -178,6 +178,29 @@ void ColumnString::update_crcs_with_value(uint32_t* __restrict hashes, doris::Pr } } +void ColumnString::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + auto data_ref = get_data_at(i); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + auto data_ref = get_data_at(i); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } +} + ColumnPtr ColumnString::filter(const Filter& filt, ssize_t result_size_hint) const { if (offsets.size() == 0) { return ColumnString::create(); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index e6b27f20054f3ee..a36ff8e21635eea 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -442,6 +442,25 @@ class ColumnString final : public COWHelper { } } + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override { + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + auto data_ref = get_data_at(i); + hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } else { + for (size_t i = start; i < end; ++i) { + auto data_ref = get_data_at(i); + hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } + void update_hash_with_value(size_t n, SipHash& hash) const override { size_t string_size = size_at(n); size_t offset = offset_at(n); @@ -460,6 +479,10 @@ class ColumnString final : public COWHelper { uint32_t offset, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override { auto s = size(); diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index 5a89b5d754c96a1..23f34c735fd425f 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -210,6 +210,13 @@ void ColumnStruct::update_crc_with_value(size_t start, size_t end, uint32_t& has } } +void ColumnStruct::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + for (const auto& column : columns) { + column->update_murmur_with_value(start, end, hash, nullptr); + } +} + void ColumnStruct::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { for (const auto& column : columns) { @@ -225,6 +232,14 @@ void ColumnStruct::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTy } } +void ColumnStruct::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + for (const auto& column : columns) { + column->update_murmurs_with_value(hash, type, rows, offset, null_data); + } +} + void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) { const auto& src_concrete = assert_cast(src); diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index e2da7fd6440a5fb..e8494ec21017e1e 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -110,6 +110,8 @@ class ColumnStruct final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(std::vector& hashes, const uint8_t* __restrict null_data) const override; @@ -121,6 +123,10 @@ class ColumnStruct final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) override; diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 65b1b6308eec7f8..752699528aaed9a 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -202,6 +202,41 @@ void ColumnVector::update_crcs_with_value(uint32_t* __restrict hashes, Primit } } +template +void ColumnVector::update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if constexpr (!std::is_same_v) { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } else { + if (type == TYPE_DATE || type == TYPE_DATETIME) { + char buf[64]; + auto date_convert_do_crc = [&](size_t i) { + const VecDateTimeValue& date_val = (const VecDateTimeValue&)data[i]; + auto len = date_val.to_buffer(buf); + hashes[i] = HashUtil::murmur_hash3_32(buf, len, HashUtil::SPARK_MURMUR_32_SEED); + }; + + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + date_convert_do_crc(i); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + date_convert_do_crc(i); + } + } + } + } else { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } + } +} + template struct ColumnVector::less { const Self& parent; diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 53199027585abc3..a8ee11635ba6505 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -323,6 +323,25 @@ class ColumnVector final : public COWHelper> } } } + + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override { + if (hash == 0) { + hash = HashUtil::SPARK_MURMUR_32_SEED; + } + if (null_data) { + for (size_t i = start; i < end; i++) { + if (null_data[i] == 0) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hash); + } + } + } else { + for (size_t i = start; i < end; i++) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hash); + } + } + } + void update_hash_with_value(size_t n, SipHash& hash) const override; void update_hashes_with_value(std::vector& hashes, @@ -332,6 +351,10 @@ class ColumnVector final : public COWHelper> uint32_t offset, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index db40610723cdb60..81085eda85f4466 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -71,6 +71,13 @@ void XXHashPartitioner::_do_hash(const ColumnPtr& column, uint64_t* column->update_hashes_with_value(result); } +template +void Murmur32HashPartitioner::_do_hash(const ColumnPtr& column, + int32_t* __restrict result, int idx) const { + column->update_murmurs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type, + column->size()); +} + template Status XXHashPartitioner::clone(RuntimeState* state, std::unique_ptr& partitioner) { @@ -97,6 +104,19 @@ Status Crc32HashPartitioner::clone(RuntimeState* state, return Status::OK(); } +template +Status Murmur32HashPartitioner::clone(RuntimeState* state, + std::unique_ptr& partitioner) { + auto* new_partitioner = new Murmur32HashPartitioner(Base::_partition_count); + partitioner.reset(new_partitioner); + new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size()); + for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone( + state, new_partitioner->_partition_expr_ctxs[i])); + } + return Status::OK(); +} + template class Partitioner; template class XXHashPartitioner; template class Partitioner; @@ -104,5 +124,7 @@ template class XXHashPartitioner; template class Partitioner; template class Crc32HashPartitioner; template class Crc32HashPartitioner; +template class Murmur32HashPartitioner; +template class Murmur32HashPartitioner; } // namespace doris::vectorized diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index 66ed8809d7ce7c1..92d2698c1f4c80a 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -112,5 +112,19 @@ class Crc32HashPartitioner final : public Partitioner { void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override; }; +template +class Murmur32HashPartitioner final : public Partitioner { +public: + using Base = Partitioner; + Murmur32HashPartitioner(int partition_count) + : Partitioner(partition_count) {} + ~Murmur32HashPartitioner() override = default; + + Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; + +private: + void _do_hash(const ColumnPtr& column, int32_t* __restrict result, int idx) const override; +}; + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 4d55e0fcb064e81..1581a164e4d3c0d 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -321,6 +321,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(sink.output_partition.type), + _hash_type(sink.output_partition.hash_type), _dest_node_id(sink.dest_node_id), _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc), _serializer(this) { @@ -330,6 +331,9 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sink.output_partition.type == TPartitionType::RANDOM || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); + DCHECK(sink.output_partition.hash_type == THashType::CRC32 || + sink.output_partition.hash_type == THashType::XXHASH64 || + sink.output_partition.hash_type == THashType::SPARK_MURMUR32); std::map fragment_id_to_channel_index; _enable_pipeline_exec = state->enable_pipeline_exec(); @@ -378,6 +382,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(TPartitionType::UNPARTITIONED), + _hash_type(THashType::XXHASH64), _dest_node_id(dest_node_id), _serializer(this) { _cur_pb_block = &_pb_block1; @@ -410,8 +415,13 @@ Status VDataStreamSender::init(const TDataSink& tsink) { RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { _partition_count = _channel_shared_ptrs.size(); - _partitioner.reset( - new Crc32HashPartitioner(_channel_shared_ptrs.size())); + if (_hash_type == THashType::CRC32) { + _partitioner.reset( + new Crc32HashPartitioner(_channel_shared_ptrs.size())); + } else { + _partitioner.reset(new Murmur32HashPartitioner( + _channel_shared_ptrs.size())); + } RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 75a3bfd86a6cd2c..b7cae7b51e15c43 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -103,6 +103,13 @@ struct ShuffleChannelIds { } }; +struct ShufflePModChannelIds { + template + HashValueType operator()(HashValueType l, int32_t r) { + return (l % r + r) % r; + } +}; + class VDataStreamSender : public DataSink { public: friend class pipeline::ExchangeSinkOperator; @@ -177,6 +184,7 @@ class VDataStreamSender : public DataSink { int _current_channel_idx; // index of current channel to send to if _random == true TPartitionType::type _part_type; + THashType::type _hash_type; // serialized batches for broadcasting; we need two so we can write // one while the other one is still being sent diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java index 49823afabf8d788..f0b6092b0985e8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java @@ -95,6 +95,9 @@ private static PrimitiveTypeInfo convertToHiveColType(PrimitiveType dorisType) t Pattern.compile("bucket_(\\d+)(_\\d+)?$"); private static final Iterable BUCKET_PATTERNS = ImmutableList.of( + // spark/parquet pattern + // format: f"part-[paritionId]-[tid]-[txnId]-[jobId]-[taskAttemptId]-[fileCount].c000.snappy.parquet" + Pattern.compile("part-\\d{5}-\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}_(\\d{5})(?:[-_.].*)?"), // legacy Presto naming pattern (current version matches Hive) Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?"), // Hive naming pattern per `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()` @@ -390,7 +393,7 @@ private static int hashCodeV2(Object o, ObjectInspector objIns, ByteBuffer byteB throw new DdlException("Unknown type: " + objIns.getTypeName()); } - private static OptionalInt getBucketNumberFromPath(String name) { + public static OptionalInt getBucketNumberFromPath(String name) { for (Pattern pattern : BUCKET_PATTERNS) { Matcher matcher = pattern.matcher(name); if (matcher.matches()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java new file mode 100644 index 000000000000000..5b15874401908a3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Objects; + +/* + * Hive Hash Distribution Info + */ +public class HiveExternalDistributionInfo extends HashDistributionInfo { + @SerializedName(value = "bucketingVersion") + private final int bucketingVersion; + + public HiveExternalDistributionInfo() { + bucketingVersion = 2; + } + + public HiveExternalDistributionInfo(int bucketNum, List distributionColumns, int bucketingVersion) { + super(bucketNum, distributionColumns); + this.bucketingVersion = bucketingVersion; + } + + public HiveExternalDistributionInfo(int bucketNum, boolean autoBucket, + List distributionColumns, int bucketingVersion) { + super(bucketNum, autoBucket, distributionColumns); + this.bucketingVersion = bucketingVersion; + } + + public int getBucketingVersion() { + return bucketingVersion; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + HiveExternalDistributionInfo that = (HiveExternalDistributionInfo) o; + return bucketNum == that.bucketNum + && sameDistributionColumns(that) + && bucketingVersion == that.bucketingVersion; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), bucketingVersion); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("type: ").append(type).append("; "); + + builder.append("distribution columns: ["); + for (Column column : getDistributionColumns()) { + builder.append(column.getName()).append(","); + } + builder.append("]; "); + + if (autoBucket) { + builder.append("bucket num: auto;"); + } else { + builder.append("bucket num: ").append(bucketNum).append(";"); + } + + builder.append("bucketingVersion: ").append(bucketingVersion).append(";"); + + return builder.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index b284b4d60ecf03f..d48ab21c1f17ee6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -18,10 +18,14 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.HudiUtils; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -39,6 +43,7 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.collections.MapUtils; @@ -52,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.iceberg.Schema; @@ -63,6 +69,8 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -87,6 +95,22 @@ public class HMSExternalTable extends ExternalTable { private static final String TBL_PROP_TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime"; private static final String NUM_ROWS = "numRows"; + private static final String SPARK_BUCKET = "spark.sql.sources.schema.bucketCol."; + private static final String SPARK_NUM_BUCKET = "spark.sql.sources.schema.numBuckets"; + private static final String BUCKETING_VERSION = "bucketing_version"; + + private static final Set SUPPORTED_BUCKET_PROPERTIES; + + static { + SUPPORTED_BUCKET_PROPERTIES = Sets.newHashSet(); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "0"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "1"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "2"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "3"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "4"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_NUM_BUCKET); + SUPPORTED_BUCKET_PROPERTIES.add(BUCKETING_VERSION); + } static { SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet(); @@ -108,8 +132,10 @@ public class HMSExternalTable extends ExternalTable { SUPPORTED_HUDI_FILE_FORMATS.add("com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat"); } - private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; - private List partitionColumns; + protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; + protected List partitionColumns; + private List bucketColumns; + private boolean isSparkTable; private DLAType dlaType = DLAType.UNKNOWN; @@ -123,6 +149,8 @@ public enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG } + private DistributionInfo distributionInfo; + /** * Create hive metastore external table. * @@ -192,6 +220,14 @@ public boolean isHoodieCowTable() { return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName); } + public boolean isSparkTable() { + return isSparkTable; + } + + public boolean isBucketedTable() { + return bucketColumns != null && !bucketColumns.isEmpty() && isSparkTable; + } + /** * Now we only support three file input format hive tables: parquet/orc/text. * Support managed_table and external_table. @@ -438,9 +474,72 @@ public List initSchema() { columns = tmpSchema; } initPartitionColumns(columns); + initBucketingColumns(columns); return columns; } + private void initBucketingColumns(List columns) { + List bucketCols = new ArrayList<>(5); + int numBuckets = getBucketColumns(bucketCols); + if (bucketCols.isEmpty() || !isSparkTable) { + bucketColumns = ImmutableList.of(); + distributionInfo = new RandomDistributionInfo(1, true); + return; + } + + int bucketingVersion = Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION, + "2")); + ImmutableList.Builder bucketColBuilder = ImmutableList.builder(); + for (String colName : bucketCols) { + // do not use "getColumn()", which will cause dead loop + for (Column column : columns) { + if (colName.equalsIgnoreCase(column.getName())) { + // For partition/bucket column, if it is string type, change it to varchar(65535) + // to be same as doris managed table. + // This is to avoid some unexpected behavior such as different partition pruning result + // between doris managed table and external table. + if (column.getType().getPrimitiveType() == PrimitiveType.STRING) { + column.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH)); + } + bucketColBuilder.add(column); + break; + } + } + } + + bucketColumns = bucketColBuilder.build(); + distributionInfo = new HiveExternalDistributionInfo(numBuckets, bucketColumns, bucketingVersion); + LOG.debug("get {} bucket columns for table: {}", bucketColumns.size(), name); + } + + private int getBucketColumns(List bucketCols) { + StorageDescriptor descriptor = remoteTable.getSd(); + int numBuckets = -1; + if (descriptor.isSetBucketCols() && !descriptor.getBucketCols().isEmpty()) { + /* Hive Bucketed Table */ + bucketCols.addAll(descriptor.getBucketCols()); + numBuckets = descriptor.getNumBuckets(); + } else if (remoteTable.isSetParameters() + && !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) { + Map parameters = remoteTable.getParameters(); + for (Map.Entry param : parameters.entrySet()) { + if (param.getKey().startsWith(SPARK_BUCKET)) { + int index = Integer.valueOf(param.getKey() + .substring(param.getKey().lastIndexOf(".") + 1)); + bucketCols.add(index, param.getValue()); + } else if (param.getKey().equals(SPARK_NUM_BUCKET)) { + numBuckets = Integer.valueOf(param.getValue()); + } + } + + if (numBuckets > 0) { + isSparkTable = true; + } + } + + return numBuckets; + } + public List getHudiSchema(List hmsSchema) { org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this); List tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size()); @@ -544,6 +643,19 @@ public Optional getColumnStatistic(String colName) { return Optional.empty(); } + public DistributionInfo getDefaultDistributionInfo() { + makeSureInitialized(); + if (distributionInfo != null) { + return distributionInfo; + } + + return new RandomDistributionInfo(1, true); + } + + public Map getTableParameters() { + return remoteTable.getParameters(); + } + private Optional getHiveColumnStats(String colName) { List tableStats = getHiveTableColumnStats(Lists.newArrayList(colName)); if (tableStats == null || tableStats.isEmpty()) { @@ -715,14 +827,23 @@ public long getDataSize(boolean singleReplica) { @Override public boolean isDistributionColumn(String columnName) { - return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()).contains(columnName.toLowerCase()); + Set distributeColumns = getDistributionColumnNames() + .stream().map(String::toLowerCase).collect(Collectors.toSet()); + return distributeColumns.contains(columnName.toLowerCase()); } @Override public Set getDistributionColumnNames() { - return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()); + Set distributionColumnNames = Sets.newHashSet(); + if (distributionInfo instanceof RandomDistributionInfo) { + return distributionColumnNames; + } + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List distColumn = hashDistributionInfo.getDistributionColumns(); + for (Column column : distColumn) { + distributionColumnNames.add(column.getName().toLowerCase()); + } + return distributionColumnNames; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 72e56d09c4ec31c..08ed23e8113587b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -170,6 +170,7 @@ import org.apache.doris.planner.SortNode; import org.apache.doris.planner.TableFunctionNode; import org.apache.doris.planner.UnionNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.planner.external.MaxComputeScanNode; import org.apache.doris.planner.external.hudi.HudiScanNode; @@ -452,7 +453,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); // TODO(cmy): determine the needCheckColumnPriv param - ScanNode scanNode; + FileQueryScanNode scanNode; + DataPartition dataPartition = DataPartition.RANDOM; if (table instanceof HMSExternalTable) { switch (((HMSExternalTable) table).getDlaType()) { case HUDI: @@ -502,8 +504,14 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla ) ); Utils.execWithUncheckedException(scanNode::finalizeForNereids); + if (fileScan.getDistributionSpec() instanceof DistributionSpecHash) { + DistributionSpecHash distributionSpecHash = (DistributionSpecHash) fileScan.getDistributionSpec(); + List partitionExprs = distributionSpecHash.getOrderedShuffledColumns().stream() + .map(context::findSlotRef).collect(Collectors.toList()); + dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, + partitionExprs, scanNode.getHashType()); + } // Create PlanFragment - DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); context.addPlanFragment(planFragment); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan); @@ -2283,7 +2291,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec, throw new RuntimeException("Do not support shuffle type: " + distributionSpecHash.getShuffleType()); } - return new DataPartition(partitionType, partitionExprs); + return new DataPartition(partitionType, partitionExprs); //todo(NK): check for lakehouse path here } else { throw new RuntimeException("Unknown DistributionSpec: " + distributionSpec); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 3b07f2bbe985b96..68968eb51386642 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -131,7 +131,7 @@ public PhysicalProperties visitPhysicalEsScan(PhysicalEsScan esScan, PlanContext @Override public PhysicalProperties visitPhysicalFileScan(PhysicalFileScan fileScan, PlanContext context) { - return PhysicalProperties.STORAGE_ANY; + return new PhysicalProperties(fileScan.getDistributionSpec()); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java index d86e1d1667e18a6..73f4601fe35bc48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java @@ -17,11 +17,26 @@ package org.apache.doris.nereids.rules.implementation; -import org.apache.doris.nereids.properties.DistributionSpecAny; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.nereids.properties.DistributionSpec; +import org.apache.doris.nereids.properties.DistributionSpecHash; +import org.apache.doris.nereids.properties.DistributionSpecStorageAny; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; import java.util.Optional; /** @@ -35,7 +50,7 @@ public Rule build() { fileScan.getRelationId(), fileScan.getTable(), fileScan.getQualifier(), - DistributionSpecAny.INSTANCE, + convertDistribution(fileScan), Optional.empty(), fileScan.getLogicalProperties(), fileScan.getConjuncts(), @@ -43,4 +58,30 @@ public Rule build() { fileScan.getTableSample()) ).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE); } + + private DistributionSpec convertDistribution(LogicalFileScan fileScan) { + TableIf table = fileScan.getTable(); + if (!(table instanceof HMSExternalTable)) { + return DistributionSpecStorageAny.INSTANCE; + } + + HMSExternalTable hmsExternalTable = (HMSExternalTable) table; + DistributionInfo distributionInfo = hmsExternalTable.getDefaultDistributionInfo(); + if (distributionInfo instanceof HashDistributionInfo) { + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List output = fileScan.getOutput(); + List hashColumns = Lists.newArrayList(); + for (Slot slot : output) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + if (((SlotReference) slot).getColumn().get().equals(column)) { + hashColumns.add(slot.getExprId()); + } + } + } + return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL, + fileScan.getTable().getId(), -1, Collections.emptySet()); + } + + return DistributionSpecStorageAny.INSTANCE; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 753520869408ec2..0887e4aa3a0b30a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.thrift.TDataPartition; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Joiner; @@ -53,11 +54,16 @@ public class DataPartition { public static final DataPartition RANDOM = new DataPartition(TPartitionType.RANDOM); private final TPartitionType type; + private final THashType hashType; // for hash partition: exprs used to compute hash value private ImmutableList partitionExprs = ImmutableList.of(); public DataPartition(TPartitionType type, List exprs) { + this(type, exprs, THashType.XXHASH64); + } + + public DataPartition(TPartitionType type, List exprs, THashType hashType) { Preconditions.checkNotNull(exprs); Preconditions.checkState(!exprs.isEmpty()); Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED @@ -65,6 +71,7 @@ public DataPartition(TPartitionType type, List exprs) { || type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED); this.type = type; this.partitionExprs = ImmutableList.copyOf(exprs); + this.hashType = hashType; } public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws AnalysisException { @@ -76,10 +83,15 @@ public DataPartition(TPartitionType type) { Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type == TPartitionType.RANDOM); this.type = type; this.partitionExprs = ImmutableList.of(); + this.hashType = THashType.CRC32; + } + + public static DataPartition hashPartitioned(List exprs, THashType hashType) { + return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, hashType); } public static DataPartition hashPartitioned(List exprs) { - return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs); + return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, THashType.XXHASH64); } public boolean isPartitioned() { @@ -103,6 +115,7 @@ public TDataPartition toThrift() { if (partitionExprs != null) { result.setPartitionExprs(Expr.treesToThrift(partitionExprs)); } + result.setHashType(hashType); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index a719081496b05ba..7665ecab56e2119 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -33,18 +33,23 @@ import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hive.common.util.Ref; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -286,6 +291,10 @@ private PlanFragment createScanFragment(PlanNode node) throws UserException { OlapScanNode olapScanNode = (OlapScanNode) node; return new PlanFragment(ctx.getNextFragmentId(), node, olapScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); + } else if (node instanceof HiveScanNode) { + HiveScanNode hiveScanNode = (HiveScanNode) node; + return new PlanFragment(ctx.getNextFragmentId(), node, + hiveScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); } else { // other scan nodes are random partitioned: es, broker return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); @@ -328,10 +337,12 @@ private PlanFragment createHashJoinFragment( // bucket shuffle join is better than broadcast and shuffle join // it can reduce the network cost of join, so doris chose it first List rhsPartitionExprs = Lists.newArrayList(); - if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs)) { + Ref hashType = Ref.from(THashType.CRC32); + if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs, hashType)) { node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); DataPartition rhsJoinPartition = - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionExprs); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, + rhsPartitionExprs, hashType.value); ExchangeNode rhsExchange = new ExchangeNode(ctx.getNextNodeId(), rightChildFragment.getPlanRoot(), false); rhsExchange.setNumInstances(rightChildFragment.getPlanRoot().getNumInstances()); @@ -601,7 +612,7 @@ private boolean dataDistributionMatchEqPredicate(List eqJoinPre } private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, - List rhsHashExprs) { + List rhsHashExprs, Ref hashType) { if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { return false; } @@ -617,7 +628,9 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr PlanNode leftRoot = leftChildFragment.getPlanRoot(); // 1.leftRoot be OlapScanNode if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); + } else if (leftRoot instanceof HiveScanNode) { + return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); } // 2.leftRoot be hashjoin node @@ -626,17 +639,83 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr leftRoot = leftRoot.getChild(0); } if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); + } else if (leftRoot instanceof HiveScanNode) { + return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); } } return false; } + private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode, + List rhsJoinExprs, Ref hashType) { + HMSExternalTable leftTable = leftScanNode.getHiveTable(); + + DistributionInfo leftDistribution = leftTable.getDefaultDistributionInfo(); + if (leftDistribution == null || !(leftDistribution instanceof HiveExternalDistributionInfo)) { + return false; + } + + HiveExternalDistributionInfo hiveDistributionInfo = (HiveExternalDistributionInfo) leftDistribution; + + List leftDistributeColumns = hiveDistributionInfo.getDistributionColumns(); + List leftDistributeColumnNames = leftDistributeColumns.stream() + .map(col -> leftTable.getName() + "." + col.getName().toLowerCase()).collect(Collectors.toList()); + + List leftJoinColumnNames = new ArrayList<>(); + List rightExprs = new ArrayList<>(); + List eqJoinConjuncts = node.getEqJoinConjuncts(); + + for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) { + Expr lhsJoinExpr = eqJoinPredicate.getChild(0); + Expr rhsJoinExpr = eqJoinPredicate.getChild(1); + if (lhsJoinExpr.unwrapSlotRef() == null || rhsJoinExpr.unwrapSlotRef() == null) { + continue; + } + + SlotRef leftSlot = node.getChild(0).findSrcSlotRef(lhsJoinExpr.unwrapSlotRef()); + if (leftSlot.getTable() instanceof HMSExternalTable + && leftScanNode.desc.getSlots().contains(leftSlot.getDesc())) { + // table name in SlotRef is not the really name. `select * from test as t` + // table name in SlotRef is `t`, but here we need is `test`. + leftJoinColumnNames.add(leftSlot.getTable().getName() + "." + + leftSlot.getColumnName().toLowerCase()); + rightExprs.add(rhsJoinExpr); + } + } + + //2 the join columns should contains all left table distribute columns to enable bucket shuffle join + for (int i = 0; i < leftDistributeColumnNames.size(); i++) { + String distributeColumnName = leftDistributeColumnNames.get(i); + boolean findRhsExprs = false; + // check the join column name is same as distribute column name and + // check the rhs join expr type is same as distribute column + for (int j = 0; j < leftJoinColumnNames.size(); j++) { + if (leftJoinColumnNames.get(j).equals(distributeColumnName)) { + // varchar and string type don't need to check the length property + if ((rightExprs.get(j).getType().isVarcharOrStringType() + && leftDistributeColumns.get(i).getType().isVarcharOrStringType()) + || (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType()))) { + rhsJoinExprs.add(rightExprs.get(j)); + findRhsExprs = true; + break; + } + } + } + + if (!findRhsExprs) { + return false; + } + } + + hashType.value = THashType.SPARK_MURMUR32; + return true; + } + //the join expr must contian left table distribute column - private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot, + private boolean canBucketShuffleJoin(HashJoinNode node, OlapScanNode leftScanNode, List rhsJoinExprs) { - OlapScanNode leftScanNode = ((OlapScanNode) leftRoot); OlapTable leftTable = leftScanNode.getOlapTable(); //1 the left table has more than one partition or left table is not a stable colocate table diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 84b54f7e4de5c28..cc7145edd4f811e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -72,6 +72,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TOlapScanNode; import org.apache.doris.thrift.TOlapTableIndex; @@ -1601,7 +1602,7 @@ public DataPartition constructInputPartitionByDistributionInfo() throws UserExce SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); dataDistributeExprs.add(slotRef); } - return DataPartition.hashPartitioned(dataDistributeExprs); + return DataPartition.hashPartitioned(dataDistributeExprs, THashType.CRC32); } else { return DataPartition.RANDOM; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java index 9e23463235f2d3a..fcf53f22925b4a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java @@ -55,6 +55,7 @@ public class FederationBackendPolicy { private final Map> backendMap = Maps.newHashMap(); private final SecureRandom random = new SecureRandom(); private ConsistentHash consistentHash; + private ConsistentHash consistentBucket; private int nextBe = 0; private boolean initialized = false; @@ -98,6 +99,8 @@ public void init(BeSelectionPolicy policy) throws UserException { backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost))); consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(), new BackendHash(), backends, Config.virtual_node_number); + consistentBucket = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(), + new BackendHash(), backends, Config.virtual_node_number); } public Backend getNextBe() { @@ -110,6 +113,10 @@ public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) { return consistentHash.getNode(scanRangeLocations); } + public Backend getNextConsistentBe(Integer bucketId) { + return consistentBucket.getNode(bucketId); + } + // Try to find a local BE, if not exists, use `getNextBe` instead public Backend getNextLocalBe(List hosts) { List candidateBackends = Lists.newArrayListWithCapacity(hosts.size()); @@ -151,4 +158,11 @@ public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) { } } } + + private static class BucketHash implements Funnel { + @Override + public void funnel(Integer bucketId, PrimitiveSink primitiveSink) { + primitiveSink.putLong(bucketId); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index eac945ced35e958..fa746830b7fef98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -27,8 +27,10 @@ import org.apache.doris.catalog.FsBroker; import org.apache.doris.catalog.FunctionGenTable; import org.apache.doris.catalog.HdfsResource; +import org.apache.doris.catalog.HiveBucketUtil; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalTable; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.NotImplementedException; @@ -39,6 +41,7 @@ import org.apache.doris.datasource.hive.AcidInfo; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.external.hudi.HudiScanNode; import org.apache.doris.planner.external.hudi.HudiSplit; @@ -60,6 +63,7 @@ import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFileScanSlotInfo; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.THdfsParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRange; @@ -72,6 +76,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.Getter; @@ -96,6 +101,8 @@ public abstract class FileQueryScanNode extends FileScanNode { protected Map destSlotDescByName; protected TFileScanRangeParams params; + public ArrayListMultimap bucketSeq2locations = ArrayListMultimap.create(); + @Getter protected TableSample tableSample; @@ -329,6 +336,13 @@ public void createScanRangeLocations() throws UserException { HiveSplit hiveSplit = (HiveSplit) split; isACID = hiveSplit.isACID(); } + + boolean isBucketedHiveTable = false; + TableIf targetTable = getTargetTable(); + if (targetTable instanceof HMSExternalTable) { + isBucketedHiveTable = ((HMSExternalTable) targetTable).isBucketedTable(); + } + List partitionValuesFromPath = fileSplit.getPartitionValues() == null ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID) : fileSplit.getPartitionValues(); @@ -372,7 +386,11 @@ public void createScanRangeLocations() throws UserException { curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); TScanRangeLocation location = new TScanRangeLocation(); Backend selectedBackend; - if (enableSqlCache) { + int bucketNum = 0; + if (isBucketedHiveTable) { + bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt(); + selectedBackend = backendPolicy.getNextConsistentBe(bucketNum); + } else if (enableSqlCache) { // Use consistent hash to assign the same scan range into the same backend among different queries selectedBackend = backendPolicy.getNextConsistentBe(curLocations); } else if (enableShortCircuitRead) { @@ -385,9 +403,11 @@ public void createScanRangeLocations() throws UserException { location.setBackendId(selectedBackend.getId()); location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); curLocations.addToLocations(location); + LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); + bucketSeq2locations.put(bucketNum, curLocations); scanRangeLocations.add(curLocations); this.totalFileSize += fileSplit.getLength(); } @@ -519,6 +539,14 @@ protected static Optional getTFileType(String location) { } return Optional.empty(); } + + public DataPartition constructInputPartitionByDistributionInfo() { + return DataPartition.RANDOM; + } + + public THashType getHashType() { + return THashType.CRC32; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 943d30017e7c2aa..a0f2cf8770c1621 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -17,11 +17,16 @@ package org.apache.doris.planner.external; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; @@ -40,6 +45,7 @@ import org.apache.doris.datasource.hive.HiveTransaction; import org.apache.doris.datasource.hive.HiveVersionUtil; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; +import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.external.HiveSplit.HiveSplitCreator; @@ -51,6 +57,7 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THashType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -421,4 +428,37 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User } return compressType; } + + @Override + public DataPartition constructInputPartitionByDistributionInfo() { + if (hmsTable.isBucketedTable()) { + DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo(); + if (!(distributionInfo instanceof HashDistributionInfo)) { + return DataPartition.RANDOM; + } + List distributeColumns = ((HiveExternalDistributionInfo) distributionInfo).getDistributionColumns(); + List dataDistributeExprs = Lists.newArrayList(); + for (Column column : distributeColumns) { + SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); + dataDistributeExprs.add(slotRef); + } + return DataPartition.hashPartitioned(dataDistributeExprs, THashType.SPARK_MURMUR32); + } + + return DataPartition.RANDOM; + } + + public HMSExternalTable getHiveTable() { + return hmsTable; + } + + @Override + public THashType getHashType() { + if (hmsTable.isBucketedTable() + && hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) { + return THashType.SPARK_MURMUR32; + } + + return THashType.CRC32; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index f3622b62fbbb29b..a346c6197fb2ff0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.Config; +import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.Status; @@ -60,6 +61,7 @@ import org.apache.doris.planner.external.ExternalScanNode; import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.planner.external.FileScanNode; +import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentResult; import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; @@ -2216,8 +2218,13 @@ private void computeScanRangeAssignment() throws Exception { computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost); } if (fragmentContainsBucketShuffleJoin) { - bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, - idToBackend, addressToBackendID, replicaNumPerHost); + if (scanNode instanceof OlapScanNode) { + bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, + idToBackend, addressToBackendID, replicaNumPerHost); + } else if (scanNode instanceof HiveScanNode) { + bucketShuffleJoinController.computeScanRangeAssignmentByBucket((HiveScanNode) scanNode, + idToBackend, addressToBackendID, replicaNumPerHost); + } } if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) { computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost, @@ -2789,6 +2796,50 @@ private void computeScanRangeAssignmentByBucket( } } + private void computeScanRangeAssignmentByBucket( + final HiveScanNode scanNode, ImmutableMap idToBackend, + Map addressToBackendID, + Map replicaNumPerHost) throws Exception { + if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { + int bucketNum = 0; + if (scanNode.getHiveTable().isBucketedTable()) { + bucketNum = scanNode.getHiveTable().getDefaultDistributionInfo().getBucketNum(); + } else { + throw new NotImplementedException("bucket shuffle for non-bucketed table not supported"); + } + fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), bucketNum); + fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); + fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); + fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>()); + } + Map bucketSeqToAddress + = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); + BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId()); + + for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) { + //fill scanRangeParamsList + List locations = scanNode.bucketSeq2locations.get(bucketSeq); + if (!bucketSeqToAddress.containsKey(bucketSeq)) { + getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), + bucketSeq, idToBackend, addressToBackendID, replicaNumPerHost); + } + + for (TScanRangeLocations location : locations) { + Map> scanRanges = + findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<>()); + + List scanRangeParamsList = + findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<>()); + + // add scan range + TScanRangeParams scanRangeParams = new TScanRangeParams(); + scanRangeParams.scan_range = location.scan_range; + scanRangeParamsList.add(scanRangeParams); + updateScanRangeNumByScanRange(scanRangeParams); + } + } + } + private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) { Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index ca8109e40e3070b..19b53529a3a4654 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -40,6 +40,7 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TScanRangeLocation; @@ -174,7 +175,7 @@ public void testIsBucketShuffleJoin() { new ArrayList<>()); hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode, - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32))); // hash join node is not bucket shuffle join Assert.assertEquals(false, @@ -182,13 +183,13 @@ public void testIsBucketShuffleJoin() { // the fragment id is different from hash join node hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-2), hashJoinNode, - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32))); hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); Assert.assertEquals(false, Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode)); hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode, - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32))); Assert.assertEquals(true, Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode)); diff --git a/gensrc/thrift/Ddl.thrift b/gensrc/thrift/Ddl.thrift index 9696230af909edc..f733637bc7791a6 100644 --- a/gensrc/thrift/Ddl.thrift +++ b/gensrc/thrift/Ddl.thrift @@ -76,10 +76,6 @@ enum TAggType { // 4: optional string default_value //} -enum THashType { - CRC32 -} - // random partition info struct TRandomPartitionDesc { } @@ -93,7 +89,7 @@ struct THashPartitionDesc { 2: required i32 hash_buckets // type to compute hash value. if not set, use CRC32 - 3: optional THashType hash_type + 3: optional Partitions.THashType hash_type } // value used to represents one column value in one range value diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift index 8eecbb417b1184a..a36b33a6836cbda 100644 --- a/gensrc/thrift/Partitions.thrift +++ b/gensrc/thrift/Partitions.thrift @@ -21,6 +21,12 @@ namespace java org.apache.doris.thrift include "Exprs.thrift" include "Types.thrift" +enum THashType { + CRC32, + XXHASH64, + SPARK_MURMUR32 +} + enum TPartitionType { UNPARTITIONED, @@ -87,6 +93,7 @@ struct TDataPartition { 1: required TPartitionType type 2: optional list partition_exprs 3: optional list partition_infos + 4: optional THashType hash_type }