Skip to content

Commit

Permalink
[opt]fixed partitioners, local_exchange and murmur-hash calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitin-Kashyap committed Dec 31, 2024
1 parent 7fdb0a6 commit 3780f23
Show file tree
Hide file tree
Showing 39 changed files with 397 additions and 219 deletions.
12 changes: 10 additions & 2 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -727,14 +727,22 @@ inline std::string get_exchange_type_name(ExchangeType idx) {
}

struct DataDistribution {
DataDistribution(ExchangeType type) : distribution_type(type) {}
DataDistribution(ExchangeType type) : distribution_type(type), hash_type(THashType::CRC32) {}
DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
: distribution_type(type), partition_exprs(partition_exprs_) {}
: distribution_type(type),
partition_exprs(partition_exprs_),
hash_type(THashType::CRC32) {}
DataDistribution(ExchangeType type, const THashType::type hash_type)
: distribution_type(type), hash_type(hash_type) {}
DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_,
const THashType::type hash)
: distribution_type(type), partition_exprs(partition_exprs_), hash_type(hash) {}
DataDistribution(const DataDistribution& other) = default;
bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
DataDistribution& operator=(const DataDistribution& other) = default;
ExchangeType distribution_type;
std::vector<TExpr> partition_exprs;
THashType::type hash_type;
};

class ExchangerBase;
Expand Down
47 changes: 43 additions & 4 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf

auto& p = _parent->cast<ExchangeSinkOperatorX>();
_part_type = p._part_type;
_hash_type = p._hash_type;
std::map<int64_t, int64_t> fragment_id_to_channel_index;
for (int i = 0; i < p._dests.size(); ++i) {
const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
Expand Down Expand Up @@ -132,9 +133,18 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner =
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
if (_hash_type == THashType::SPARK_MURMUR32) {
_partitioner.reset(
new vectorized::Murmur32HashPartitioner<vectorized::ShufflePModChannelIds>(
channels.size()));
_profile->add_info_string("Partitioner",
fmt::format("Murmur32HashPartitioner({})", _partition_count));
} else {
_partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
channels.size()));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
}
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
Expand Down Expand Up @@ -199,6 +209,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
_writer.reset(new Writer());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
_part_type = p._part_type;
_hash_type = p._hash_type;

if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
Expand Down Expand Up @@ -268,6 +280,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_texprs(sink.output_partition.partition_exprs),
_row_desc(row_desc),
_part_type(sink.output_partition.type),
_hash_type(sink.output_partition.hash_type),
_dests(destinations),
_dest_node_id(sink.dest_node_id),
_transfer_large_data_by_brpc(config::transfer_large_data_by_brpc),
Expand All @@ -289,6 +302,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
DCHECK(sink.output_partition.hash_type == THashType::CRC32 ||
sink.output_partition.hash_type == THashType::XXHASH64 ||
sink.output_partition.hash_type == THashType::SPARK_MURMUR32);
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
if (sink.__isset.output_tuple_id) {
Expand All @@ -308,6 +324,28 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}

std::string ExchangeSinkOperatorX::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));

string dest_names;
for (const auto& dest : _dests) {
if (dest_names.empty()) {
dest_names += print_id(dest.fragment_instance_id);
} else {
dest_names += ", " + print_id(dest.fragment_instance_id);
}
}

fmt::format_to(debug_string_buffer,
", Info: (_num_recievers = {}, _dest_node_id = {},"
", _partition_type = {}, _hash_type = {},"
" _destinations = [{}])",
_dests.size(), _dest_node_id, to_string(_part_type), to_string(_hash_type),
dest_names);
return fmt::to_string(debug_string_buffer);
}

Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
Expand Down Expand Up @@ -385,7 +423,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (serialized) {
auto cur_block = local_state._serializer.get_block()->to_block();
if (!cur_block.empty()) {
DCHECK(eos || local_state._serializer.is_local()) << debug_string(state, 0);
DCHECK(eos || local_state._serializer.is_local())
<< Base::debug_string(state, 0);
RETURN_IF_ERROR(local_state._serializer.serialize_block(
&cur_block, block_holder->get_block(),
local_state._rpc_channels_num));
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
TPartitionType::type _part_type;
THashType::type _hash_type;

std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;
Expand All @@ -184,13 +185,17 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
using Base = DataSinkOperatorX<ExchangeSinkLocalState>;

public:
ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
const std::vector<TUniqueId>& fragment_instance_ids);
Status init(const TDataSink& tsink) override;

[[nodiscard]] std::string debug_string(int indentation_level) const override;

RuntimeState* state() { return _state; }

Status open(RuntimeState* state) override;
Expand Down Expand Up @@ -228,6 +233,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
TTupleId _output_tuple_id = -1;

TPartitionType::type _part_type;
THashType::type _hash_type;

// serialized batches for broadcasting; we need two so we can write
// one while the other one is still being sent
Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
OperatorX<ExchangeLocalState>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {}, _is_merging = {})",
_num_senders, _is_merging);
fmt::format_to(debug_string_buffer,
", Info: (_num_senders = {}, _is_merging = {}, _hash_type = {})", _num_senders,
_is_merging, to_string(_hash_type));
return fmt::to_string(debug_string_buffer);
}

Expand Down Expand Up @@ -106,6 +107,8 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
_partition_type(tnode.exchange_node.__isset.partition_type
? tnode.exchange_node.partition_type
: TPartitionType::UNPARTITIONED),
_hash_type(tnode.exchange_node.__isset.hash_type ? tnode.exchange_node.hash_type
: THashType::CRC32),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
return _partition_type == TPartitionType::HASH_PARTITIONED
? DataDistribution(ExchangeType::HASH_SHUFFLE)
: _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _hash_type)
: DataDistribution(ExchangeType::NOOP);
}

Expand All @@ -100,6 +100,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
const int _num_senders;
const bool _is_merging;
const TPartitionType::type _partition_type;
const THashType::type _hash_type;
RowDescriptor _input_row_desc;

// use in merge sort
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* p
descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type
: TJoinDistributionType::NONE),
_hash_type(tnode.hash_join_node.__isset.hash_type ? tnode.hash_join_node.hash_type
: THashType::CRC32),
_distribution_partition_exprs(tnode.__isset.distribute_expr_lists
? tnode.distribute_expr_lists[1]
: std::vector<TExpr> {}),
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class PartitionedHashJoinSinkOperatorX
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
_distribution_partition_exprs)
_distribution_partition_exprs, _hash_type)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_distribution_partition_exprs);
}
Expand All @@ -135,6 +135,7 @@ class PartitionedHashJoinSinkOperatorX
Status _setup_internal_operator(RuntimeState* state);

const TJoinDistributionType::type _join_distribution;
THashType::type _hash_type;

std::vector<TExpr> _build_exprs;

Expand Down
20 changes: 10 additions & 10 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ class SipHash;
} \
}

#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(SEED) \
if (null_data == nullptr) { \
for (size_t i = 0; i < s; i++) { \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \
} \
} else { \
for (size_t i = 0; i < s; i++) { \
if (null_data[i] == 0) \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \
} \
#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL() \
if (null_data == nullptr) { \
for (size_t i = 0; i < s; i++) { \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]); \
} \
} else { \
for (size_t i = 0; i < s; i++) { \
if (null_data[i] == 0) \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]); \
} \
}

namespace doris::vectorized {
Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp
void ColumnArray::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
auto& offsets_column = get_offsets();
if (hash == 0) {
hash = HashUtil::SPARK_MURMUR_32_SEED;
}
if (null_data) {
for (size_t i = start; i < end; ++i) {
if (null_data[i] == 0) {
Expand Down
8 changes: 3 additions & 5 deletions be/src/vec/columns/column_decimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ void ColumnDecimal<T>::update_murmur_with_value(size_t start, size_t end, int32_
if (null_data == nullptr) {
for (size_t i = start; i < end; i++) {
if constexpr (!IsDecimalV2<T>) {
hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T),
HashUtil::SPARK_MURMUR_32_SEED);
hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hash);
} else {
decimalv2_do_murmur(i, hash);
}
Expand All @@ -199,8 +198,7 @@ void ColumnDecimal<T>::update_murmur_with_value(size_t start, size_t end, int32_
for (size_t i = start; i < end; i++) {
if (null_data[i] == 0) {
if constexpr (!IsDecimalV2<T>) {
hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T),
HashUtil::SPARK_MURMUR_32_SEED);
hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hash);
} else {
decimalv2_do_murmur(i, hash);
}
Expand All @@ -217,7 +215,7 @@ void ColumnDecimal<T>::update_murmurs_with_value(int32_t* __restrict hashes, Pri
DCHECK(s == size());

if constexpr (!IsDecimalV2<T>) {
DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED)
DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL()
} else {
if (null_data == nullptr) {
for (size_t i = 0; i < s; i++) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,6 @@ void ColumnMap::update_crc_with_value(size_t start, size_t end, uint32_t& hash,
void ColumnMap::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
auto& offsets = get_offsets();
if (hash == 0) {
hash = HashUtil::SPARK_MURMUR_32_SEED;
}
if (null_data) {
for (size_t i = start; i < end; ++i) {
if (null_data[i] == 0) {
Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ void ColumnNullable::update_murmur_with_value(size_t start, size_t end, int32_t&
nested_column->update_murmur_with_value(start, end, hash, nullptr);
} else {
const auto* __restrict real_null_data =
assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
hash = HashUtil::SPARK_MURMUR_32_SEED;
for (int i = start; i < end; ++i) {
assert_cast<const ColumnUInt8&>(get_null_map_column()).get_data().data();
for (size_t i = start; i < end; ++i) {
if (real_null_data[i] != 0) {
hash = HashUtil::murmur_hash3_32_null(hash);
}
Expand Down Expand Up @@ -141,13 +140,13 @@ void ColumnNullable::update_murmurs_with_value(int32_t* __restrict hashes,
auto s = rows;
DCHECK(s == size());
const auto* __restrict real_null_data =
assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
assert_cast<const ColumnUInt8&>(get_null_map_column()).get_data().data();
if (!has_null()) {
nested_column->update_murmurs_with_value(hashes, type, rows, offset, nullptr);
} else {
for (int i = 0; i < s; ++i) {
if (real_null_data[i] != 0) {
hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED);
hashes[i] = HashUtil::murmur_hash3_32_null(hashes[i]);
}
}
nested_column->update_murmurs_with_value(hashes, type, rows, offset, real_null_data);
Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/columns/column_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ void ColumnStr<T>::update_crcs_with_value(uint32_t* __restrict hashes, doris::Pr
}
}

void ColumnString::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type,
template <typename T>
void ColumnStr<T>::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type,
int32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
auto s = rows;
Expand All @@ -295,15 +296,13 @@ void ColumnString::update_murmurs_with_value(int32_t* __restrict hashes, doris::
if (null_data == nullptr) {
for (size_t i = 0; i < s; i++) {
auto data_ref = get_data_at(i);
hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size,
HashUtil::SPARK_MURMUR_32_SEED);
hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, hashes[i]);
}
} else {
for (size_t i = 0; i < s; i++) {
if (null_data[i] == 0) {
auto data_ref = get_data_at(i);
hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size,
HashUtil::SPARK_MURMUR_32_SEED);
hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, hashes[i]);
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,13 @@ class ColumnStr final : public COWHelper<IColumn, ColumnStr<T>> {
for (size_t i = start; i < end; ++i) {
if (null_data[i] == 0) {
auto data_ref = get_data_at(i);
hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size,
HashUtil::SPARK_MURMUR_32_SEED);
hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, hash);
}
}
} else {
for (size_t i = start; i < end; ++i) {
auto data_ref = get_data_at(i);
hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size,
HashUtil::SPARK_MURMUR_32_SEED);
hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, hash);
}
}
}
Expand Down
Loading

0 comments on commit 3780f23

Please sign in to comment.