Skip to content

Commit

Permalink
[feature](datalake) Add BucketShuffleJoin support for Hive table data…
Browse files Browse the repository at this point in the history
… generated by Spark. (27783)

    1. Original planner updated to consider BucketShuffle for bucketed hive table
    2. Neerids planner updated for bucketShuffle join on hive tables.
    3. Added spark style hash calculation in BE for shuffle on one side.
  • Loading branch information
Nitin-Kashyap committed Dec 4, 2023
1 parent 20d4d7e commit 28039b8
Show file tree
Hide file tree
Showing 38 changed files with 1,000 additions and 36 deletions.
8 changes: 8 additions & 0 deletions be/src/util/hash_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,21 @@ class HashUtil {
// refer to https://github.com/apache/commons-codec/blob/master/src/main/java/org/apache/commons/codec/digest/MurmurHash3.java
static const uint32_t MURMUR3_32_SEED = 104729;

// refer https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L615
static const uint32_t SPARK_MURMUR_32_SEED = 42;

// modify from https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
static uint32_t murmur_hash3_32(const void* key, int32_t len, uint32_t seed) {
uint32_t out = 0;
murmur_hash3_x86_32(key, len, seed, &out);
return out;
}

static uint32_t murmur_hash3_32_null(uint32_t seed) {
static const int INT_VALUE = 0;
return murmur_hash3_32((const unsigned char*)(&INT_VALUE), 4, seed);
}

static const int MURMUR_R = 47;

// Murmur2 hash implementation returning 64-bit hashes.
Expand Down
27 changes: 27 additions & 0 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ class SipHash;
} \
}

#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(SEED) \
if (null_data == nullptr) { \
for (size_t i = 0; i < s; i++) { \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \
} \
} else { \
for (size_t i = 0; i < s; i++) { \
if (null_data[i] == 0) \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \
} \
}

namespace doris::vectorized {

class Arena;
Expand Down Expand Up @@ -401,6 +413,21 @@ class IColumn : public COW<IColumn> {
LOG(FATAL) << get_name() << " update_crc_with_value not supported";
}

/// Update state of murmur3 hash function (spark files) with value of n elements to avoid the virtual
/// function call null_data to mark whether need to do hash compute, null_data == nullptr
/// means all element need to do hash function, else only *null_data != 0 need to do hash func
virtual void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type,
int32_t rows, uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const {
LOG(FATAL) << get_name() << "update_murmurs_with_value not supported";
}

// use range for one hash value to avoid virtual function call in loop
virtual void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
LOG(FATAL) << get_name() << " update_murmur_with_value not supported";
}

/** Removes elements that don't match the filter.
* Is used in WHERE and HAVING operations.
* If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column;
Expand Down
54 changes: 54 additions & 0 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,60 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp
}
}

// for every array row calculate murmurHash
void ColumnArray::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
auto& offsets_column = get_offsets();
if (hash == 0) {
hash = HashUtil::SPARK_MURMUR_32_SEED;
}
if (null_data) {
for (size_t i = start; i < end; ++i) {
if (null_data[i] == 0) {
size_t elem_size = offsets_column[i] - offsets_column[i - 1];
if (elem_size == 0) {
hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&elem_size),
sizeof(elem_size), hash);
} else {
get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i],
hash, nullptr);
}
}
}
} else {
for (size_t i = start; i < end; ++i) {
size_t elem_size = offsets_column[i] - offsets_column[i - 1];
if (elem_size == 0) {
hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&elem_size),
sizeof(elem_size), hash);
} else {
get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i], hash,
nullptr);
}
}
}
}

void ColumnArray::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type,
int32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
auto s = rows;
DCHECK(s == size());

if (null_data) {
for (size_t i = 0; i < s; ++i) {
// every row
if (null_data[i] == 0) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
} else {
for (size_t i = 0; i < s; ++i) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
}

void ColumnArray::insert(const Field& x) {
const Array& array = doris::vectorized::get<const Array&>(x);
size_t size = array.size();
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
const uint8_t* __restrict null_data) const override;
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;
void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const override;

void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data) const override;
Expand All @@ -152,6 +154,10 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows,
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

void insert_range_from(const IColumn& src, size_t start, size_t length) override;
void insert(const Field& x) override;
void insert_from(const IColumn& src_, size_t n) override;
Expand Down
18 changes: 18 additions & 0 deletions be/src/vec/columns/column_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,24 @@ void ColumnConst::update_hashes_with_value(uint64_t* __restrict hashes,
}
}

void ColumnConst::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type,
int32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
DCHECK(null_data == nullptr);
DCHECK(rows == size());
auto real_data = data->get_data_at(0);
if (real_data.data == nullptr) {
for (int i = 0; i < rows; ++i) {
hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED);
}
} else {
for (int i = 0; i < rows; ++i) {
hashes[i] = HashUtil::murmur_hash3_32(real_data.data, real_data.size,
HashUtil::SPARK_MURMUR_32_SEED);
}
}
}

MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector& selector) const {
if (s != selector.size()) {
LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of column ({})",
Expand Down
9 changes: 9 additions & 0 deletions be/src/vec/columns/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
get_data_column_ptr()->update_crc_with_value(start, end, hash, nullptr);
}

void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const override {
get_data_column_ptr()->update_murmur_with_value(start, end, hash, nullptr);
}

void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
const uint8_t* null_map) const override {
data->serialize_vec_with_null_map(keys, num_rows, null_map);
Expand All @@ -186,6 +191,10 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data) const override;

void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows,
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override;
size_t filter(const Filter& filter) override;

Expand Down
50 changes: 50 additions & 0 deletions be/src/vec/columns/column_decimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,56 @@ void ColumnDecimal<T>::update_crcs_with_value(uint32_t* __restrict hashes, Primi
}
}

template <typename T>
void ColumnDecimal<T>::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
if (null_data == nullptr) {
for (size_t i = start; i < end; i++) {
if constexpr (!IsDecimalV2<T>) {
hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T),
HashUtil::SPARK_MURMUR_32_SEED);
} else {
decimalv2_do_murmur(i, hash);
}
}
} else {
for (size_t i = start; i < end; i++) {
if (null_data[i] == 0) {
if constexpr (!IsDecimalV2<T>) {
hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T),
HashUtil::SPARK_MURMUR_32_SEED);
} else {
decimalv2_do_murmur(i, hash);
}
}
}
}
}

template <typename T>
void ColumnDecimal<T>::update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type,
int32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
auto s = rows;
DCHECK(s == size());

if constexpr (!IsDecimalV2<T>) {
DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED)
} else {
if (null_data == nullptr) {
for (size_t i = 0; i < s; i++) {
decimalv2_do_murmur(i, hashes[i]);
}
} else {
for (size_t i = 0; i < s; i++) {
if (null_data[i] == 0) {
decimalv2_do_murmur(i, hashes[i]);
}
}
}
}
}

template <typename T>
void ColumnDecimal<T>::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const {
Expand Down
14 changes: 13 additions & 1 deletion be/src/vec/columns/column_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,16 @@ class ColumnDecimal final : public COWHelper<ColumnVectorHelper, ColumnDecimal<T
void update_crcs_with_value(uint32_t* __restrict hashes, PrimitiveType type, uint32_t rows,
uint32_t offset,
const uint8_t* __restrict null_data) const override;
void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows,
uint32_t offset,
const uint8_t* __restrict null_data) const override;

void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const override;
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;

void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const override;
int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
IColumn::Permutation& res) const override;
Expand Down Expand Up @@ -302,6 +306,14 @@ class ColumnDecimal final : public COWHelper<ColumnVectorHelper, ColumnDecimal<T
hash = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), hash);
hash = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), hash);
};

void ALWAYS_INLINE decimalv2_do_murmur(size_t i, int32_t& hash) const {
const auto& dec_val = (const DecimalV2Value&)data[i];
int64_t int_val = dec_val.int_value();
int32_t frac_val = dec_val.frac_value();
hash = HashUtil::murmur_hash3_32(&int_val, sizeof(int_val), HashUtil::SPARK_MURMUR_32_SEED);
hash = HashUtil::murmur_hash3_32(&frac_val, sizeof(frac_val), hash);
};
};

template <typename>
Expand Down
54 changes: 54 additions & 0 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,40 @@ void ColumnMap::update_crc_with_value(size_t start, size_t end, uint32_t& hash,
}
}

void ColumnMap::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
auto& offsets = get_offsets();
if (hash == 0) {
hash = HashUtil::SPARK_MURMUR_32_SEED;
}
if (null_data) {
for (size_t i = start; i < end; ++i) {
if (null_data[i] == 0) {
size_t kv_size = offsets[i] - offsets[i - 1];
if (kv_size == 0) {
hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&kv_size),
sizeof(kv_size), hash);
} else {
get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr);
get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash,
nullptr);
}
}
}
} else {
for (size_t i = start; i < end; ++i) {
size_t kv_size = offsets[i] - offsets[i - 1];
if (kv_size == 0) {
hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&kv_size),
sizeof(kv_size), hash);
} else {
get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr);
get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr);
}
}
}
}

void ColumnMap::update_hashes_with_value(uint64_t* hashes, const uint8_t* null_data) const {
size_t s = size();
if (null_data) {
Expand Down Expand Up @@ -343,6 +377,26 @@ void ColumnMap::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType
}
}

void ColumnMap::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type,
int32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
auto s = rows;
DCHECK(s == size());

if (null_data) {
for (size_t i = 0; i < s; ++i) {
// every row
if (null_data[i] == 0) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
} else {
for (size_t i = 0; i < s; ++i) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
}

void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) {
if (length == 0) {
return;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
const uint8_t* __restrict null_data) const override;
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;
void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const override;

void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data) const override;
Expand All @@ -193,6 +195,10 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows,
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

/******************** keys and values ***************/
const ColumnPtr& get_keys_ptr() const { return keys_column; }
ColumnPtr& get_keys_ptr() { return keys_column; }
Expand Down
Loading

0 comments on commit 28039b8

Please sign in to comment.