From f9e42ab023aa153dc2c4f8ce57febe2887737296 Mon Sep 17 00:00:00 2001
From: Nitin Kashyap
Date: Mon, 27 Nov 2023 13:20:53 +0530
Subject: [PATCH] [feature](datalake) Add BucketShuffleJoin support for Hive
 table data generated by Spark. (27783)

1. Updated the original planner to consider BucketShuffle join for bucketed Hive tables.
2. Updated the Nereids planner for BucketShuffle join on Hive tables.
3. Added Spark-style hash calculation in the BE for shuffling one side of the join.
---
 be/src/util/hash_util.hpp                     |   8 +
 be/src/vec/columns/column.h                   |  29 ++++
 be/src/vec/columns/column_array.cpp           |  53 +++++++
 be/src/vec/columns/column_array.h             |   6 +
 be/src/vec/columns/column_const.cpp           |  18 +++
 be/src/vec/columns/column_const.h             |   9 ++
 be/src/vec/columns/column_decimal.cpp         |  50 +++++++
 be/src/vec/columns/column_decimal.h           |  16 +-
 be/src/vec/columns/column_map.cpp             |  56 +++++++
 be/src/vec/columns/column_map.h               |   6 +
 be/src/vec/columns/column_nullable.cpp        |  37 +++++
 be/src/vec/columns/column_nullable.h          |   5 +
 be/src/vec/columns/column_string.cpp          |  23 +++
 be/src/vec/columns/column_string.h            |  23 +++
 be/src/vec/columns/column_struct.cpp          |  15 ++
 be/src/vec/columns/column_struct.h            |   6 +
 be/src/vec/columns/column_vector.cpp          |  35 +++++
 be/src/vec/columns/column_vector.h            |  22 +++
 be/src/vec/runtime/partitioner.cpp            |  22 +++
 be/src/vec/runtime/partitioner.h              |  14 ++
 be/src/vec/sink/vdata_stream_sender.cpp       |  14 +-
 be/src/vec/sink/vdata_stream_sender.h         |   8 +
 be/src/vec/utils/util.hpp                     |   4 +
 .../apache/doris/catalog/HiveBucketUtil.java  |   5 +-
 .../catalog/HiveExternalDistributionInfo.java |  95 ++++++++++
 .../catalog/external/HMSExternalTable.java    | 141 +++++++++++++++++-
 .../translator/PhysicalPlanTranslator.java    |  14 +-
 .../ChildOutputPropertyDeriver.java           |   2 +-
 .../LogicalFileScanToPhysicalFileScan.java    |  45 +++++-
 .../apache/doris/planner/DataPartition.java   |  15 +-
 .../doris/planner/DistributedPlanner.java     |  93 +++++++++-
 .../apache/doris/planner/OlapScanNode.java    |   3 +-
 .../external/FederationBackendPolicy.java     |  14 ++
 .../planner/external/FileQueryScanNode.java   |  30 +++-
 .../doris/planner/external/HiveScanNode.java  |  40 +++++
 .../java/org/apache/doris/qe/Coordinator.java |  55 ++++++-
 .../org/apache/doris/qe/CoordinatorTest.java  |   7 +-
 gensrc/thrift/Ddl.thrift                      |   6 +-
 gensrc/thrift/Partitions.thrift               |   8 +
 39 files changed, 1016 insertions(+), 36 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java

diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp
index afa8a1453864aa4..d73863ff97cf824 100644
--- a/be/src/util/hash_util.hpp
+++ b/be/src/util/hash_util.hpp
@@ -133,6 +133,9 @@ class HashUtil {
     // refer to https://github.com/apache/commons-codec/blob/master/src/main/java/org/apache/commons/codec/digest/MurmurHash3.java
     static const uint32_t MURMUR3_32_SEED = 104729;
 
+    // refer to https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L615
+    static const uint32_t SPARK_MURMUR_32_SEED = 42;
+
     // modify from https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
     static uint32_t murmur_hash3_32(const void* key, int32_t len, uint32_t seed) {
         uint32_t out = 0;
@@ -140,6 +143,11 @@ class HashUtil {
         return out;
     }
 
+    static uint32_t murmur_hash3_32_null(uint32_t seed) {
+        static const int INT_VALUE = 0;
+        return murmur_hash3_32((const unsigned char*)(&INT_VALUE), 4, seed);
+    }
+
     static const int MURMUR_R = 47;
 
     // Murmur2 hash implementation returning 64-bit hashes.
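Note for reviewers: the BE-side hashing above can be sanity-checked outside Doris. SPARK_MURMUR_32_SEED = 42 is Spark's default Murmur3 seed (see the hash.scala reference in the hunk), murmur_hash3_32_null stands in for a SQL NULL by feeding an int32 zero through the same function, and later hunks map the resulting hash (signed in Java) into a bucket with pmod. Below is a self-contained sketch using the reference MurmurHash3 x86/32 routine, which is assumed equivalent to HashUtil::murmur_hash3_32; the values are illustrative, not code from this patch:

    #include <cstdint>
    #include <cstdio>
    #include <cstring>

    // Reference MurmurHash3 x86/32 (Austin Appleby, public domain); assumed
    // equivalent to what HashUtil::murmur_hash3_32 computes.
    static uint32_t murmur3_32(const void* key, int len, uint32_t seed) {
        const uint8_t* data = static_cast<const uint8_t*>(key);
        const int nblocks = len / 4;
        uint32_t h1 = seed;
        const uint32_t c1 = 0xcc9e2d51, c2 = 0x1b873593;
        for (int i = 0; i < nblocks; i++) {
            uint32_t k1;
            memcpy(&k1, data + i * 4, sizeof(k1));
            k1 *= c1; k1 = (k1 << 15) | (k1 >> 17); k1 *= c2;
            h1 ^= k1; h1 = (h1 << 13) | (h1 >> 19); h1 = h1 * 5 + 0xe6546b64;
        }
        const uint8_t* tail = data + nblocks * 4;
        uint32_t k1 = 0;
        switch (len & 3) {
        case 3: k1 ^= tail[2] << 16; [[fallthrough]];
        case 2: k1 ^= tail[1] << 8;  [[fallthrough]];
        case 1: k1 ^= tail[0];
                k1 *= c1; k1 = (k1 << 15) | (k1 >> 17); k1 *= c2; h1 ^= k1;
        }
        h1 ^= static_cast<uint32_t>(len);
        h1 ^= h1 >> 16; h1 *= 0x85ebca6b; h1 ^= h1 >> 13;
        h1 *= 0xc2b2ae35; h1 ^= h1 >> 16;
        return h1;
    }

    int main() {
        // An int32 column value is hashed over its 4 bytes (little-endian
        // machine assumed) with Spark's default seed 42.
        int32_t value = 1;
        uint32_t h = murmur3_32(&value, sizeof(value), 42);

        // murmur_hash3_32_null: NULL is represented as int32 0 fed through
        // the same function with the same seed.
        int32_t null_marker = 0;
        uint32_t h_null = murmur3_32(&null_marker, sizeof(null_marker), 42);

        // pmod maps the (signed, in Java) hash into [0, buckets), matching
        // ShufflePModChannelIds later in this patch: (l % r + r) % r.
        int64_t buckets = 16;
        int64_t bucket = ((static_cast<int32_t>(h) % buckets) + buckets) % buckets;

        printf("hash=%u null_hash=%u bucket=%lld\n", h, h_null,
               static_cast<long long>(bucket));
        return 0;
    }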
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 9b49cce1c21e825..89fa6033b796a12 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -71,6 +71,19 @@ class SipHash;
         } \
     }
 
+#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(SEED)                               \
+    if (null_data == nullptr) {                                                   \
+        for (size_t i = 0; i < s; i++) {                                          \
+            hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED);     \
+        }                                                                         \
+    } else {                                                                      \
+        for (size_t i = 0; i < s; i++) {                                          \
+            if (null_data[i] == 0) {                                              \
+                hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \
+            }                                                                     \
+        }                                                                         \
+    }
+
 namespace doris::vectorized {
 
 class Arena;
@@ -410,6 +423,22 @@ class IColumn : public COW<IColumn> {
         LOG(FATAL) << get_name() << " update_crc_with_value not supported";
     }
 
+    /// Update the state of the murmur hash function with the values of n elements,
+    /// avoiding a virtual call per element. null_data marks which rows need hashing:
+    /// null_data == nullptr means every element is hashed, otherwise only rows with
+    /// null_data[i] == 0 are hashed.
+    virtual void update_murmurs_with_value(uint32_t* __restrict hash, PrimitiveType type,
+                                           uint32_t rows, uint32_t offset = 0,
+                                           const uint8_t* __restrict null_data = nullptr) const {
+        LOG(FATAL) << get_name() << " update_murmurs_with_value not supported";
+    }
+
+    // use a range for one hash value to avoid a virtual function call in the loop
+    virtual void update_murmur_with_value(size_t start, size_t end, uint32_t& hash,
+                                          const uint8_t* __restrict null_data) const {
+        LOG(FATAL) << get_name() << " update_murmur_with_value not supported";
+    }
+
     /** Removes elements that don't match the filter.
       * Is used in WHERE and HAVING operations.
       * If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column;
diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp
index 98fb480dd1a3a08..d9c3a43585836e7 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -374,6 +374,59 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp
     }
 }
 
+// calculate the murmur hash for every array row
+void ColumnArray::update_murmur_with_value(size_t start, size_t end, uint32_t& hash,
+                                           const uint8_t* __restrict null_data) const {
+    auto& offsets_column = get_offsets();
+    if (null_data) {
+        for (size_t i = start; i < end; ++i) {
+            if (null_data[i] == 0) {
+                size_t elem_size = offsets_column[i] - offsets_column[i - 1];
+                if (elem_size == 0) {
+                    hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&elem_size),
+                                                     sizeof(elem_size),
+                                                     HashUtil::SPARK_MURMUR_32_SEED);
+                } else {
+                    get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i],
+                                                        hash, nullptr);
+                }
+            }
+        }
+    } else {
+        for (size_t i = start; i < end; ++i) {
+            size_t elem_size = offsets_column[i] - offsets_column[i - 1];
+            if (elem_size == 0) {
+                hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&elem_size),
+                                                 sizeof(elem_size),
+                                                 HashUtil::SPARK_MURMUR_32_SEED);
+            } else {
+                get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i],
+                                                    hash, nullptr);
+            }
+        }
+    }
+}
+
+void ColumnArray::update_murmurs_with_value(uint32_t* __restrict hash, PrimitiveType type,
+                                            uint32_t rows, uint32_t offset,
+                                            const uint8_t* __restrict null_data) const {
+    auto s = rows;
+    DCHECK(s == size());
+
+    if (null_data) {
+        for (size_t i = 0; i < s; ++i) {
+            // every row
+            if (null_data[i] == 0) {
+                update_murmur_with_value(i, i + 1, hash[i], nullptr);
+            }
+        }
+    } else {
+        for (size_t i = 0; i < s; ++i) {
+
update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } +} + void ColumnArray::insert(const Field& x) { const Array& array = doris::vectorized::get(x); size_t size = array.size(); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 95fd46333478351..776a119457e4b78 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -141,6 +141,8 @@ class ColumnArray final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(std::vector& hashes, const uint8_t* __restrict null_data) const override; @@ -152,6 +154,10 @@ class ColumnArray final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert(const Field& x) override; void insert_from(const IColumn& src_, size_t n) override; diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index 3fb851b2a9c8dd6..c0cd7b14a4cd0ec 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -148,6 +148,24 @@ void ColumnConst::update_hashes_with_value(uint64_t* __restrict hashes, } } +void ColumnConst::update_murmurs_with_value(uint32_t* __restrict hashes, doris::PrimitiveType type, + uint32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + DCHECK(rows == size()); + auto real_data = data->get_data_at(0); + if (real_data.data == nullptr) { + for (int i = 0; i < rows; ++i) { + hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + } + } else { + for (int i = 0; i < rows; ++i) { + hashes[i] = HashUtil::murmur_hash3_32(real_data.data, real_data.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } +} + MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector& selector) const { if (s != selector.size()) { LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of column ({})", diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 280d2de8344a72f..db4e7e23f4834c1 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -171,6 +171,11 @@ class ColumnConst final : public COWHelper { get_data_column_ptr()->update_crc_with_value(start, end, hash, nullptr); } + void update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const override { + get_data_column_ptr()->update_murmur_with_value(start, end, hash, nullptr); + } + void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, const uint8_t* null_map) const override { data->serialize_vec_with_null_map(keys, num_rows, null_map); @@ -191,6 +196,10 @@ class ColumnConst final : public COWHelper { void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(uint32_t* __restrict hashes, PrimitiveType type, uint32_t rows, + uint32_t offset = 0, + const uint8_t* 
__restrict null_data = nullptr) const override; + ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; size_t filter(const Filter& filter) override; diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 07508f8c6a895e6..ca2860ab68da84d 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -183,6 +183,56 @@ void ColumnDecimal::update_crcs_with_value(uint32_t* __restrict hashes, Primi } } +template +void ColumnDecimal::update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const { + if (null_data == nullptr) { + for (size_t i = start; i < end; i++) { + if constexpr (!IsDecimalV2) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + decimalv2_do_murmur(i, hash); + } + } + } else { + for (size_t i = start; i < end; i++) { + if (null_data[i] == 0) { + if constexpr (!IsDecimalV2) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + decimalv2_do_murmur(i, hash); + } + } + } + } +} + +template +void ColumnDecimal::update_murmurs_with_value(uint32_t* __restrict hashes, PrimitiveType type, + uint32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if constexpr (!IsDecimalV2) { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } else { + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + decimalv2_do_murmur(i, hashes[i]); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + decimalv2_do_murmur(i, hashes[i]); + } + } + } + } +} + template void ColumnDecimal::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash, const uint8_t* __restrict null_data) const { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index b61753146fba65d..f9889bdf59c3368 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -190,12 +190,16 @@ class ColumnDecimal final : public COWHelper diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index 82e8c0a911840b8..05e7a7c5c7244d2 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -323,6 +323,42 @@ void ColumnMap::update_crc_with_value(size_t start, size_t end, uint32_t& hash, } } +void ColumnMap::update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const { + auto& offsets = get_offsets(); + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + size_t kv_size = offsets[i] - offsets[i - 1]; + if (kv_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&kv_size), + sizeof(kv_size), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], + hash, nullptr); + get_values().update_murmur_with_value(offsets[i - 1], offsets[i], + hash, nullptr); + } + } + } + } else { + for (size_t i = start; i < end; ++i) { + size_t kv_size = offsets[i] - offsets[i - 1]; + if (kv_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&kv_size), + sizeof(kv_size), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], + hash, nullptr); + get_values().update_murmur_with_value(offsets[i - 1], offsets[i], + hash, nullptr); + } + } + } +} + void 
ColumnMap::update_hashes_with_value(uint64_t* hashes, const uint8_t* null_data) const { size_t s = size(); if (null_data) { @@ -358,6 +394,26 @@ void ColumnMap::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType } } +void ColumnMap::update_murmurs_with_value(uint32_t* __restrict hash, PrimitiveType type, + uint32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } +} + void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) { if (length == 0) { return; diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 1cb3dd0c731c438..440dacae0f94fef 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -184,6 +184,8 @@ class ColumnMap final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(std::vector& hashes, const uint8_t* __restrict null_data) const override; @@ -195,6 +197,10 @@ class ColumnMap final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + /******************** keys and values ***************/ const ColumnPtr& get_keys_ptr() const { return keys_column; } ColumnPtr& get_keys_ptr() { return keys_column; } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 3553e9823d9c0da..7b5321f9e556abf 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -87,6 +87,22 @@ void ColumnNullable::update_crc_with_value(size_t start, size_t end, uint32_t& h } } +void ColumnNullable::update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const { + if (!has_null()) { + nested_column->update_murmur_with_value(start, end, hash, nullptr); + } else { + const auto* __restrict real_null_data = + assert_cast(*null_map).get_data().data(); + for (int i = start; i < end; ++i) { + if (real_null_data[i] != 0) { + hash = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + } + } + nested_column->update_murmur_with_value(start, end, hash, real_null_data); + } +} + void ColumnNullable::update_hash_with_value(size_t n, SipHash& hash) const { if (is_null_at(n)) { hash.update(0); @@ -134,6 +150,27 @@ void ColumnNullable::update_crcs_with_value(uint32_t* __restrict hashes, doris:: } } +void ColumnNullable::update_murmurs_with_value(uint32_t* __restrict hashes, + doris::PrimitiveType type, + uint32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + auto s = rows; + DCHECK(s == size()); + const auto* __restrict real_null_data = + assert_cast(*null_map).get_data().data(); + if (!has_null()) { + nested_column->update_murmurs_with_value(hashes, type, rows, offset, 
nullptr); + } else { + for (int i = 0; i < s; ++i) { + if (real_null_data[i] != 0) { + hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + } + } + nested_column->update_murmurs_with_value(hashes, type, rows, offset, real_null_data); + } +} + void ColumnNullable::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { DCHECK(null_data == nullptr); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 365400a66990daa..5ff40ec3d59ab1f 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -220,6 +220,8 @@ class ColumnNullable final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hash_with_value(size_t n, SipHash& hash) const override; void update_hashes_with_value(std::vector& hashes, @@ -229,6 +231,9 @@ class ColumnNullable final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return scatter_impl(num_columns, selector); diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 2d009e2a08becea..2ae75d18da18631 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -219,6 +219,29 @@ void ColumnString::update_crcs_with_value(uint32_t* __restrict hashes, doris::Pr } } +void ColumnString::update_murmurs_with_value(uint32_t* __restrict hashes, doris::PrimitiveType type, + uint32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + auto data_ref = get_data_at(i); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + auto data_ref = get_data_at(i); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } +} + ColumnPtr ColumnString::filter(const Filter& filt, ssize_t result_size_hint) const { if (offsets.size() == 0) { return ColumnString::create(); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 191c6a95cf9cc1f..ab701e8f562f74f 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -442,6 +442,25 @@ class ColumnString final : public COWHelper { } } + void update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const override { + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + auto data_ref = get_data_at(i); + hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } else { + for (size_t i = start; i < end; ++i) { + auto data_ref = get_data_at(i); + hash = 
HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } + void update_hash_with_value(size_t n, SipHash& hash) const override { size_t string_size = size_at(n); size_t offset = offset_at(n); @@ -460,6 +479,10 @@ class ColumnString final : public COWHelper { uint32_t offset, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(uint32_t* __restrict hashes, PrimitiveType type, uint32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override { auto s = size(); diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index 3502fdf581a9c31..be324390f1ed6f5 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -210,6 +210,13 @@ void ColumnStruct::update_crc_with_value(size_t start, size_t end, uint32_t& has } } +void ColumnStruct::update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const { + for (const auto& column : columns) { + column->update_murmur_with_value(start, end, hash, nullptr); + } +} + void ColumnStruct::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { for (const auto& column : columns) { @@ -225,6 +232,14 @@ void ColumnStruct::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTy } } +void ColumnStruct::update_murmurs_with_value(uint32_t* __restrict hash, PrimitiveType type, + uint32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + for (const auto& column : columns) { + column->update_murmurs_with_value(hash, type, rows, offset, null_data); + } +} + void ColumnStruct::insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) { const ColumnStruct& src_concrete = assert_cast(src); diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index 499fb8444f9b707..1aa787e0df1b8cd 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -110,6 +110,8 @@ class ColumnStruct final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, uint32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(std::vector& hashes, const uint8_t* __restrict null_data) const override; @@ -121,6 +123,10 @@ class ColumnStruct final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override; diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index a825e07d5f29323..c363bb36b5b0e4f 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -202,6 +202,41 @@ void ColumnVector::update_crcs_with_value(uint32_t* __restrict hashes, Primit } } +template +void ColumnVector::update_murmurs_with_value(uint32_t* __restrict hashes, PrimitiveType type, + uint32_t rows, uint32_t 
offset,
+                                                const uint8_t* __restrict null_data) const {
+    auto s = rows;
+    DCHECK(s == size());
+
+    if constexpr (!std::is_same_v<T, Int64>) {
+        DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED)
+    } else {
+        if (type == TYPE_DATE || type == TYPE_DATETIME) {
+            char buf[64];
+            auto date_convert_do_murmur = [&](size_t i) {
+                const VecDateTimeValue& date_val = (const VecDateTimeValue&)data[i];
+                auto len = date_val.to_buffer(buf);
+                hashes[i] = HashUtil::murmur_hash3_32(buf, len, HashUtil::SPARK_MURMUR_32_SEED);
+            };
+
+            if (null_data == nullptr) {
+                for (size_t i = 0; i < s; i++) {
+                    date_convert_do_murmur(i);
+                }
+            } else {
+                for (size_t i = 0; i < s; i++) {
+                    if (null_data[i] == 0) {
+                        date_convert_do_murmur(i);
+                    }
+                }
+            }
+        } else {
+            DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED)
+        }
+    }
+}
+
 template <typename T>
 struct ColumnVector<T>::less {
     const Self& parent;
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index cb1edddb52083c2..d6d850540f4100f 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -323,6 +323,24 @@ class ColumnVector final : public COWHelper<IColumn, ColumnVector<T>> {
             }
         }
     }
+
+    void update_murmur_with_value(size_t start, size_t end, uint32_t& hash,
+                                  const uint8_t* __restrict null_data) const override {
+        if (null_data) {
+            for (size_t i = start; i < end; i++) {
+                if (null_data[i] == 0) {
+                    hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T),
+                                                     HashUtil::SPARK_MURMUR_32_SEED);
+                }
+            }
+        } else {
+            for (size_t i = start; i < end; i++) {
+                hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T),
+                                                 HashUtil::SPARK_MURMUR_32_SEED);
+            }
+        }
+    }
+
     void update_hash_with_value(size_t n, SipHash& hash) const override;
 
     void update_hashes_with_value(std::vector<SipHash>& hashes,
@@ -331,6 +349,10 @@ class ColumnVector final : public COWHelper<IColumn, ColumnVector<T>> {
     void update_crcs_with_value(uint32_t* __restrict hashes, PrimitiveType type, uint32_t rows,
                                 uint32_t offset,
                                 const uint8_t* __restrict null_data) const override;
+
+    void update_murmurs_with_value(uint32_t* __restrict hashes, PrimitiveType type, uint32_t rows,
+                                   uint32_t offset,
+                                   const uint8_t* __restrict null_data) const override;
 
     void update_hashes_with_value(uint64_t* __restrict hashes,
                                   const uint8_t* __restrict null_data) const override;
diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp
index db40610723cdb60..464347407d41d63 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -71,6 +71,13 @@ void XXHashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column, uint64_t*
     column->update_hashes_with_value(result);
 }
 
+template <typename ChannelIds>
+void Murmur32HashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column,
+                                                   uint32_t* __restrict result, int idx) const {
+    column->update_murmurs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type,
+                                      column->size());
+}
+
 template <typename ChannelIds>
 Status XXHashPartitioner<ChannelIds>::clone(RuntimeState* state,
                                             std::unique_ptr<PartitionerBase>& partitioner) {
@@ -97,6 +104,19 @@ Status Crc32HashPartitioner<ChannelIds>::clone(RuntimeState* state,
     return Status::OK();
 }
 
+template <typename ChannelIds>
+Status Murmur32HashPartitioner<ChannelIds>::clone(RuntimeState* state,
+                                                  std::unique_ptr<PartitionerBase>& partitioner) {
+    auto* new_partitioner = new Murmur32HashPartitioner<ChannelIds>(Base::_partition_count);
+    partitioner.reset(new_partitioner);
+    new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
+    for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
+                state, new_partitioner->_partition_expr_ctxs[i]));
+    }
+    return
Status::OK(); +} + template class Partitioner; template class XXHashPartitioner; template class Partitioner; @@ -104,5 +124,7 @@ template class XXHashPartitioner; template class Partitioner; template class Crc32HashPartitioner; template class Crc32HashPartitioner; +template class Murmur32HashPartitioner; +template class Murmur32HashPartitioner; } // namespace doris::vectorized diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index 66ed8809d7ce7c1..cbc25ab77350533 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -112,5 +112,19 @@ class Crc32HashPartitioner final : public Partitioner { void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override; }; +template +class Murmur32HashPartitioner final : public Partitioner { +public: + using Base = Partitioner; + Murmur32HashPartitioner(int partition_count) + : Partitioner(partition_count) {} + ~Murmur32HashPartitioner() override = default; + + Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; + +private: + void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override; +}; + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 5bfec60d1f98d32..d557f8eaafe2c0e 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -321,6 +321,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(sink.output_partition.type), + _hash_type(sink.output_partition.hash_type), _dest_node_id(sink.dest_node_id), _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc), _serializer(this) { @@ -330,6 +331,9 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sink.output_partition.type == TPartitionType::RANDOM || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); + DCHECK(sink.output_partition.hash_type == THashType::CRC32 || + sink.output_partition.hash_type == THashType::XXHASH64 || + sink.output_partition.hash_type == THashType::SPARK_MURMUR32); std::map fragment_id_to_channel_index; _enable_pipeline_exec = state->enable_pipeline_exec(); @@ -378,6 +382,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(TPartitionType::UNPARTITIONED), + _hash_type(THashType::XXHASH64), _dest_node_id(dest_node_id), _serializer(this) { _cur_pb_block = &_pb_block1; @@ -410,8 +415,13 @@ Status VDataStreamSender::init(const TDataSink& tsink) { RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { _partition_count = _channel_shared_ptrs.size(); - _partitioner.reset( - new Crc32HashPartitioner(_channel_shared_ptrs.size())); + if (_hash_type == THashType::CRC32) { + _partitioner.reset( + new Crc32HashPartitioner(_channel_shared_ptrs.size())); + } else { + _partitioner.reset(new Murmur32HashPartitioner( + _channel_shared_ptrs.size())); + } RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); diff --git 
a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index d25385d6b219081..0716a3321781b97 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -103,6 +103,13 @@ struct ShuffleChannelIds {
     }
 };
 
+struct ShufflePModChannelIds {
+    template <typename HashValueType>
+    HashValueType operator()(HashValueType l, size_t r) {
+        return (l % r + r) % r;
+    }
+};
+
 class VDataStreamSender : public DataSink {
 public:
     friend class pipeline::ExchangeSinkOperator;
@@ -177,6 +184,7 @@ class VDataStreamSender : public DataSink {
     int _current_channel_idx; // index of current channel to send to if _random == true
 
     TPartitionType::type _part_type;
+    THashType::type _hash_type;
 
     // serialized batches for broadcasting; we need two so we can write
     // one while the other one is still being sent
diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp
index 440bbff15383207..8c91490fd161e0b 100644
--- a/be/src/vec/utils/util.hpp
+++ b/be/src/vec/utils/util.hpp
@@ -156,6 +156,10 @@ inline std::string remove_suffix(const std::string& name, const std::string& suf
     return name.substr(0, name.length() - suffix.length());
 };
 
+inline uint64_t pmod(const int64_t a, const int64_t b) {
+    return (a % b + b) % b;
+}
+
 } // namespace doris::vectorized
 
 namespace apache::thrift {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java
index 49823afabf8d788..f0b6092b0985e8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveBucketUtil.java
@@ -95,6 +95,9 @@ private static PrimitiveTypeInfo convertToHiveColType(PrimitiveType dorisType) t
             Pattern.compile("bucket_(\\d+)(_\\d+)?$");
 
     private static final Iterable<Pattern> BUCKET_PATTERNS = ImmutableList.of(
+            // spark/parquet pattern
+            // format: f"part-[partitionId]-[tid]-[txnId]-[jobId]-[taskAttemptId]-[fileCount].c000.snappy.parquet"
+            Pattern.compile("part-\\d{5}-\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}_(\\d{5})(?:[-_.].*)?"),
             // legacy Presto naming pattern (current version matches Hive)
             Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?"),
             // Hive naming pattern per `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`
@@ -390,7 +393,7 @@ private static int hashCodeV2(Object o, ObjectInspector objIns, ByteBuffer byteB
         throw new DdlException("Unknown type: " + objIns.getTypeName());
     }
 
-    private static OptionalInt getBucketNumberFromPath(String name) {
+    public static OptionalInt getBucketNumberFromPath(String name) {
         for (Pattern pattern : BUCKET_PATTERNS) {
             Matcher matcher = pattern.matcher(name);
             if (matcher.matches()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java
new file mode 100644
index 000000000000000..5b15874401908a3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Objects; + +/* + * Hive Hash Distribution Info + */ +public class HiveExternalDistributionInfo extends HashDistributionInfo { + @SerializedName(value = "bucketingVersion") + private final int bucketingVersion; + + public HiveExternalDistributionInfo() { + bucketingVersion = 2; + } + + public HiveExternalDistributionInfo(int bucketNum, List distributionColumns, int bucketingVersion) { + super(bucketNum, distributionColumns); + this.bucketingVersion = bucketingVersion; + } + + public HiveExternalDistributionInfo(int bucketNum, boolean autoBucket, + List distributionColumns, int bucketingVersion) { + super(bucketNum, autoBucket, distributionColumns); + this.bucketingVersion = bucketingVersion; + } + + public int getBucketingVersion() { + return bucketingVersion; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + HiveExternalDistributionInfo that = (HiveExternalDistributionInfo) o; + return bucketNum == that.bucketNum + && sameDistributionColumns(that) + && bucketingVersion == that.bucketingVersion; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), bucketingVersion); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("type: ").append(type).append("; "); + + builder.append("distribution columns: ["); + for (Column column : getDistributionColumns()) { + builder.append(column.getName()).append(","); + } + builder.append("]; "); + + if (autoBucket) { + builder.append("bucket num: auto;"); + } else { + builder.append("bucket num: ").append(bucketNum).append(";"); + } + + builder.append("bucketingVersion: ").append(bucketingVersion).append(";"); + + return builder.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index b284b4d60ecf03f..b51c58ffb347e39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -18,10 +18,14 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.HudiUtils; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -39,6 +43,7 @@ import org.apache.doris.thrift.TTableDescriptor; import 
org.apache.doris.thrift.TTableType; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.collections.MapUtils; @@ -52,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.iceberg.Schema; @@ -63,6 +69,8 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -87,6 +95,22 @@ public class HMSExternalTable extends ExternalTable { private static final String TBL_PROP_TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime"; private static final String NUM_ROWS = "numRows"; + private static final String SPARK_BUCKET = "spark.sql.sources.schema.bucketCol."; + private static final String SPARK_NUM_BUCKET = "spark.sql.sources.schema.numBuckets"; + private static final String BUCKETING_VERSION = "bucketing_version"; + + private static final Set SUPPORTED_BUCKET_PROPERTIES; + + static { + SUPPORTED_BUCKET_PROPERTIES = Sets.newHashSet(); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "0"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "1"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "2"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "3"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "4"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_NUM_BUCKET); + SUPPORTED_BUCKET_PROPERTIES.add(BUCKETING_VERSION); + } static { SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet(); @@ -108,8 +132,9 @@ public class HMSExternalTable extends ExternalTable { SUPPORTED_HUDI_FILE_FORMATS.add("com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat"); } - private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; - private List partitionColumns; + protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; + protected List partitionColumns; + private List bucketColumns; private DLAType dlaType = DLAType.UNKNOWN; @@ -123,6 +148,8 @@ public enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG } + private DistributionInfo distributionInfo; + /** * Create hive metastore external table. * @@ -192,6 +219,10 @@ public boolean isHoodieCowTable() { return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName); } + public boolean isBucketedTable() { + return bucketColumns != null && !bucketColumns.isEmpty(); + } + /** * Now we only support three file input format hive tables: parquet/orc/text. * Support managed_table and external_table. 
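For context on the parameter constants above: Spark does not populate the StorageDescriptor bucket fields of a table it writes; the bucketing spec is carried in HMS table parameters instead, which is what initBucketingColumns()/getBucketColumns() in the next hunk fall back to when StorageDescriptor.isSetBucketCols() is false. On a table bucketed 16 ways by user_id the recognized keys would look roughly like this (key names from the constants above; values illustrative):

    spark.sql.sources.schema.numBuckets = 16
    spark.sql.sources.schema.bucketCol.0 = user_id
    bucketing_version = 2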
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
             columns = tmpSchema;
         }
         initPartitionColumns(columns);
+        initBucketingColumns(columns);
         return columns;
     }
 
+    private void initBucketingColumns(List<Column> columns) {
+        List<String> bucketCols = new ArrayList<>(5);
+        int numBuckets = getBucketColumns(bucketCols);
+        if (bucketCols.isEmpty()) {
+            bucketColumns = ImmutableList.of();
+            distributionInfo = new RandomDistributionInfo(1, true);
+            return;
+        }
+
+        int bucketingVersion = Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION,
+                "2"));
+        ImmutableList.Builder<Column> bucketColBuilder = ImmutableList.builder();
+        for (String colName : bucketCols) {
+            // do not use "getColumn()", which would cause an infinite loop
+            for (Column column : columns) {
+                if (colName.equals(column.getName())) {
+                    // For a bucket column of string type, change it to varchar(65535)
+                    // to be the same as a doris managed table.
+                    // This is to avoid some unexpected behavior such as different partition pruning result
+                    // between doris managed table and external table.
+                    if (column.getType().getPrimitiveType() == PrimitiveType.STRING) {
+                        column.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH));
+                    }
+                    bucketColBuilder.add(column);
+                    break;
+                }
+            }
+        }
+
+        bucketColumns = bucketColBuilder.build();
+        distributionInfo = new HiveExternalDistributionInfo(numBuckets, bucketColumns, bucketingVersion);
+        LOG.debug("get {} bucket columns for table: {}", bucketColumns.size(), name);
+    }
+
+    private int getBucketColumns(List<String> bucketCols) {
+        StorageDescriptor descriptor = remoteTable.getSd();
+        int numBuckets = -1;
+        if (descriptor.isSetBucketCols() && !descriptor.getBucketCols().isEmpty()) {
+            /* Hive Bucketed Table */
+            bucketCols.addAll(descriptor.getBucketCols());
+            numBuckets = descriptor.getNumBuckets();
+        } else if (remoteTable.isSetParameters()
+                && !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) {
+            Map<String, String> parameters = remoteTable.getParameters();
+            for (String key : SUPPORTED_BUCKET_PROPERTIES) {
+                if (parameters.containsKey(key)) {
+                    switch (key) {
+                        case SPARK_BUCKET + "0":
+                            bucketCols.add(0, parameters.get(key));
+                            break;
+                        case SPARK_BUCKET + "1":
+                            bucketCols.add(1, parameters.get(key));
+                            break;
+                        case SPARK_BUCKET + "2":
+                            bucketCols.add(2, parameters.get(key));
+                            break;
+                        case SPARK_BUCKET + "3":
+                            bucketCols.add(3, parameters.get(key));
+                            break;
+                        case SPARK_BUCKET + "4":
+                            bucketCols.add(4, parameters.get(key));
+                            break;
+                        case SPARK_NUM_BUCKET:
+                            numBuckets = Integer.valueOf(parameters.get(key));
+                            break;
+                        default:
+                            //ignore
+                    }
+                }
+            }
+        }
+
+        return numBuckets;
+    }
+
     public List<Column> getHudiSchema(List<FieldSchema> hmsSchema) {
         org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this);
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
@@ -544,6 +651,19 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
         return Optional.empty();
     }
 
+    public DistributionInfo getDefaultDistributionInfo() {
+        makeSureInitialized();
+        if (distributionInfo != null) {
+            return distributionInfo;
+        }
+
+        return new RandomDistributionInfo(1, true);
+    }
+
+    public Map<String, String> getTableParameters() {
+        return remoteTable.getParameters();
+    }
+
     private Optional<ColumnStatistic> getHiveColumnStats(String colName) {
         List<ColumnStatisticsObj> tableStats = getHiveTableColumnStats(Lists.newArrayList(colName));
         if (tableStats == null || tableStats.isEmpty()) {
@@ -715,14 +835,23 @@ public long getDataSize(boolean singleReplica) {
 
     @Override
     public boolean isDistributionColumn(String columnName) {
-        return
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()).contains(columnName.toLowerCase()); + Set distributeColumns = getDistributionColumnNames() + .stream().map(String::toLowerCase).collect(Collectors.toSet()); + return distributeColumns.contains(columnName.toLowerCase()); } @Override public Set getDistributionColumnNames() { - return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()); + Set distributionColumnNames = Sets.newHashSet(); + if (distributionInfo instanceof RandomDistributionInfo) { + return distributionColumnNames; + } + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List partitionColumns = hashDistributionInfo.getDistributionColumns(); + for (Column column : partitionColumns) { + distributionColumnNames.add(column.getName().toLowerCase()); + } + return distributionColumnNames; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6eb9f2d75e9885e..a541ec2dda65161 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -170,6 +170,7 @@ import org.apache.doris.planner.SortNode; import org.apache.doris.planner.TableFunctionNode; import org.apache.doris.planner.UnionNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.planner.external.MaxComputeScanNode; import org.apache.doris.planner.external.hudi.HudiScanNode; @@ -451,7 +452,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); // TODO(cmy): determine the needCheckColumnPriv param - ScanNode scanNode; + FileQueryScanNode scanNode; + DataPartition dataPartition = DataPartition.RANDOM; if (table instanceof HMSExternalTable) { switch (((HMSExternalTable) table).getDlaType()) { case HUDI: @@ -501,8 +503,14 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla ) ); Utils.execWithUncheckedException(scanNode::finalizeForNereids); + if (fileScan.getDistributionSpec() instanceof DistributionSpecHash) { + DistributionSpecHash distributionSpecHash = (DistributionSpecHash) fileScan.getDistributionSpec(); + List partitionExprs = distributionSpecHash.getOrderedShuffledColumns().stream() + .map(context::findSlotRef).collect(Collectors.toList()); + dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, + partitionExprs, scanNode.getHashType()); + } // Create PlanFragment - DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); context.addPlanFragment(planFragment); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan); @@ -2282,7 +2290,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec, throw new RuntimeException("Do not support shuffle type: " + distributionSpecHash.getShuffleType()); } - return new DataPartition(partitionType, partitionExprs); + return new DataPartition(partitionType, partitionExprs); //todo(NK): check for lakehouse path here } else { throw new RuntimeException("Unknown DistributionSpec: " + distributionSpec); } 
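The planner-side gate for this optimization lives in DistributedPlanner.canBucketShuffleJoin further down: every distribution (bucket) column of the left-side Hive table must be matched by an equi-join conjunct of a compatible type, otherwise the planner falls back to broadcast or shuffle join. A toy, standalone version of just the containment rule (illustrative names only; the real check also compares table-qualified names and column types):

    #include <algorithm>
    #include <cstdio>
    #include <string>
    #include <vector>

    // Toy model of the eligibility rule: bucket shuffle join is legal only
    // if the equi-join columns cover every distribution column of the
    // bucketed (left) table.
    static bool can_bucket_shuffle(const std::vector<std::string>& dist_cols,
                                   const std::vector<std::string>& join_cols) {
        return std::all_of(dist_cols.begin(), dist_cols.end(),
                           [&](const std::string& c) {
                               return std::find(join_cols.begin(), join_cols.end(), c)
                                      != join_cols.end();
                           });
    }

    int main() {
        // bucketed by (user_id); joined on user_id and dt -> eligible
        printf("%d\n", can_bucket_shuffle({"user_id"}, {"user_id", "dt"}));   // 1
        // bucketed by (user_id, dt); joined only on user_id -> not eligible
        printf("%d\n", can_bucket_shuffle({"user_id", "dt"}, {"user_id"}));   // 0
        return 0;
    }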
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 3b07f2bbe985b96..68968eb51386642 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -131,7 +131,7 @@ public PhysicalProperties visitPhysicalEsScan(PhysicalEsScan esScan, PlanContext @Override public PhysicalProperties visitPhysicalFileScan(PhysicalFileScan fileScan, PlanContext context) { - return PhysicalProperties.STORAGE_ANY; + return new PhysicalProperties(fileScan.getDistributionSpec()); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java index d86e1d1667e18a6..73f4601fe35bc48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java @@ -17,11 +17,26 @@ package org.apache.doris.nereids.rules.implementation; -import org.apache.doris.nereids.properties.DistributionSpecAny; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.nereids.properties.DistributionSpec; +import org.apache.doris.nereids.properties.DistributionSpecHash; +import org.apache.doris.nereids.properties.DistributionSpecStorageAny; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; import java.util.Optional; /** @@ -35,7 +50,7 @@ public Rule build() { fileScan.getRelationId(), fileScan.getTable(), fileScan.getQualifier(), - DistributionSpecAny.INSTANCE, + convertDistribution(fileScan), Optional.empty(), fileScan.getLogicalProperties(), fileScan.getConjuncts(), @@ -43,4 +58,30 @@ public Rule build() { fileScan.getTableSample()) ).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE); } + + private DistributionSpec convertDistribution(LogicalFileScan fileScan) { + TableIf table = fileScan.getTable(); + if (!(table instanceof HMSExternalTable)) { + return DistributionSpecStorageAny.INSTANCE; + } + + HMSExternalTable hmsExternalTable = (HMSExternalTable) table; + DistributionInfo distributionInfo = hmsExternalTable.getDefaultDistributionInfo(); + if (distributionInfo instanceof HashDistributionInfo) { + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List output = fileScan.getOutput(); + List hashColumns = Lists.newArrayList(); + for (Slot slot : output) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + if (((SlotReference) slot).getColumn().get().equals(column)) { 
+ hashColumns.add(slot.getExprId()); + } + } + } + return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL, + fileScan.getTable().getId(), -1, Collections.emptySet()); + } + + return DistributionSpecStorageAny.INSTANCE; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 753520869408ec2..0887e4aa3a0b30a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.thrift.TDataPartition; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Joiner; @@ -53,11 +54,16 @@ public class DataPartition { public static final DataPartition RANDOM = new DataPartition(TPartitionType.RANDOM); private final TPartitionType type; + private final THashType hashType; // for hash partition: exprs used to compute hash value private ImmutableList partitionExprs = ImmutableList.of(); public DataPartition(TPartitionType type, List exprs) { + this(type, exprs, THashType.XXHASH64); + } + + public DataPartition(TPartitionType type, List exprs, THashType hashType) { Preconditions.checkNotNull(exprs); Preconditions.checkState(!exprs.isEmpty()); Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED @@ -65,6 +71,7 @@ public DataPartition(TPartitionType type, List exprs) { || type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED); this.type = type; this.partitionExprs = ImmutableList.copyOf(exprs); + this.hashType = hashType; } public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws AnalysisException { @@ -76,10 +83,15 @@ public DataPartition(TPartitionType type) { Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type == TPartitionType.RANDOM); this.type = type; this.partitionExprs = ImmutableList.of(); + this.hashType = THashType.CRC32; + } + + public static DataPartition hashPartitioned(List exprs, THashType hashType) { + return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, hashType); } public static DataPartition hashPartitioned(List exprs) { - return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs); + return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, THashType.XXHASH64); } public boolean isPartitioned() { @@ -103,6 +115,7 @@ public TDataPartition toThrift() { if (partitionExprs != null) { result.setPartitionExprs(Expr.treesToThrift(partitionExprs)); } + result.setHashType(hashType); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index a719081496b05ba..7665ecab56e2119 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -33,18 +33,23 @@ import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import 
org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hive.common.util.Ref; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -286,6 +291,10 @@ private PlanFragment createScanFragment(PlanNode node) throws UserException { OlapScanNode olapScanNode = (OlapScanNode) node; return new PlanFragment(ctx.getNextFragmentId(), node, olapScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); + } else if (node instanceof HiveScanNode) { + HiveScanNode hiveScanNode = (HiveScanNode) node; + return new PlanFragment(ctx.getNextFragmentId(), node, + hiveScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); } else { // other scan nodes are random partitioned: es, broker return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); @@ -328,10 +337,12 @@ private PlanFragment createHashJoinFragment( // bucket shuffle join is better than broadcast and shuffle join // it can reduce the network cost of join, so doris chose it first List rhsPartitionExprs = Lists.newArrayList(); - if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs)) { + Ref hashType = Ref.from(THashType.CRC32); + if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs, hashType)) { node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); DataPartition rhsJoinPartition = - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionExprs); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, + rhsPartitionExprs, hashType.value); ExchangeNode rhsExchange = new ExchangeNode(ctx.getNextNodeId(), rightChildFragment.getPlanRoot(), false); rhsExchange.setNumInstances(rightChildFragment.getPlanRoot().getNumInstances()); @@ -601,7 +612,7 @@ private boolean dataDistributionMatchEqPredicate(List eqJoinPre } private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, - List rhsHashExprs) { + List rhsHashExprs, Ref hashType) { if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { return false; } @@ -617,7 +628,9 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr PlanNode leftRoot = leftChildFragment.getPlanRoot(); // 1.leftRoot be OlapScanNode if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); + } else if (leftRoot instanceof HiveScanNode) { + return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); } // 2.leftRoot be hashjoin node @@ -626,17 +639,83 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr leftRoot = leftRoot.getChild(0); } if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); + } else if (leftRoot instanceof HiveScanNode) { + return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); } } return false; } + private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode, 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 530a9a78305bbad..35993e372187fbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -72,6 +72,7 @@
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.THashType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TOlapScanNode;
 import org.apache.doris.thrift.TOlapTableIndex;
@@ -1588,7 +1589,7 @@ public DataPartition constructInputPartitionByDistributionInfo() throws UserExce
                 SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName());
                 dataDistributeExprs.add(slotRef);
             }
-            return DataPartition.hashPartitioned(dataDistributeExprs);
+            return DataPartition.hashPartitioned(dataDistributeExprs, THashType.CRC32);
         } else {
             return DataPartition.RANDOM;
         }
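Olap tables keep hashing with CRC32, so the explicit THashType.CRC32 here preserves the old behavior; the Hive path instead advertises SPARK_MURMUR32, and both sides of a bucket shuffle must apply the identical function or rows land in the wrong bucket. The sketch below shows the Spark-compatible computation that SPARK_MURMUR32 implies for a single int key: Murmur3_x86_32 with seed 42, followed by a non-negative modulo. The constants come from the public MurmurHash3 reference; the class and helper names are illustrative, not part of this patch.

public final class SparkBucketSketch {
    private static final int SEED = 42; // Spark's hard-coded seed

    // Murmur3_x86_32 for one 4-byte int, as Spark's murmur3 hash applies it
    static int murmur3HashInt(int input, int seed) {
        int k1 = input * 0xcc9e2d51;
        k1 = Integer.rotateLeft(k1, 15);
        k1 *= 0x1b873593;
        int h1 = seed ^ k1;
        h1 = Integer.rotateLeft(h1, 13);
        h1 = h1 * 5 + 0xe6546b64;
        h1 ^= 4; // total input length in bytes
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // keep the bucket id non-negative, as a positive modulo does
    static int bucketFor(int value, int numBuckets) {
        int h = murmur3HashInt(value, SEED);
        return ((h % numBuckets) + numBuckets) % numBuckets;
    }
}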
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index 9e23463235f2d3a..fcf53f22925b4a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -55,6 +55,7 @@ public class FederationBackendPolicy {
     private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
     private final SecureRandom random = new SecureRandom();
     private ConsistentHash<TScanRangeLocations, Backend> consistentHash;
+    private ConsistentHash<Integer, Backend> consistentBucket;
     private int nextBe = 0;
     private boolean initialized = false;
@@ -98,6 +99,8 @@ public void init(BeSelectionPolicy policy) throws UserException {
         backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
         consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
                 new BackendHash(), backends, Config.virtual_node_number);
+        consistentBucket = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(),
+                new BackendHash(), backends, Config.virtual_node_number);
     }
 
     public Backend getNextBe() {
@@ -110,6 +113,10 @@ public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) {
         return consistentHash.getNode(scanRangeLocations);
     }
 
+    public Backend getNextConsistentBe(Integer bucketId) {
+        return consistentBucket.getNode(bucketId);
+    }
+
     // Try to find a local BE, if not exists, use `getNextBe` instead
     public Backend getNextLocalBe(List<String> hosts) {
         List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size());
@@ -151,4 +158,11 @@ public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) {
             }
         }
     }
+
+    private static class BucketHash implements Funnel<Integer> {
+        @Override
+        public void funnel(Integer bucketId, PrimitiveSink primitiveSink) {
+            primitiveSink.putLong(bucketId);
+        }
+    }
 }
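BucketHash funnels only the bucket id into the ring, so every split that belongs to bucket N hashes to the same position and is pinned to the same backend, across queries as well as within one. Below is a Guava-only analogue of that behavior; the real code uses Doris's own ConsistentHash class, for which Guava's Hashing.consistentHash stands in here, and the backend count is hypothetical.

import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;

class BucketRingDemo {
    static final Funnel<Integer> BUCKET_FUNNEL = new Funnel<Integer>() {
        @Override
        public void funnel(Integer bucketId, PrimitiveSink sink) {
            sink.putLong(bucketId); // mirrors the patch's BucketHash
        }
    };

    public static void main(String[] args) {
        int backends = 3; // hypothetical backend count
        for (int bucketId = 0; bucketId < 4; bucketId++) {
            long h = Hashing.murmur3_128().hashObject(bucketId, BUCKET_FUNNEL).asLong();
            // a consistent-hash ring gives a stable bucket -> node assignment
            int node = Hashing.consistentHash(h, backends);
            System.out.println("bucket " + bucketId + " -> backend " + node);
        }
    }
}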
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index e53bc2b2a0a7848..8dca746dc0db3e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -27,8 +27,10 @@
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.FunctionGenTable;
 import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.catalog.HiveBucketUtil;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.NotImplementedException;
@@ -39,6 +41,7 @@
 import org.apache.doris.datasource.hive.AcidInfo;
 import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.external.hudi.HudiScanNode;
 import org.apache.doris.planner.external.hudi.HudiSplit;
@@ -60,6 +63,7 @@
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TFileScanSlotInfo;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.THashType;
 import org.apache.doris.thrift.THdfsParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRange;
@@ -72,6 +76,7 @@
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import lombok.Getter;
@@ -96,6 +101,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected Map<String, SlotDescriptor> destSlotDescByName;
     protected TFileScanRangeParams params;
 
+    public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create();
+
     @Getter
     protected TableSample tableSample;
@@ -329,6 +336,13 @@ public void createScanRangeLocations() throws UserException {
                 HiveSplit hiveSplit = (HiveSplit) split;
                 isACID = hiveSplit.isACID();
             }
+
+            boolean isBucketedHiveTable = false;
+            TableIf targetTable = getTargetTable();
+            if (targetTable instanceof HMSExternalTable) {
+                isBucketedHiveTable = ((HMSExternalTable) targetTable).isBucketedTable();
+            }
+
             List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
                     ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
                             false, isACID)
                     : fileSplit.getPartitionValues();
@@ -370,7 +384,11 @@ public void createScanRangeLocations() throws UserException {
             curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
             TScanRangeLocation location = new TScanRangeLocation();
             Backend selectedBackend;
-            if (enableSqlCache) {
+            int bucketNum = 0;
+            if (isBucketedHiveTable) {
+                bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt();
+                selectedBackend = backendPolicy.getNextConsistentBe(bucketNum);
+            } else if (enableSqlCache) {
                 // Use consistent hash to assign the same scan range into the same backend among different queries
                 selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
             } else if (enableShortCircuitRead) {
@@ -383,9 +401,11 @@ public void createScanRangeLocations() throws UserException {
             location.setBackendId(selectedBackend.getId());
             location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
             curLocations.addToLocations(location);
+
             LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
                     curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(),
                     fileSplit.getStart(), fileSplit.getLength(),
                     Joiner.on("|").join(fileSplit.getHosts()));
+            bucketSeq2locations.put(bucketNum, curLocations);
             scanRangeLocations.add(curLocations);
             this.totalFileSize += fileSplit.getLength();
         }
@@ -517,6 +537,14 @@ protected static Optional<TFileType> getTFileType(String location) {
         }
         return Optional.empty();
     }
+
+    public DataPartition constructInputPartitionByDistributionInfo() {
+        return DataPartition.RANDOM;
+    }
+
+    public THashType getHashType() {
+        return THashType.CRC32;
+    }
 }
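createScanRangeLocations can only group splits by bucket because the bucket id is recoverable from the data file's name; HiveBucketUtil.getBucketNumberFromPath does that parsing. A hedged sketch of the naming convention it relies on follows; the real utility lives in the Doris FE and may accept more variants, so the regexes and class below are illustrative only.

import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class BucketFileNameDemo {
    // e.g. "000042_0" or "000042_0_copy_1" -> 42 ; "bucket_00042" -> 42
    private static final Pattern PREFIX = Pattern.compile("^(\\d+)_\\d+.*");
    private static final Pattern BUCKET = Pattern.compile("^bucket_(\\d+).*");

    static OptionalInt bucketNumberFromPath(String fileName) {
        Matcher m = PREFIX.matcher(fileName);
        if (m.matches()) {
            return OptionalInt.of(Integer.parseInt(m.group(1)));
        }
        m = BUCKET.matcher(fileName);
        if (m.matches()) {
            return OptionalInt.of(Integer.parseInt(m.group(1)));
        }
        return OptionalInt.empty(); // not a bucketed data file
    }

    public static void main(String[] args) {
        System.out.println(bucketNumberFromPath("000042_0"));     // OptionalInt[42]
        System.out.println(bucketNumberFromPath("bucket_00007")); // OptionalInt[7]
    }
}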
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 943d30017e7c2aa..a0f2cf8770c1621 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -17,11 +17,16 @@
 
 package org.apache.doris.planner.external;
 
+import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.HiveExternalDistributionInfo;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
@@ -40,6 +45,7 @@
 import org.apache.doris.datasource.hive.HiveTransaction;
 import org.apache.doris.datasource.hive.HiveVersionUtil;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
+import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.external.HiveSplit.HiveSplitCreator;
@@ -51,6 +57,7 @@
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileTextScanRangeParams;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.THashType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -421,4 +428,37 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User
         }
         return compressType;
     }
+
+    @Override
+    public DataPartition constructInputPartitionByDistributionInfo() {
+        if (hmsTable.isBucketedTable()) {
+            DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo();
+            if (!(distributionInfo instanceof HashDistributionInfo)) {
+                return DataPartition.RANDOM;
+            }
+            List<Column> distributeColumns = ((HiveExternalDistributionInfo) distributionInfo).getDistributionColumns();
+            List<Expr> dataDistributeExprs = Lists.newArrayList();
+            for (Column column : distributeColumns) {
+                SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName());
+                dataDistributeExprs.add(slotRef);
+            }
+            return DataPartition.hashPartitioned(dataDistributeExprs, THashType.SPARK_MURMUR32);
+        }
+
+        return DataPartition.RANDOM;
+    }
+
+    public HMSExternalTable getHiveTable() {
+        return hmsTable;
+    }
+
+    @Override
+    public THashType getHashType() {
+        if (hmsTable.isBucketedTable()
+                && hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) {
+            return THashType.SPARK_MURMUR32;
+        }
+
+        return THashType.CRC32;
+    }
 }
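For context, a table qualifies for this path only when it was written bucketed on the Spark side, which is what makes hmsTable.isBucketedTable() true and its distribution info a HiveExternalDistributionInfo. A sketch of such a write using Spark's DataFrameWriter follows; the table and column names are hypothetical, and it assumes a SparkSession built with Hive support.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class SparkBucketedWriteDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("bucketed-write")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Row> df = spark.table("src_orders"); // hypothetical source table
        df.write()
          .bucketBy(32, "order_id")  // 32 buckets hashed on the join key
          .sortBy("order_id")
          .format("parquet")
          .saveAsTable("orders_bucketed"); // one file set per bucket on disk
    }
}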
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index cf34283fa955369..808e316920eda98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -23,6 +23,7 @@
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.Status;
@@ -60,6 +61,7 @@
 import org.apache.doris.planner.external.ExternalScanNode;
 import org.apache.doris.planner.external.FileQueryScanNode;
 import org.apache.doris.planner.external.FileScanNode;
+import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
@@ -2203,8 +2205,13 @@ private void computeScanRangeAssignment() throws Exception {
                 computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost,
                         replicaNumPerHost);
             }
             if (fragmentContainsBucketShuffleJoin) {
-                bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
-                        idToBackend, addressToBackendID, replicaNumPerHost);
+                if (scanNode instanceof OlapScanNode) {
+                    bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
+                            idToBackend, addressToBackendID, replicaNumPerHost);
+                } else {
+                    bucketShuffleJoinController.computeScanRangeAssignmentByBucket((HiveScanNode) scanNode,
+                            idToBackend, addressToBackendID, replicaNumPerHost);
+                }
             }
             if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) {
                 computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost,
@@ -2776,6 +2783,50 @@ private void computeScanRangeAssignmentByBucket(
             }
         }
 
+        private void computeScanRangeAssignmentByBucket(
+                final HiveScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
+                Map<TNetworkAddress, Long> addressToBackendID,
+                Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
+            if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
+                int bucketNum = 0;
+                if (scanNode.getHiveTable().isBucketedTable()) {
+                    bucketNum = scanNode.getHiveTable().getDefaultDistributionInfo().getBucketNum();
+                } else {
+                    throw new NotImplementedException("bucket shuffle for non-bucketed table not supported");
+                }
+                fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), bucketNum);
+                fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>());
+                fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
+                fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>());
+            }
+            Map<Integer, TNetworkAddress> bucketSeqToAddress
+                    = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
+            BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId());
+
+            for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
+                //fill scanRangeParamsList
+                List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
+                if (!bucketSeqToAddress.containsKey(bucketSeq)) {
+                    getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(),
+                            bucketSeq, idToBackend, addressToBackendID, replicaNumPerHost);
+                }
+
+                for (TScanRangeLocations location : locations) {
+                    Map<Integer, List<TScanRangeParams>> scanRanges =
+                            findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<>());
+
+                    List<TScanRangeParams> scanRangeParamsList =
+                            findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<>());
+
+                    // add scan range
+                    TScanRangeParams scanRangeParams = new TScanRangeParams();
+                    scanRangeParams.scan_range = location.scan_range;
+                    scanRangeParamsList.add(scanRangeParams);
+                    updateScanRangeNumByScanRange(scanRangeParams);
+                }
+            }
+        }
+
         private void computeInstanceParam(PlanFragmentId fragmentId,
                 int parallelExecInstanceNum, FragmentExecParams params) {
             Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
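The coordinator's per-bucket assignment walks the bucketSeq2locations multimap that FileQueryScanNode filled: one key per bucket, one entry per split of that bucket, and all splits of a bucket are scheduled to the backend chosen for that bucket's sequence. A stripped-down illustration of that bookkeeping, using Guava only and hypothetical file paths in place of TScanRangeLocations:

import com.google.common.collect.ArrayListMultimap;

class BucketSeqDemo {
    public static void main(String[] args) {
        ArrayListMultimap<Integer, String> bucketSeq2locations = ArrayListMultimap.create();
        // two files of bucket 0, one file of bucket 1
        bucketSeq2locations.put(0, "hdfs://warehouse/t/000000_0");
        bucketSeq2locations.put(0, "hdfs://warehouse/t/000000_0_copy_1");
        bucketSeq2locations.put(1, "hdfs://warehouse/t/000001_0");
        for (Integer bucketSeq : bucketSeq2locations.keySet()) {
            // every split of one bucket is handed to one backend together
            System.out.println(bucketSeq + " -> " + bucketSeq2locations.get(bucketSeq));
        }
    }
}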
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index ca8109e40e3070b..19b53529a3a4654 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -40,6 +40,7 @@
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.THashType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPartitionType;
 import org.apache.doris.thrift.TScanRangeLocation;
@@ -174,7 +175,7 @@ public void testIsBucketShuffleJoin() {
                 new ArrayList<>());
 
         hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode,
-                new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
+                new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32)));
 
         // hash join node is not bucket shuffle join
         Assert.assertEquals(false,
@@ -182,13 +183,13 @@ public void testIsBucketShuffleJoin() {
 
         // the fragment id is different from hash join node
         hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-2), hashJoinNode,
-                new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
+                new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32)));
         hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
         Assert.assertEquals(false,
                 Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
 
         hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode,
-                new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
+                new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32)));
         Assert.assertEquals(true,
                 Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
diff --git a/gensrc/thrift/Ddl.thrift b/gensrc/thrift/Ddl.thrift
index 9696230af909edc..f733637bc7791a6 100644
--- a/gensrc/thrift/Ddl.thrift
+++ b/gensrc/thrift/Ddl.thrift
@@ -76,10 +76,6 @@ enum TAggType {
 //     4: optional string default_value
 //}
 
-enum THashType {
-    CRC32
-}
-
 // random partition info
 struct TRandomPartitionDesc {
 }
@@ -93,7 +89,7 @@ struct THashPartitionDesc {
     2: required i32 hash_buckets
 
     // type to compute hash value. if not set, use CRC32
-    3: optional THashType hash_type
+    3: optional Partitions.THashType hash_type
 }
 
 // value used to represents one column value in one range value
diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift
index 8eecbb417b1184a..530c0e02ea54fb6 100644
--- a/gensrc/thrift/Partitions.thrift
+++ b/gensrc/thrift/Partitions.thrift
@@ -21,6 +21,13 @@ namespace java org.apache.doris.thrift
 include "Exprs.thrift"
 include "Types.thrift"
 
+enum THashType {
+    CRC32,
+    XXHASH64,
+    SPARK_MURMUR32,
+    HIVE_MOD
+}
+
 enum TPartitionType {
     UNPARTITIONED,
 
@@ -87,6 +94,7 @@ struct TDataPartition {
     1: required TPartitionType type
     2: optional list<Exprs.TExpr> partition_exprs
     3: optional list<TRangePartition> partition_infos
+    4: optional THashType hash_type
 }
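On the wire, the net effect of the two Thrift changes is that a TDataPartition describing a bucket-shuffle exchange can now name the hash function the BE has to apply. A hedged illustration using the generated Java classes; the setter names assume the Thrift Java generator's usual camel-casing of the field names above.

import org.apache.doris.thrift.TDataPartition;
import org.apache.doris.thrift.THashType;
import org.apache.doris.thrift.TPartitionType;

class DataPartitionThriftDemo {
    public static void main(String[] args) {
        TDataPartition partition = new TDataPartition();
        partition.setType(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED);
        partition.setHashType(THashType.SPARK_MURMUR32); // new optional field 4
        System.out.println(partition);
    }
}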