Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](datalake) Add BucketShuffleJoin support for bucketed hive tables #27784

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -727,14 +727,22 @@ inline std::string get_exchange_type_name(ExchangeType idx) {
}

// Describes how rows are redistributed between pipeline operators: the
// exchange type, optional partition expressions, and the hash algorithm
// used for (bucket) shuffles. The hash algorithm defaults to CRC32 and is
// only overridden for sources that require a different one (e.g. Spark
// murmur32 for bucketed hive tables).
struct DataDistribution {
    DataDistribution(ExchangeType type) : distribution_type(type) {}
    DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
            : distribution_type(type), partition_exprs(partition_exprs_) {}
    DataDistribution(ExchangeType type, const THashType::type hash_type)
            : distribution_type(type), hash_type(hash_type) {}
    DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_,
                     const THashType::type hash)
            : distribution_type(type), partition_exprs(partition_exprs_), hash_type(hash) {}
    DataDistribution(const DataDistribution& other) = default;
    DataDistribution& operator=(const DataDistribution& other) = default;
    // Only a NOOP distribution can skip the local exchange.
    bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
    ExchangeType distribution_type;
    std::vector<TExpr> partition_exprs;
    // Default member initializer replaces the per-constructor CRC32 init.
    THashType::type hash_type = THashType::CRC32;
};

class ExchangerBase;
Expand Down
47 changes: 43 additions & 4 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf

auto& p = _parent->cast<ExchangeSinkOperatorX>();
_part_type = p._part_type;
_hash_type = p._hash_type;
std::map<int64_t, int64_t> fragment_id_to_channel_index;
for (int i = 0; i < p._dests.size(); ++i) {
const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
Expand Down Expand Up @@ -132,9 +133,18 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner =
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
if (_hash_type == THashType::SPARK_MURMUR32) {
_partitioner.reset(
new vectorized::Murmur32HashPartitioner<vectorized::ShufflePModChannelIds>(
channels.size()));
_profile->add_info_string("Partitioner",
fmt::format("Murmur32HashPartitioner({})", _partition_count));
} else {
_partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
channels.size()));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
}
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
Expand Down Expand Up @@ -199,6 +209,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
_writer.reset(new Writer());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
_part_type = p._part_type;
_hash_type = p._hash_type;

if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
Expand Down Expand Up @@ -268,6 +280,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_texprs(sink.output_partition.partition_exprs),
_row_desc(row_desc),
_part_type(sink.output_partition.type),
_hash_type(sink.output_partition.hash_type),
_dests(destinations),
_dest_node_id(sink.dest_node_id),
_transfer_large_data_by_brpc(config::transfer_large_data_by_brpc),
Expand All @@ -289,6 +302,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
DCHECK(sink.output_partition.hash_type == THashType::CRC32 ||
sink.output_partition.hash_type == THashType::XXHASH64 ||
sink.output_partition.hash_type == THashType::SPARK_MURMUR32);
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
if (sink.__isset.output_tuple_id) {
Expand All @@ -308,6 +324,28 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}

// Renders this sink's state for diagnostics: the base operator description
// followed by receiver count, destination node id, partition/hash types, and
// the list of destination fragment instance ids.
std::string ExchangeSinkOperatorX::debug_string(int indentation_level) const {
    fmt::memory_buffer debug_string_buffer;
    fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));

    // Join all destination fragment instance ids as "id1, id2, ...".
    std::string dest_names; // was unqualified `string`, relying on a using-directive
    for (const auto& dest : _dests) {
        if (!dest_names.empty()) {
            dest_names += ", ";
        }
        dest_names += print_id(dest.fragment_instance_id);
    }

    fmt::format_to(debug_string_buffer,
                   ", Info: (_num_receivers = {}, _dest_node_id = {},"
                   ", _partition_type = {}, _hash_type = {},"
                   " _destinations = [{}])",
                   _dests.size(), _dest_node_id, to_string(_part_type), to_string(_hash_type),
                   dest_names);
    return fmt::to_string(debug_string_buffer);
}

Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
Expand Down Expand Up @@ -385,7 +423,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (serialized) {
auto cur_block = local_state._serializer.get_block()->to_block();
if (!cur_block.empty()) {
DCHECK(eos || local_state._serializer.is_local()) << debug_string(state, 0);
DCHECK(eos || local_state._serializer.is_local())
<< Base::debug_string(state, 0);
RETURN_IF_ERROR(local_state._serializer.serialize_block(
&cur_block, block_holder->get_block(),
local_state._rpc_channels_num));
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
TPartitionType::type _part_type;
THashType::type _hash_type;

std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;
Expand All @@ -184,13 +185,17 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
using Base = DataSinkOperatorX<ExchangeSinkLocalState>;

public:
ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
const std::vector<TUniqueId>& fragment_instance_ids);
Status init(const TDataSink& tsink) override;

[[nodiscard]] std::string debug_string(int indentation_level) const override;

RuntimeState* state() { return _state; }

Status open(RuntimeState* state) override;
Expand Down Expand Up @@ -228,6 +233,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
TTupleId _output_tuple_id = -1;

TPartitionType::type _part_type;
THashType::type _hash_type;

// serialized batches for broadcasting; we need two so we can write
// one while the other one is still being sent
Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
OperatorX<ExchangeLocalState>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {}, _is_merging = {})",
_num_senders, _is_merging);
fmt::format_to(debug_string_buffer,
", Info: (_num_senders = {}, _is_merging = {}, _hash_type = {})", _num_senders,
_is_merging, to_string(_hash_type));
return fmt::to_string(debug_string_buffer);
}

Expand Down Expand Up @@ -106,6 +107,8 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
_partition_type(tnode.exchange_node.__isset.partition_type
? tnode.exchange_node.partition_type
: TPartitionType::UNPARTITIONED),
_hash_type(tnode.exchange_node.__isset.hash_type ? tnode.exchange_node.hash_type
: THashType::CRC32),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
return _partition_type == TPartitionType::HASH_PARTITIONED
? DataDistribution(ExchangeType::HASH_SHUFFLE)
: _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _hash_type)
: DataDistribution(ExchangeType::NOOP);
}

Expand All @@ -100,6 +100,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
const int _num_senders;
const bool _is_merging;
const TPartitionType::type _partition_type;
const THashType::type _hash_type;
RowDescriptor _input_row_desc;

// use in merge sort
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* p
descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type
: TJoinDistributionType::NONE),
_hash_type(tnode.hash_join_node.__isset.hash_type ? tnode.hash_join_node.hash_type
: THashType::CRC32),
_distribution_partition_exprs(tnode.__isset.distribute_expr_lists
? tnode.distribute_expr_lists[1]
: std::vector<TExpr> {}),
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class PartitionedHashJoinSinkOperatorX
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
_distribution_partition_exprs)
_distribution_partition_exprs, _hash_type)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_distribution_partition_exprs);
}
Expand All @@ -135,6 +135,7 @@ class PartitionedHashJoinSinkOperatorX
Status _setup_internal_operator(RuntimeState* state);

const TJoinDistributionType::type _join_distribution;
THashType::type _hash_type;

std::vector<TExpr> _build_exprs;

Expand Down
8 changes: 8 additions & 0 deletions be/src/util/hash_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,21 @@ class HashUtil {
// refer to https://github.com/apache/commons-codec/blob/master/src/main/java/org/apache/commons/codec/digest/MurmurHash3.java
static const uint32_t MURMUR3_32_SEED = 104729;

// refer https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L615
static const uint32_t SPARK_MURMUR_32_SEED = 42;
Nitin-Kashyap marked this conversation as resolved.
Show resolved Hide resolved

// Thin wrapper over the 32-bit x86 MurmurHash3 routine (adapted from
// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp)
// that returns the digest by value instead of through an out-parameter.
static uint32_t murmur_hash3_32(const void* key, int64_t len, uint32_t seed) {
    uint32_t result = 0;
    murmur_hash3_x86_32(key, len, seed, &result);
    return result;
}

// Hash value used for NULL entries: hashes a zero 32-bit integer with the
// given seed (matches how a null integer value is folded into the hash).
static uint32_t murmur_hash3_32_null(uint32_t seed) {
    static constexpr int INT_VALUE = 0;
    // sizeof(INT_VALUE) replaces the magic `4`; the argument converts to
    // `const void*` implicitly, so no cast is needed.
    return murmur_hash3_32(&INT_VALUE, sizeof(INT_VALUE), seed);
}

static const int MURMUR_R = 47;

// Murmur2 hash implementation returning 64-bit hashes.
Expand Down
27 changes: 27 additions & 0 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ class SipHash;
} \
}

// Hashes each of the `s` rows of `data` with murmur3-32, folding the previous
// value of `hashes[i]` in as the seed. When `null_data` is non-null, rows
// flagged as NULL (null_data[i] != 0) are skipped and their hash entries are
// left unchanged. Expects `data`, `s`, `hashes`, `null_data`, and element
// type `T` to be in scope at the expansion site.
#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL()                                       \
    if (null_data == nullptr) {                                                       \
        for (size_t i = 0; i < s; i++) {                                              \
            hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]);    \
        }                                                                             \
    } else {                                                                          \
        for (size_t i = 0; i < s; i++) {                                              \
            if (null_data[i] == 0)                                                    \
                hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]); \
        }                                                                             \
    }

namespace doris::vectorized {

class Arena;
Expand Down Expand Up @@ -398,6 +410,21 @@ class IColumn : public COW<IColumn> {
"Method update_crc_with_value is not supported for " + get_name());
}

/// Update the murmur3 hash state (Spark-compatible) for `rows` elements in one
/// call, avoiding a virtual-function call per element. `null_data` marks which
/// rows need hashing: nullptr means every element is hashed, otherwise only
/// elements with null_data[i] == 0 are hashed. The default implementation
/// aborts; column types that support murmur hashing override it.
virtual void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type,
                                       int32_t rows, uint32_t offset = 0,
                                       const uint8_t* __restrict null_data = nullptr) const {
    // Leading space added so the message doesn't fuse with the column name,
    // consistent with update_murmur_with_value below.
    LOG(FATAL) << get_name() << " update_murmurs_with_value not supported";
}

// Update a single running murmur3 hash over the element range [start, end),
// using one call for the whole range to avoid a virtual function call per
// element. `null_data` marks rows to skip (non-zero => NULL, not hashed);
// nullptr means every row is hashed. The default implementation aborts;
// column types that support murmur hashing override it.
virtual void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
                                      const uint8_t* __restrict null_data) const {
    LOG(FATAL) << get_name() << " update_murmur_with_value not supported";
}

/** Removes elements that don't match the filter.
* Is used in WHERE and HAVING operations.
* If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column;
Expand Down
51 changes: 51 additions & 0 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,57 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp
}
}

// Computes the murmur3 hash for every array row in [start, end), folding each
// row into `hash`. An empty array hashes its (zero) length so that empty rows
// still perturb the hash; a non-empty array delegates to the nested data
// column over that row's element range. When `null_data` is provided, rows
// marked NULL (null_data[i] != 0) are skipped.
void ColumnArray::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
                                           const uint8_t* __restrict null_data) const {
    const auto& offsets_column = get_offsets();
    for (size_t i = start; i < end; ++i) {
        if (null_data != nullptr && null_data[i] != 0) {
            continue; // NULL rows do not contribute to the hash.
        }
        // offsets_column[i - 1] is well-defined for i == 0 (padded offsets).
        size_t elem_size = offsets_column[i] - offsets_column[i - 1];
        if (elem_size == 0) {
            hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&elem_size),
                                             sizeof(elem_size), hash);
        } else {
            get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i], hash,
                                                nullptr);
        }
    }
}

void ColumnArray::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type,
int32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
auto s = rows;
DCHECK(s == size());

if (null_data) {
for (size_t i = 0; i < s; ++i) {
// every row
if (null_data[i] == 0) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
} else {
for (size_t i = 0; i < s; ++i) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
}

void ColumnArray::insert(const Field& x) {
if (x.is_null()) {
get_data().insert(Null());
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
const uint8_t* __restrict null_data) const override;
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;
void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const override;

void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data = nullptr) const override;
Expand All @@ -148,6 +150,10 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows,
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

void insert_range_from(const IColumn& src, size_t start, size_t length) override;
void insert_range_from_ignore_overflow(const IColumn& src, size_t start,
size_t length) override;
Expand Down
Loading
Loading