diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp index 312f1ab9286909f..bfd173236ea2caa 100644 --- a/be/src/http/http_channel.cpp +++ b/be/src/http/http_channel.cpp @@ -123,7 +123,7 @@ void HttpChannel::send_files(HttpRequest* request, const std::string& root_dir, VLOG_DEBUG << "http channel send file " << file_path << ", size: " << file_size; evbuffer_add_printf(evb.get(), "File-Name: %s\r\n", file.c_str()); - evbuffer_add_printf(evb.get(), "Content-Length: %ld\r\n", file_size); + evbuffer_add_printf(evb.get(), "Content-Length: %lld\r\n", file_size); evbuffer_add_printf(evb.get(), "\r\n"); if (file_size > 0) { evbuffer_add_file(evb.get(), fd, 0, file_size); diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index f1cfe2b02977e12..de52f94d6bb1afd 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -728,14 +728,22 @@ inline std::string get_exchange_type_name(ExchangeType idx) { } struct DataDistribution { - DataDistribution(ExchangeType type) : distribution_type(type) {} + DataDistribution(ExchangeType type) : distribution_type(type), hash_type(THashType::CRC32) {} DataDistribution(ExchangeType type, const std::vector& partition_exprs_) - : distribution_type(type), partition_exprs(partition_exprs_) {} + : distribution_type(type), + partition_exprs(partition_exprs_), + hash_type(THashType::CRC32) {} + DataDistribution(ExchangeType type, const THashType::type hash_type) + : distribution_type(type), hash_type(hash_type) {} + DataDistribution(ExchangeType type, const std::vector& partition_exprs_, + const THashType::type hash) + : distribution_type(type), partition_exprs(partition_exprs_), hash_type(hash) {} DataDistribution(const DataDistribution& other) = default; bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; } DataDistribution& operator=(const DataDistribution& other) = default; ExchangeType distribution_type; std::vector partition_exprs; + THashType::type hash_type; }; class ExchangerBase; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index aa893fc0a26f2eb..02b2188c4373668 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -79,6 +79,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf auto& p = _parent->cast(); _part_type = p._part_type; + _hash_type = p._hash_type; std::map fragment_id_to_channel_index; for (int i = 0; i < p._dests.size(); ++i) { const auto& fragment_instance_id = p._dests[i].fragment_instance_id; @@ -132,9 +133,18 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf fmt::format("Crc32HashPartitioner({})", _partition_count)); } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { _partition_count = channels.size(); - _partitioner = - std::make_unique>( - channels.size()); + if (_hash_type == THashType::SPARK_MURMUR32) { + _partitioner.reset( + new vectorized::Murmur32HashPartitioner( + channels.size())); + _profile->add_info_string("Partitioner", + fmt::format("Murmur32HashPartitioner({})", _partition_count)); + } else { + _partitioner.reset(new vectorized::Crc32HashPartitioner( + channels.size())); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + } RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); 
_profile->add_info_string("Partitioner", @@ -199,6 +209,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); _writer.reset(new Writer()); auto& p = _parent->cast(); + _part_type = p._part_type; + _hash_type = p._hash_type; if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { @@ -243,6 +255,100 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { } } } + if (_part_type == TPartitionType::HASH_PARTITIONED) { + _partition_count = channels.size(); + _partitioner = + std::make_unique>( + channels.size()); + RETURN_IF_ERROR(_partitioner->init(p._texprs)); + RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + _partition_count = channels.size(); + if (_hash_type == THashType::SPARK_MURMUR32) { + _partitioner = std::make_unique< + vectorized::Murmur32HashPartitioner>( + channels.size()); + _profile->add_info_string("Partitioner", + fmt::format("Murmur32HashPartitioner({})", _partition_count)); + } else { + _partitioner = std::make_unique< + vectorized::Crc32HashPartitioner>( + channels.size()); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + } + RETURN_IF_ERROR(_partitioner->init(p._texprs)); + RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); + + } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + _partition_count = channels.size(); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + _txn_id = p._tablet_sink_txn_id; + _schema = std::make_shared(); + RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema)); + _vpartition = std::make_unique(_schema, p._tablet_sink_partition); + RETURN_IF_ERROR(_vpartition->init()); + auto find_tablet_mode = vectorized::OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; + _tablet_finder = + std::make_unique(_vpartition.get(), find_tablet_mode); + _tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id); + _tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); + _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size()); + for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i])); + } + // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column + // on exchange node rather than on TabletWriter + _block_convertor = + std::make_unique(_tablet_sink_tuple_desc); + _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), + _state->batch_size()); + _location = p._pool->add(new OlapTableLocationParam(p._tablet_sink_location)); + _row_distribution.init( + {.state = _state, + .block_convertor = _block_convertor.get(), + .tablet_finder = _tablet_finder.get(), + .vpartition = _vpartition.get(), + .add_partition_request_timer = _add_partition_request_timer, + .txn_id = _txn_id, + .pool = p._pool.get(), + .location = _location, + .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs, + .schema = _schema, + .caller = (void*)this, + .create_partition_callback = &ExchangeSinkLocalState::empty_callback_function}); + } else if (_part_type == 
TPartitionType::TABLE_SINK_HASH_PARTITIONED) { + _partition_count = + channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer; + _partitioner = + std::make_unique>( + _partition_count); + _partition_function = std::make_unique(_partitioner.get()); + + scale_writer_partitioning_exchanger = std::make_unique< + vectorized::ScaleWriterPartitioningExchanger>( + channels.size(), *_partition_function, _partition_count, channels.size(), 1, + config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / + state->task_num() == + 0 + ? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold + : config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / + state->task_num(), + config::table_sink_partition_write_min_data_processed_rebalance_threshold / + state->task_num() == + 0 + ? config::table_sink_partition_write_min_data_processed_rebalance_threshold + : config::table_sink_partition_write_min_data_processed_rebalance_threshold / + state->task_num()); + + RETURN_IF_ERROR(_partitioner->init(p._texprs)); + RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + } if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || @@ -273,6 +379,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _texprs(sink.output_partition.partition_exprs), _row_desc(row_desc), _part_type(sink.output_partition.type), + _hash_type(sink.output_partition.hash_type), _dests(destinations), _dest_node_id(sink.dest_node_id), _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc), @@ -293,6 +400,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || sink.output_partition.type == TPartitionType::TABLE_SINK_HASH_PARTITIONED || sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED); + DCHECK(sink.output_partition.hash_type == THashType::CRC32 || + sink.output_partition.hash_type == THashType::XXHASH64 || + sink.output_partition.hash_type == THashType::SPARK_MURMUR32); _name = "ExchangeSinkOperatorX"; _pool = std::make_shared(); if (sink.__isset.output_tuple_id) { @@ -312,6 +422,28 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { return Status::OK(); } +std::string ExchangeSinkOperatorX::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); + + string dest_names; + for (const auto& dest : _dests) { + if (dest_names.empty()) { + dest_names += print_id(dest.fragment_instance_id); + } else { + dest_names += ", " + print_id(dest.fragment_instance_id); + } + } + + fmt::format_to(debug_string_buffer, + ", Info: (_num_recievers = {}, _dest_node_id = {}," + ", _partition_type = {}, _hash_type = {}," + " _destinations = [{}])", + _dests.size(), _dest_node_id, to_string(_part_type), to_string(_hash_type), + dest_names); + return fmt::to_string(debug_string_buffer); +} + Status ExchangeSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); _state = state; @@ -389,7 +521,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (serialized) { auto cur_block = local_state._serializer.get_block()->to_block(); if (!cur_block.empty()) { - DCHECK(eos || 
local_state._serializer.is_local()) << debug_string(state, 0); + DCHECK(eos || local_state._serializer.is_local()) + << Base::debug_string(state, 0); RETURN_IF_ERROR(local_state._serializer.serialize_block( &cur_block, block_holder->get_block(), local_state._rpc_channels_num)); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index e88389b1d7bb5a4..bbc71125cd50276 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -173,6 +173,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { RuntimeProfile::Counter* _add_partition_request_timer = nullptr; TPartitionType::type _part_type; + THashType::type _hash_type; std::atomic _reach_limit = false; int _last_local_channel_idx = -1; @@ -183,6 +184,8 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { + using Base = DataSinkOperatorX; + public: ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, int operator_id, const TDataStreamSink& sink, @@ -190,6 +193,8 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX& fragment_instance_ids); Status init(const TDataSink& tsink) override; + [[nodiscard]] std::string debug_string(int indentation_level) const override; + RuntimeState* state() { return _state; } Status open(RuntimeState* state) override; @@ -228,6 +233,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX::debug_string(indentation_level)); - fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {}, _is_merging = {})", - _num_senders, _is_merging); + fmt::format_to(debug_string_buffer, + ", Info: (_num_senders = {}, _is_merging = {}, _hash_type = {})", _num_senders, + _is_merging, to_string(_hash_type)); return fmt::to_string(debug_string_buffer); } @@ -106,6 +107,8 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo _partition_type(tnode.exchange_node.__isset.partition_type ? tnode.exchange_node.partition_type : TPartitionType::UNPARTITIONED), + _hash_type(tnode.exchange_node.__isset.hash_type ? tnode.exchange_node.hash_type + : THashType::CRC32), _input_row_desc(descs, tnode.exchange_node.input_row_tuples, std::vector(tnode.nullable_tuples.begin(), tnode.nullable_tuples.begin() + diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index f938f5007d16430..f52a76c5e6d9c0f 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -90,7 +90,7 @@ class ExchangeSourceOperatorX final : public OperatorX { return _partition_type == TPartitionType::HASH_PARTITIONED ? DataDistribution(ExchangeType::HASH_SHUFFLE) : _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE) + ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _hash_type) : DataDistribution(ExchangeType::NOOP); } @@ -99,6 +99,7 @@ class ExchangeSourceOperatorX final : public OperatorX { const int _num_senders; const bool _is_merging; const TPartitionType::type _partition_type; + const THashType::type _hash_type; RowDescriptor _input_row_desc; // use in merge sort diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 878c3870946f1c4..4556a1b47b14104 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -402,6 +402,8 @@ PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* p descs), _join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type : TJoinDistributionType::NONE), + _hash_type(tnode.hash_join_node.__isset.hash_type ? tnode.hash_join_node.hash_type + : THashType::CRC32), _distribution_partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[1] : std::vector {}), diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index d1fe30e06f2dd2c..74dba0a4c64372f 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -109,7 +109,7 @@ class PartitionedHashJoinSinkOperatorX return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, - _distribution_partition_exprs) + _distribution_partition_exprs, _hash_type) : DataDistribution(ExchangeType::HASH_SHUFFLE, _distribution_partition_exprs); } @@ -134,6 +134,7 @@ class PartitionedHashJoinSinkOperatorX Status _setup_internal_operator(RuntimeState* state); const TJoinDistributionType::type _join_distribution; + THashType::type _hash_type; std::vector _build_exprs; diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 917a8283d5787b1..9078731ee438ec0 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -57,16 +57,16 @@ class SipHash; } \ } -#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(SEED) \ - if (null_data == nullptr) { \ - for (size_t i = 0; i < s; i++) { \ - hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \ - } \ - } else { \ - for (size_t i = 0; i < s; i++) { \ - if (null_data[i] == 0) \ - hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \ - } \ +#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL() \ + if (null_data == nullptr) { \ + for (size_t i = 0; i < s; i++) { \ + hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]); \ + } \ + } else { \ + for (size_t i = 0; i < s; i++) { \ + if (null_data[i] == 0) \ + hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]); \ + } \ } namespace doris::vectorized { diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 814aea9d6af83f5..16a3fc256559b14 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -323,9 +323,6 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp void ColumnArray::update_murmur_with_value(size_t start, size_t end, int32_t& hash, const uint8_t* __restrict null_data) const { auto& offsets_column = get_offsets(); - if (hash == 0) { - hash = 
HashUtil::SPARK_MURMUR_32_SEED; - } if (null_data) { for (size_t i = start; i < end; ++i) { if (null_data[i] == 0) { diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 5b45b96afe628ec..78c0d9162bbc2c6 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -189,8 +189,7 @@ void ColumnDecimal::update_murmur_with_value(size_t start, size_t end, int32_ if (null_data == nullptr) { for (size_t i = start; i < end; i++) { if constexpr (!IsDecimalV2) { - hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), - HashUtil::SPARK_MURMUR_32_SEED); + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hash); } else { decimalv2_do_murmur(i, hash); } @@ -199,8 +198,7 @@ void ColumnDecimal::update_murmur_with_value(size_t start, size_t end, int32_ for (size_t i = start; i < end; i++) { if (null_data[i] == 0) { if constexpr (!IsDecimalV2) { - hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), - HashUtil::SPARK_MURMUR_32_SEED); + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hash); } else { decimalv2_do_murmur(i, hash); } @@ -217,7 +215,7 @@ void ColumnDecimal::update_murmurs_with_value(int32_t* __restrict hashes, Pri DCHECK(s == size()); if constexpr (!IsDecimalV2) { - DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL() } else { if (null_data == nullptr) { for (size_t i = 0; i < s; i++) { diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index 06017ed0f391a05..a095620ddfe04dd 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -346,9 +346,6 @@ void ColumnMap::update_crc_with_value(size_t start, size_t end, uint32_t& hash, void ColumnMap::update_murmur_with_value(size_t start, size_t end, int32_t& hash, const uint8_t* __restrict null_data) const { auto& offsets = get_offsets(); - if (hash == 0) { - hash = HashUtil::SPARK_MURMUR_32_SEED; - } if (null_data) { for (size_t i = start; i < end; ++i) { if (null_data[i] == 0) { diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 4f8de7f9cfe1736..49854b30163fc53 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -94,9 +94,8 @@ void ColumnNullable::update_murmur_with_value(size_t start, size_t end, int32_t& nested_column->update_murmur_with_value(start, end, hash, nullptr); } else { const auto* __restrict real_null_data = - assert_cast(*null_map).get_data().data(); - hash = HashUtil::SPARK_MURMUR_32_SEED; - for (int i = start; i < end; ++i) { + assert_cast(get_null_map_column()).get_data().data(); + for (size_t i = start; i < end; ++i) { if (real_null_data[i] != 0) { hash = HashUtil::murmur_hash3_32_null(hash); } @@ -141,13 +140,13 @@ void ColumnNullable::update_murmurs_with_value(int32_t* __restrict hashes, auto s = rows; DCHECK(s == size()); const auto* __restrict real_null_data = - assert_cast(*null_map).get_data().data(); + assert_cast(get_null_map_column()).get_data().data(); if (!has_null()) { nested_column->update_murmurs_with_value(hashes, type, rows, offset, nullptr); } else { for (int i = 0; i < s; ++i) { if (real_null_data[i] != 0) { - hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + hashes[i] = HashUtil::murmur_hash3_32_null(hashes[i]); } } nested_column->update_murmurs_with_value(hashes, type, rows, offset, real_null_data); diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp index 95ac46fd27838a6..049930b79c611b2 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -286,7 +286,8 @@ void ColumnStr::update_crcs_with_value(uint32_t* __restrict hashes, doris::Pr } } -void ColumnString::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, +template +void ColumnStr::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, int32_t rows, uint32_t offset, const uint8_t* __restrict null_data) const { auto s = rows; @@ -295,15 +296,13 @@ void ColumnString::update_murmurs_with_value(int32_t* __restrict hashes, doris:: if (null_data == nullptr) { for (size_t i = 0; i < s; i++) { auto data_ref = get_data_at(i); - hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, - HashUtil::SPARK_MURMUR_32_SEED); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, hashes[i]); } } else { for (size_t i = 0; i < s; i++) { if (null_data[i] == 0) { auto data_ref = get_data_at(i); - hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, - HashUtil::SPARK_MURMUR_32_SEED); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, hashes[i]); } } } diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 044fcec952ce47f..dd4ca1e69ed0b4b 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -433,15 +433,13 @@ class ColumnStr final : public COWHelper> { for (size_t i = start; i < end; ++i) { if (null_data[i] == 0) { auto data_ref = get_data_at(i); - hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, - HashUtil::SPARK_MURMUR_32_SEED); + hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, hash); } } } else { for (size_t i = start; i < end; ++i) { auto data_ref = get_data_at(i); - hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, - HashUtil::SPARK_MURMUR_32_SEED); + hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, hash); } } } diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 97a72265969a2a0..1c75f26ec37ed95 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -220,14 +220,14 @@ void ColumnVector::update_murmurs_with_value(int32_t* __restrict hashes, Prim DCHECK(s == size()); if constexpr (!std::is_same_v) { - DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL() } else { if (type == TYPE_DATE || type == TYPE_DATETIME) { char buf[64]; auto date_convert_do_crc = [&](size_t i) { const VecDateTimeValue& date_val = (const VecDateTimeValue&)data[i]; auto len = date_val.to_buffer(buf); - hashes[i] = HashUtil::murmur_hash3_32(buf, len, HashUtil::SPARK_MURMUR_32_SEED); + hashes[i] = HashUtil::murmur_hash3_32(buf, len, hashes[i]); }; if (null_data == nullptr) { @@ -242,7 +242,7 @@ void ColumnVector::update_murmurs_with_value(int32_t* __restrict hashes, Prim } } } else { - DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL() } } } diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 4d038958b74a679..7297f7a6efb05ca 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -309,9 +309,6 @@ class ColumnVector final : public COWHelper> { void update_murmur_with_value(size_t start, size_t end, int32_t& hash, const uint8_t* __restrict 
null_data) const override { - if (hash == 0) { - hash = HashUtil::SPARK_MURMUR_32_SEED; - } if (null_data) { for (size_t i = start; i < end; i++) { if (null_data[i] == 0) { diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index de15516cc8ae64b..c3bf779bd13c563 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -17,18 +17,16 @@ #include "partitioner.h" -#include "common/cast_set.h" #include "pipeline/local_exchange/local_exchange_sink_operator.h" #include "runtime/thread_context.h" #include "vec/columns/column_const.h" #include "vec/sink/vdata_stream_sender.h" namespace doris::vectorized { -#include "common/compile_check_begin.h" template -Status Partitioner::do_partitioning(RuntimeState* state, Block* block, - MemTracker* mem_tracker) const { +Status Partitioner::do_partitioning(RuntimeState* state, + Block* block) const { int rows = block->rows(); if (rows > 0) { @@ -38,7 +36,7 @@ Status Partitioner::do_partitioning(RuntimeState* sta std::vector result(result_size); _hash_vals.resize(rows); - std::fill(_hash_vals.begin(), _hash_vals.end(), 0); + std::fill(_hash_vals.begin(), _hash_vals.end(), _get_default_seed()); auto* __restrict hashes = _hash_vals.data(); { RETURN_IF_ERROR(_get_partition_column_result(block, result)); } for (int j = 0; j < result_size; ++j) { @@ -57,7 +55,7 @@ Status Partitioner::do_partitioning(RuntimeState* sta template void Crc32HashPartitioner::_do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const { - column->update_crcs_with_value(result, _partition_expr_ctxs[idx]->root()->type().type, + column->update_crcs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type, cast_set(column->size())); } @@ -71,12 +69,13 @@ void Murmur32HashPartitioner::_do_hash(const ColumnPtr& column, template Status Crc32HashPartitioner::clone(RuntimeState* state, std::unique_ptr& partitioner) { - auto* new_partitioner = new Crc32HashPartitioner(cast_set(_partition_count)); + auto* new_partitioner = + new Crc32HashPartitioner(cast_set(Base::_partition_count)); partitioner.reset(new_partitioner); - new_partitioner->_partition_expr_ctxs.resize(_partition_expr_ctxs.size()); - for (size_t i = 0; i < _partition_expr_ctxs.size(); i++) { - RETURN_IF_ERROR( - _partition_expr_ctxs[i]->clone(state, new_partitioner->_partition_expr_ctxs[i])); + new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size()); + for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone( + state, new_partitioner->_partition_expr_ctxs[i])); } return Status::OK(); } @@ -84,7 +83,8 @@ Status Crc32HashPartitioner::clone(RuntimeState* state, template Status Murmur32HashPartitioner::clone(RuntimeState* state, std::unique_ptr& partitioner) { - auto* new_partitioner = new Murmur32HashPartitioner(Base::_partition_count); + auto* new_partitioner = + new Murmur32HashPartitioner(cast_set(Base::_partition_count)); partitioner.reset(new_partitioner); new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size()); for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) { @@ -94,6 +94,11 @@ Status Murmur32HashPartitioner::clone(RuntimeState* state, return Status::OK(); } +template +int32_t Murmur32HashPartitioner::_get_default_seed() const { + return static_cast(HashUtil::SPARK_MURMUR_32_SEED); +} + template class Crc32HashPartitioner; template class Crc32HashPartitioner; template class Murmur32HashPartitioner; diff --git 
a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index 6556b91d6ef4a65..c1a05fc6755d2d4 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -59,11 +59,11 @@ class PartitionerBase { const size_t _partition_count; }; -template -class Crc32HashPartitioner : public PartitionerBase { +template +class Partitioner : public PartitionerBase { public: - Crc32HashPartitioner(int partition_count) : PartitionerBase(partition_count) {} - ~Crc32HashPartitioner() override = default; + Partitioner(int partition_count) : PartitionerBase(partition_count) {} + ~Partitioner() override = default; Status init(const std::vector& texprs) override { return VExpr::create_expr_trees(texprs, _partition_expr_ctxs); @@ -79,9 +79,9 @@ class Crc32HashPartitioner : public PartitionerBase { Status do_partitioning(RuntimeState* state, Block* block) const override; - ChannelField get_channel_ids() const override { return {_hash_vals.data(), sizeof(uint32_t)}; } - - Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; + ChannelField get_channel_ids() const override { + return {_hash_vals.data(), sizeof(HashValueType)}; + } protected: Status _get_partition_column_result(Block* block, std::vector& result) const { @@ -92,10 +92,12 @@ class Crc32HashPartitioner : public PartitionerBase { return Status::OK(); } - void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const; + virtual void _do_hash(const ColumnPtr& column, HashValueType* __restrict result, + int idx) const = 0; + virtual HashValueType _get_default_seed() const { return static_cast(0); } VExprContextSPtrs _partition_expr_ctxs; - mutable std::vector _hash_vals; + mutable std::vector _hash_vals; }; struct ShuffleChannelIds { @@ -112,6 +114,27 @@ struct SpillPartitionChannelIds { } }; +struct ShufflePModChannelIds { + template + HashValueType operator()(HashValueType l, int32_t r) { + return (l % r + r) % r; + } +}; + +template +class Crc32HashPartitioner final : public Partitioner { +public: + using Base = Partitioner; + Crc32HashPartitioner(int partition_count) + : Partitioner(partition_count) {} + ~Crc32HashPartitioner() override = default; + + Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; + +private: + void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override; +}; + template class Murmur32HashPartitioner final : public Partitioner { public: @@ -124,6 +147,7 @@ class Murmur32HashPartitioner final : public Partitioner { private: void _do_hash(const ColumnPtr& column, int32_t* __restrict result, int idx) const override; + int32_t _get_default_seed() const override; }; } // namespace vectorized diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 8c3aab6622b6936..0ff1f252d5441f1 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -100,13 +100,6 @@ class BlockSerializer { const int _batch_size; }; -struct ShufflePModChannelIds { - template - HashValueType operator()(HashValueType l, int32_t r) { - return (l % r + r) % r; - } -}; - class Channel { public: friend class pipeline::ExchangeSinkBuffer; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 639a2bd715f7eda..bc7a95d0eacca09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -23,9 +23,11 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.IndexedPriorityQueue; +import org.apache.doris.common.Pair; import org.apache.doris.common.ResettableRandomizedIterator; import org.apache.doris.common.UserException; import org.apache.doris.common.util.ConsistentHash; +import org.apache.doris.datasource.hive.HiveBucketUtil; import org.apache.doris.mysql.privilege.UserProperty; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; @@ -81,6 +83,7 @@ public Map getAssignedWeightPerBackend() { private Map assignedWeightPerBackend = Maps.newHashMap(); protected ConsistentHash consistentHash; + protected ConsistentHash consistentBucketHash; private int nextBe = 0; private boolean initialized = false; @@ -200,6 +203,8 @@ public void init(BeSelectionPolicy policy) throws UserException { backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost))); try { consistentHash = consistentHashCache.get(new HashCacheKey(backends)); + consistentBucketHash = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(), + new BackendHash(), backends, Config.split_assigner_virtual_node_number); } catch (ExecutionException e) { throw new UserException("failed to get consistent hash", e); } @@ -216,6 +221,21 @@ public void setEnableSplitsRedistribution(boolean enableSplitsRedistribution) { this.enableSplitsRedistribution = enableSplitsRedistribution; } + public Multimap, Split> computeBucketAwareScanRangeAssignmentWith(List splits) + throws UserException { + ListMultimap, Split> assignment = ArrayListMultimap.create(); + int bucketNum = 0; + for (Split split : splits) { + FileSplit fileSplit = (FileSplit) split; + bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getPath().getName()).getAsInt(); + + List candidateNodes = consistentBucketHash.getNode(bucketNum, 1); + assignment.put(Pair.of(candidateNodes.get(0), bucketNum), split); + } + + return assignment; + } + /** * Assign splits to each backend. Ensure that each backend receives a similar amount of data. 
* In order to make sure backends utilize the os page cache as much as possible, and all backends read splits diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index fb24269c374663a..cd3b7395cf83c0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -30,13 +30,13 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.NotImplementedException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.hive.AcidInfo; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.hive.HMSExternalTable; -import org.apache.doris.datasource.hive.HiveBucketUtil; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hive.source.HiveSplit; import org.apache.doris.planner.DataPartition; @@ -68,6 +68,7 @@ import org.apache.doris.thrift.TTransactionalHiveDeleteDeltaDesc; import org.apache.doris.thrift.TTransactionalHiveDesc; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; @@ -91,11 +92,11 @@ public abstract class FileQueryScanNode extends FileScanNode { private static final Logger LOG = LogManager.getLogger(FileQueryScanNode.class); + public ArrayListMultimap bucketSeq2locations = ArrayListMultimap.create(); + protected Map destSlotDescByName; protected TFileScanRangeParams params; - public ArrayListMultimap bucketSeq2locations = ArrayListMultimap.create(); - @Getter protected TableSample tableSample; @@ -281,6 +282,13 @@ protected Optional getSerializedTable() { @Override public void createScanRangeLocations() throws UserException { + if (!scanRangeLocations.isEmpty()) { + /* Note: createScanRangeLocations invoked twice thru finalizeForNereids() + * and produced duplicate records. + */ + return; + } + long start = System.currentTimeMillis(); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsStartTime(); @@ -326,13 +334,20 @@ public void createScanRangeLocations() throws UserException { params.setProperties(locationProperties); } + boolean isSparkBucketedHiveTable = false; + TableIf targetTable = getTargetTable(); + if (targetTable instanceof HMSExternalTable) { + isSparkBucketedHiveTable = ((HMSExternalTable) targetTable).isSparkBucketedTable(); + } + int numBackends = backendPolicy.numBackends(); List pathPartitionKeys = getPathPartitionKeys(); if (isBatchMode()) { // File splits are generated lazily, and fetched by backends while scanning. // Only provide the unique ID of split source to backend. 
splitAssignment = new SplitAssignment( - backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys); + backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys, + isSparkBucketedHiveTable); splitAssignment.init(); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime(); @@ -357,76 +372,15 @@ public void createScanRangeLocations() throws UserException { tSource.setSplitSourceId(splitSource.getUniqueId()); tSource.setNumSplits(numSplitsPerBE); curLocations.getScanRange().getExtScanRange().getFileScanRange().setSplitSource(tSource); -/*======= - // If fileSplit has partition values, use the values collected from hive partitions. - // Otherwise, use the values in file path. - boolean isACID = false; - if (fileSplit instanceof HiveSplit) { - HiveSplit hiveSplit = (HiveSplit) fileSplit; - isACID = hiveSplit.isACID(); - } - List partitionValuesFromPath = fileSplit.getPartitionValues() == null - ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, - false, isACID) : fileSplit.getPartitionValues(); - - boolean isSparkBucketedHiveTable = false; - int bucketNum = 0; - TableIf targetTable = getTargetTable(); - if (targetTable instanceof HMSExternalTable) { - isSparkBucketedHiveTable = ((HMSExternalTable) targetTable).isSparkBucketedTable(); - if (isSparkBucketedHiveTable) { - bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt(); - } - } - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, - locationType); - TFileCompressType fileCompressType = getFileCompressType(fileSplit); - rangeDesc.setCompressType(fileCompressType); - if (isACID) { - HiveSplit hiveSplit = (HiveSplit) fileSplit; - hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); - TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); - tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); - AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); - TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); - transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); - List deleteDeltaDescs = new ArrayList<>(); - for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { - TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); - deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); - deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); - deleteDeltaDescs.add(deleteDeltaDesc); - } - transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); - tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); - rangeDesc.setTableFormatParams(tableFormatFileDesc); - } - setScanParams(rangeDesc, fileSplit); - - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); ->>>>>>> a5ce2395a2 ([feature](datalake) Add BucketShuffleJoin support for Hive table data generated by Spark. (27783)) -*/ TScanRangeLocation location = new TScanRangeLocation(); location.setBackendId(backend.getId()); location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); curLocations.addToLocations(location); - // So there's only one scan range for each backend. + // So there's only one scan range for each backend. // Each backend only starts up one ScanNode instance. 
// However, even one ScanNode instance can provide maximum scanning concurrency. -/*======= - if (LOG.isDebugEnabled()) { - LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", - curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), - fileSplit.getStart(), fileSplit.getLength(), - Joiner.on("|").join(fileSplit.getHosts())); - } - if (isSparkBucketedHiveTable) { - bucketSeq2locations.put(bucketNum, curLocations); - } ->>>>>>> a5ce2395a2 ([feature](datalake) Add BucketShuffleJoin support for Hive table data generated by Spark. (27783)) -*/ scanRangeLocations.add(curLocations); setLocationPropertiesIfNecessary(backend, locationType, locationProperties); scanBackendIds.add(backend.getId()); @@ -440,14 +394,33 @@ >>>>>>> a5ce2395a2 ([feature](datalake) Add BucketShuffleJoin support for Hive t if (inputSplits.isEmpty() && !isFileStreamType()) { return; } - Multimap assignment = backendPolicy.computeScanRangeAssignment(inputSplits); - for (Backend backend : assignment.keySet()) { - Collection splits = assignment.get(backend); - for (Split split : splits) { - scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys)); - totalFileSize += split.getLength(); + + if (isSparkBucketedHiveTable) { + Multimap, Split> assignment; + + assignment = backendPolicy.computeBucketAwareScanRangeAssignmentWith(inputSplits); + for (Pair backend : assignment.keySet()) { + Collection splits = assignment.get(backend); + for (Split split : splits) { + scanRangeLocations.add(splitToScanRange(backend.first, backend.second, locationProperties, + split, pathPartitionKeys)); + totalFileSize += split.getLength(); + } + scanBackendIds.add(backend.first.getId()); + } + } else { + Multimap assignment; + + assignment = backendPolicy.computeScanRangeAssignment(inputSplits); + for (Backend backend : assignment.keySet()) { + Collection splits = assignment.get(backend); + for (Split split : splits) { + scanRangeLocations.add(splitToScanRange(backend, 0, locationProperties, split, + pathPartitionKeys)); + totalFileSize += split.getLength(); + } + scanBackendIds.add(backend.getId()); } - scanBackendIds.add(backend.getId()); } } @@ -464,6 +437,7 @@ >>>>>>> a5ce2395a2 ([feature](datalake) Add BucketShuffleJoin support for Hive t private TScanRangeLocations splitToScanRange( Backend backend, + Integer bucketNum, Map locationProperties, Split split, List pathPartitionKeys) throws UserException { @@ -480,6 +454,9 @@ private TScanRangeLocations splitToScanRange( ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPathString(), pathPartitionKeys, false, isACID) : fileSplit.getPartitionValues(); + boolean isSparkBucketedHiveTable = false; + TableIf targetTable = getTargetTable(); + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); TFileCompressType fileCompressType = getFileCompressType(fileSplit); rangeDesc.setCompressType(fileCompressType); @@ -518,6 +495,23 @@ private TScanRangeLocations splitToScanRange( location.setBackendId(backend.getId()); location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); curLocations.addToLocations(location); + + if (targetTable instanceof HMSExternalTable) { + isSparkBucketedHiveTable = ((HMSExternalTable) targetTable).isSparkBucketedTable(); + if (isSparkBucketedHiveTable) { + if (!bucketSeq2locations.containsKey(bucketNum)) { + bucketSeq2locations.put(bucketNum, curLocations); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}, bucketNum: {}", + curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), + fileSplit.getStart(), fileSplit.getLength(), + Joiner.on("|").join(fileSplit.getHosts()), bucketNum); + } + return curLocations; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java index a26abc7fc5e037f..0e34fae4043e8e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java @@ -17,11 +17,13 @@ package org.apache.doris.datasource; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.spi.Split; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TScanRangeLocations; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import java.util.ArrayList; @@ -49,6 +51,7 @@ public class SplitAssignment { private final Map locationProperties; private final List pathPartitionKeys; private final Object assignLock = new Object(); + private final boolean useBucketAssignment; private Split sampleSplit = null; private final AtomicBoolean isStop = new AtomicBoolean(false); private final AtomicBoolean scheduleFinished = new AtomicBoolean(false); @@ -60,12 +63,14 @@ public SplitAssignment( SplitGenerator splitGenerator, SplitToScanRange splitToScanRange, Map locationProperties, - List pathPartitionKeys) { + List pathPartitionKeys, + boolean useBucketedAssignment) { this.backendPolicy = backendPolicy; this.splitGenerator = splitGenerator; this.splitToScanRange = splitToScanRange; this.locationProperties = locationProperties; this.pathPartitionKeys = pathPartitionKeys; + this.useBucketAssignment = useBucketedAssignment; } public void init() throws UserException { @@ -88,14 +93,15 @@ private boolean waitFirstSplit() { return !scheduleFinished.get() && !isStop.get() && exception == null; } - private void appendBatch(Multimap batch) throws UserException { - for (Backend backend : batch.keySet()) { + private void appendBatch(Multimap, Split> batch) throws UserException { + for (Pair backend : batch.keySet()) { Collection splits = batch.get(backend); List locations = new ArrayList<>(splits.size()); for (Split split : splits) { - locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys)); + 
locations.add(splitToScanRange.getScanRange(backend.first, backend.second, locationProperties, + split, pathPartitionKeys)); } - if (!assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>()).offer(locations)) { + if (!assignment.computeIfAbsent(backend.first, be -> new LinkedBlockingQueue<>()).offer(locations)) { throw new UserException("Failed to offer batch split"); } } @@ -117,14 +123,20 @@ public void addToQueue(List splits) { if (splits.isEmpty()) { return; } - Multimap batch = null; + Multimap, Split> batch = ArrayListMultimap.create(); synchronized (assignLock) { if (sampleSplit == null) { sampleSplit = splits.get(0); assignLock.notify(); } try { - batch = backendPolicy.computeScanRangeAssignment(splits); + if (useBucketAssignment) { + batch = backendPolicy.computeBucketAwareScanRangeAssignmentWith(splits); + } else { + Multimap, Split> finalBatch = batch; + backendPolicy.computeScanRangeAssignment(splits).entries() + .forEach(e -> finalBatch.put(Pair.of(e.getKey(), 0), e.getValue())); + } } catch (UserException e) { exception = e; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java index 0e890252857583c..ea58b6d8d0bb0a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java @@ -28,6 +28,7 @@ public interface SplitToScanRange { TScanRangeLocations getScanRange( Backend backend, + Integer bucketNum, Map locationProperties, Split split, List pathPartitionKeys) throws UserException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index c6136d816f98084..dbcf104a51304b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -63,6 +63,7 @@ import org.apache.doris.thrift.TTableType; import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -84,10 +85,8 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.time.LocalDate; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -571,7 +570,7 @@ public Optional initSchema() { } else { columns = getHiveSchema(); } - List partitionColumns = initPartitionColumns(columns); + partitionColumns = initPartitionColumns(columns); initBucketingColumns(columns); return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java index fc0bed8d5e1c252..41d04507a1debaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java @@ -103,7 +103,7 @@ private static PrimitiveTypeInfo convertToHiveColType(PrimitiveType dorisType) t private static final Iterable BUCKET_PATTERNS = ImmutableList.of( // spark/parquet pattern - // format: 
f"part-[paritionId]-[tid]-[txnId]-[jobId]-[taskAttemptId]-[fileCount].c000.snappy.parquet" + // format: f"part-[paritionId]-[tid]-[txnId]-[jobId]-[taskAttemptId]_[fileCount].c000.snappy.parquet" Pattern.compile("part-\\d{5}-\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}_(\\d{5})(?:[-_.].*)?"), // legacy Presto naming pattern (current version matches Hive) Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index ecd6bc6173018a1..ab094112cf7a39e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -503,7 +503,7 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User @Override public DataPartition constructInputPartitionByDistributionInfo() { - if (hmsTable.isSparkBucketedTable()) { + if (hmsTable.isSparkBucketedTable() && ConnectContext.get().getSessionVariable().isEnableSparkShuffle()) { DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo(); if (!(distributionInfo instanceof HashDistributionInfo)) { return DataPartition.RANDOM; @@ -526,7 +526,7 @@ public HMSExternalTable getHiveTable() { @Override public THashType getHashType() { - if (hmsTable.isSparkBucketedTable() + if (hmsTable.isSparkBucketedTable() && ConnectContext.get().getSessionVariable().isEnableSparkShuffle() && hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) { return THashType.SPARK_MURMUR32; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 06f86b7e3302f83..c1ffa84396f80bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -347,6 +347,7 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute d } DataPartition dataPartition = toDataPartition(distribute.getDistributionSpec(), validOutputIds, context); exchangeNode.setPartitionType(dataPartition.getType()); + exchangeNode.setHashType(dataPartition.getHashType()); exchangeNode.setChildrenDistributeExprLists(distributeExprLists); PlanFragment parentFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition); if (distribute.getDistributionSpec() instanceof DistributionSpecGather) { @@ -557,7 +558,6 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla SessionVariable sv = ConnectContext.get().getSessionVariable(); // TODO(cmy): determine the needCheckColumnPriv param FileQueryScanNode scanNode; - DataPartition dataPartition = DataPartition.RANDOM; if (table instanceof HMSExternalTable) { switch (((HMSExternalTable) table).getDlaType()) { case ICEBERG: @@ -589,8 +589,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla } else { throw new RuntimeException("do not support table type " + table.getType()); } - if (fileScan.getTableSnapshot().isPresent() && scanNode instanceof FileQueryScanNode) { - ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get()); + if (fileScan.getTableSnapshot().isPresent()) { + 
scanNode.setQueryTableSnapshot(fileScan.getTableSnapshot().get()); } return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor); } @@ -689,9 +689,9 @@ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileSca if (fileScan.getDistributionSpec() instanceof DistributionSpecHash) { DistributionSpecHash distributionSpecHash = (DistributionSpecHash) fileScan.getDistributionSpec(); List partitionExprs = distributionSpecHash.getOrderedShuffledColumns().stream() - .map(context::findSlotRef).collect(Collectors.toList()); + .map(context::findSlotRef).collect(Collectors.toList()); dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, - partitionExprs, scanNode.getHashType()); + partitionExprs, ((FileQueryScanNode) scanNode).getHashType()); } // Create PlanFragment PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); @@ -1427,6 +1427,8 @@ public PlanFragment visitPhysicalHashJoin( hashJoinNode.setDistributionMode(DistributionMode.BROADCAST); } else if (JoinUtils.shouldBucketShuffleJoin(physicalHashJoin)) { hashJoinNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + hashJoinNode.setHashType(((DistributionSpecHash) physicalHashJoin.left() + .getPhysicalProperties().getDistributionSpec()).getShuffleFunction()); } else { hashJoinNode.setDistributionMode(DistributionMode.PARTITIONED); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 7b9d696cabc5e9c..a68d44cc3118aad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -503,6 +503,8 @@ private PhysicalProperties computeShuffleJoinOutputProperties( ShuffleType outputShuffleType = shuffleSide == ShuffleSide.LEFT ? rightHashSpec.getShuffleType() : leftHashSpec.getShuffleType(); + DistributionSpecHash.StorageBucketHashType outputShuffleFunction = shuffleSide == ShuffleSide.LEFT + ? 
rightHashSpec.getShuffleFunction() : leftHashSpec.getShuffleFunction(); switch (hashJoin.getJoinType()) { case INNER_JOIN: @@ -522,7 +524,7 @@ private PhysicalProperties computeShuffleJoinOutputProperties( case LEFT_OUTER_JOIN: if (shuffleSide == ShuffleSide.LEFT) { return new PhysicalProperties( - leftHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType) + leftHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType, outputShuffleFunction) ); } else { return new PhysicalProperties(leftHashSpec); @@ -536,7 +538,7 @@ private PhysicalProperties computeShuffleJoinOutputProperties( // retain left shuffle type, since coordinator use left most node to schedule fragment // forbid colocate join, since right table already shuffle return new PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin( - leftHashSpec.getShuffleType())); + leftHashSpec.getShuffleType(), leftHashSpec.getShuffleFunction())); } case FULL_OUTER_JOIN: return PhysicalProperties.createAnyFromHash(leftHashSpec, rightHashSpec); @@ -582,7 +584,7 @@ private PhysicalProperties legacyComputeShuffleJoinOutputProperties( // retain left shuffle type, since coordinator use left most node to schedule fragment // forbid colocate join, since right table already shuffle return new PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin( - leftHashSpec.getShuffleType())); + leftHashSpec.getShuffleType(), leftHashSpec.getShuffleFunction())); } case FULL_OUTER_JOIN: return PhysicalProperties.createAnyFromHash(leftHashSpec); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index d786215692cbb24..5cca6464afff654 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -679,7 +679,7 @@ private PhysicalProperties calAnotherSideRequired(ShuffleType shuffleType, notNeedShuffleSideRequired, needShuffleSideRequired); return new PhysicalProperties(new DistributionSpecHash(shuffleSideIds, shuffleType, needShuffleSideOutput.getTableId(), needShuffleSideOutput.getSelectedIndexId(), - needShuffleSideOutput.getPartitionIds(), notShuffleSideOutput.getShuffleFunction())); + needShuffleSideOutput.getPartitionIds(), notNeedShuffleSideOutput.getShuffleFunction())); } private void updateChildEnforceAndCost(int index, PhysicalProperties targetProperties) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java index 3bd1ab1ae529647..75e5682d9245505 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java @@ -346,6 +346,9 @@ public enum StorageBucketHashType { // SPARK_MURMUR32 is the hash function for Spark bucketed hive table's storage and computation STORAGE_BUCKET_SPARK_MURMUR32; + /** + * convert to thrift + */ public THashType toThrift() { switch (this) { case STORAGE_BUCKET_CRC32: @@ -353,8 +356,25 @@ public THashType toThrift() { case STORAGE_BUCKET_SPARK_MURMUR32: return THashType.SPARK_MURMUR32; case STORAGE_BUCKET_XXHASH64: - default: return THashType.XXHASH64; + default: + return THashType.CRC32; + } + } + + /** + * convert from thrift + */ + 
public static StorageBucketHashType fromThrift(THashType hashType) { + switch (hashType) { + case CRC32: + return STORAGE_BUCKET_CRC32; + case SPARK_MURMUR32: + return STORAGE_BUCKET_SPARK_MURMUR32; + case XXHASH64: + return STORAGE_BUCKET_XXHASH64; + default: + return STORAGE_BUCKET_CRC32; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java index 5162949ed796d18..54ffd42457fc80d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java @@ -28,12 +28,13 @@ import org.apache.doris.nereids.properties.DistributionSpecStorageAny; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; -import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Lists; @@ -68,8 +69,18 @@ private DistributionSpec convertDistribution(LogicalFileScan fileScan) { } HMSExternalTable hmsExternalTable = (HMSExternalTable) table; + if (hmsExternalTable.getDlaType() != HMSExternalTable.DLAType.HIVE + && !hmsExternalTable.isSparkBucketedTable()) { + return DistributionSpecStorageAny.INSTANCE; + } + + boolean isSelectUnpartitioned = !hmsExternalTable.isPartitionedTable() + || hmsExternalTable.getPartitionNames().size() == 1 + || fileScan.getSelectedPartitions().selectedPartitions.size() == 1; + DistributionInfo distributionInfo = hmsExternalTable.getDefaultDistributionInfo(); - if (distributionInfo instanceof HashDistributionInfo) { + if (distributionInfo instanceof HashDistributionInfo && isSelectUnpartitioned + && ConnectContext.get().getSessionVariable().isEnableSparkShuffle()) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; List output = fileScan.getOutput(); List hashColumns = Lists.newArrayList(); @@ -80,12 +91,10 @@ private DistributionSpec convertDistribution(LogicalFileScan fileScan) { } } } - StorageBucketHashType function = StorageBucketHashType.STORAGE_BUCKET_CRC32; - if (hmsExternalTable.isSparkBucketedTable()) { - function = StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32; - } + return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL, - fileScan.getTable().getId(), -1, Collections.emptySet(), function); + fileScan.getTable().getId(), -1, Collections.emptySet(), + StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32); } return DistributionSpecStorageAny.INSTANCE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 3ba8efa48a29a04..7b6575d66f64ddc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -41,6 +41,7 @@ import 
org.apache.doris.common.UserException; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.source.HiveScanNode; +import org.apache.doris.nereids.properties.DistributionSpecHash; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; @@ -339,6 +340,7 @@ private PlanFragment createHashJoinFragment( Ref hashType = Ref.from(THashType.CRC32); if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs, hashType)) { node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); + node.setHashType(DistributionSpecHash.StorageBucketHashType.fromThrift(hashType.value)); DataPartition rhsJoinPartition = new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionExprs, hashType.value); @@ -658,7 +660,8 @@ private boolean canBucketShuffleJoinForHive(HashJoinNode node, HiveScanNode left HMSExternalTable leftTable = leftScanNode.getHiveTable(); DistributionInfo leftDistribution = leftTable.getDefaultDistributionInfo(); - if (leftDistribution == null || !(leftDistribution instanceof HiveExternalDistributionInfo)) { + if (leftDistribution == null || !(leftDistribution instanceof HiveExternalDistributionInfo) + || !ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index cb6628b01c556b9..a13f830692f3855 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -30,6 +30,7 @@ import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExchangeNode; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -67,6 +68,7 @@ public class ExchangeNode extends PlanNode { private boolean isRightChildOfBroadcastHashJoin = false; private TPartitionType partitionType; + private THashType hashType; /** * use for Nereids only. 
@@ -168,6 +170,10 @@ public void setMergeInfo(SortInfo info) { this.planNodeName = "V" + MERGING_EXCHANGE_NODE; } + public void setHashType(THashType hashType) { + this.hashType = hashType; + } + @Override protected void toThrift(TPlanNode msg) { // If this fragment has another scan node, this exchange node is serial or not should be decided by the scan @@ -182,6 +188,9 @@ protected void toThrift(TPlanNode msg) { if (mergeInfo != null) { msg.exchange_node.setSortInfo(mergeInfo.toThrift()); } + if (hashType != null) { + msg.exchange_node.setHashType(hashType); + } msg.exchange_node.setOffset(offset); msg.exchange_node.setPartitionType(partitionType); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index c3cbf2afce15ac4..685e07b13b39919 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -37,6 +37,7 @@ import org.apache.doris.common.CheckedMath; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.nereids.properties.DistributionSpecHash; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TEqJoinCondition; @@ -79,6 +80,7 @@ public class HashJoinNode extends JoinNodeBase { private List markJoinConjuncts; private DistributionMode distrMode; + private DistributionSpecHash.StorageBucketHashType hashType; private boolean isColocate = false; //the flag for colocate join private String colocateReason = ""; // if can not do colocate join, set reason here @@ -249,6 +251,10 @@ public void setColocate(boolean colocate, String reason) { colocateReason = reason; } + public void setHashType(DistributionSpecHash.StorageBucketHashType hashType) { + this.hashType = hashType; + } + /** * Calculate the slots output after going through the hash table in the hash join node. * The most essential difference between 'hashOutputSlots' and 'outputSlots' is that @@ -817,6 +823,9 @@ protected void toThrift(TPlanNode msg) { } } msg.hash_join_node.setDistType(isColocate ? 
TJoinDistributionType.COLOCATE : distrMode.toThrift()); + if (hashType != null) { + msg.hash_join_node.setHashType(hashType.toThrift()); + } msg.hash_join_node.setUseSpecificProjections(useSpecificProjections); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index d615787d401b56d..2ecfc923132e842 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -37,8 +37,8 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalScanNode; import org.apache.doris.datasource.FileQueryScanNode; -import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hive.HMSTransaction; +import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.iceberg.IcebergTransaction; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.metric.MetricRepo; @@ -2161,7 +2161,7 @@ protected void computeScanRangeAssignment() throws Exception { scanNode.getFragment().getPlanRoot()) && (scanNode instanceof OlapScanNode); boolean fragmentContainsBucketShuffleJoin = bucketShuffleJoinController .isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot()) - && (scanNode instanceof OlapScanNode); + && (scanNode instanceof OlapScanNode || scanNode instanceof HiveScanNode); // A fragment may contain both colocate join and bucket shuffle join // on need both compute scanRange to init basic data for query coordinator @@ -2562,7 +2562,7 @@ class BucketShuffleJoinController { private final Map> fragmentIdToSeqToAddressMap = Maps.newHashMap(); // fragment_id -> < be_id -> bucket_count > - private final Map> fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap(); + private final Map> fragmentIdToBackendIdBucketCountMap = Maps.newHashMap(); // fragment_id -> bucket_num protected final Map fragmentIdToBucketNumMap = Maps.newHashMap(); @@ -2617,30 +2617,30 @@ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLoc PlanFragmentId fragmentId, Integer bucketSeq, ImmutableMap idToBackend, Map addressToBackendID, Map replicaNumPerHost) throws Exception { - Map buckendIdToBucketCountMap = fragmentIdToBuckendIdBucketCountMap.get(fragmentId); + Map backendIdToBucketCountMap = fragmentIdToBackendIdBucketCountMap.get(fragmentId); int maxBucketNum = Integer.MAX_VALUE; - long buckendId = Long.MAX_VALUE; + long backendId = Long.MAX_VALUE; Long minReplicaNum = Long.MAX_VALUE; for (TScanRangeLocation location : seqLocation.locations) { - if (buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0) < maxBucketNum) { - maxBucketNum = buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0); - buckendId = location.backend_id; + if (backendIdToBucketCountMap.getOrDefault(location.backend_id, 0) < maxBucketNum) { + maxBucketNum = backendIdToBucketCountMap.getOrDefault(location.backend_id, 0); + backendId = location.backend_id; minReplicaNum = replicaNumPerHost.get(location.server); - } else if (buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0) == maxBucketNum + } else if (backendIdToBucketCountMap.getOrDefault(location.backend_id, 0) == maxBucketNum && replicaNumPerHost.get(location.server) < minReplicaNum) { - buckendId = location.backend_id; + backendId = location.backend_id; minReplicaNum = replicaNumPerHost.get(location.server); } } Reference backendIdRef = new Reference<>(); - 
TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, + TNetworkAddress execHostPort = SimpleScheduler.getHost(backendId, seqLocation.locations, idToBackend, backendIdRef); - //the backend with buckendId is not alive, chose another new backend - if (backendIdRef.getRef() != buckendId) { - buckendIdToBucketCountMap.put(backendIdRef.getRef(), - buckendIdToBucketCountMap.getOrDefault(backendIdRef.getRef(), 0) + 1); - } else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly - buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.getOrDefault(buckendId, 0) + 1); + //the backend with backendId is not alive, chose another new backend + if (backendIdRef.getRef() != backendId) { + backendIdToBucketCountMap.put(backendIdRef.getRef(), + backendIdToBucketCountMap.getOrDefault(backendIdRef.getRef(), 0) + 1); + } else { //the backend with backendId is alive, update backendIdToBucketCountMap directly + backendIdToBucketCountMap.put(backendId, backendIdToBucketCountMap.getOrDefault(backendId, 0) + 1); } for (TScanRangeLocation location : seqLocation.locations) { replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) - 1); @@ -2672,7 +2672,7 @@ private void computeScanRangeAssignmentByBucketForOlap( fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), bucketNum); fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); - fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>()); + fragmentIdToBackendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>()); scanNode.getFragment().setBucketNum(bucketNum); } Map bucketSeqToAddress @@ -2717,7 +2717,8 @@ private void computeScanRangeAssignmentByBucketForHive( fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), bucketNum); fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); - fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>()); + fragmentIdToBackendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>()); + scanNode.getFragment().setBucketNum(bucketNum); } Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); @@ -2806,12 +2807,11 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc for (Pair>> nodeScanRangeMap : scanRange) { for (Map.Entry> nodeScanRange : nodeScanRangeMap.second.entrySet()) { - if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { - range.put(nodeScanRange.getKey(), Lists.newArrayList()); - instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList()); - } - range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); - instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()) + + range.computeIfAbsent(nodeScanRange.getKey(), ArrayList::new) + .addAll(nodeScanRange.getValue()); + instanceParam.perNodeScanRanges + .computeIfAbsent(nodeScanRange.getKey(), ArrayList::new) .addAll(nodeScanRange.getValue()); } } @@ -3327,10 +3327,25 @@ public void appendScanRange(StringBuilder sb, List params) { } TEsScanRange esScanRange = range.getScanRange().getEsScanRange(); if (esScanRange != null) { + if (idx++ != 0) { + sb.append(","); + } sb.append("{ index=").append(esScanRange.getIndex()) .append(", shardid=").append(esScanRange.getShardId()) .append("}"); } + 
TExternalScanRange extScanRange = range.getScanRange().getExtScanRange(); + if (extScanRange != null) { + TFileScanRange fileScanRange = extScanRange.getFileScanRange(); + if (fileScanRange != null) { + if (idx++ != 0) { + sb.append(","); + } + sb.append("{path=") + .append(fileScanRange.getRanges().get(0).getPath()) + .append("}"); + } + } } sb.append("]"); } @@ -3347,10 +3362,10 @@ public void appendTo(StringBuilder sb) { } TNetworkAddress address = instanceExecParams.get(i).host; Map> scanRanges = - scanRangeAssignment.get(address); + instanceExecParams.get(i).perNodeScanRanges; sb.append("{"); sb.append("id=").append(DebugUtil.printId(instanceExecParams.get(i).instanceId)); - sb.append(",host=").append(instanceExecParams.get(i).host); + sb.append(",host=").append(address); if (scanRanges == null) { sb.append("}"); continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 380c758e5751c86..371283f978e5201 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -699,6 +699,8 @@ public class SessionVariable implements Serializable, Writable { */ public static final String ENABLE_AUTO_CREATE_WHEN_OVERWRITE = "enable_auto_create_when_overwrite"; + public static final String ENABLE_SPARK_BUCKET_SHUFFLE = "enable_spark_bucket_shuffle"; + /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. */ @@ -2365,6 +2367,17 @@ public boolean isEnableSortSpill() { return enableSortSpill; } + @VariableMgr.VarAttr(name = ENABLE_SPARK_BUCKET_SHUFFLE) + public boolean enableSparkShuffle = false; + + public boolean isEnableSparkShuffle() { + return enableSparkShuffle; + } + + public void setEnableSparkShuffle(boolean enableSparkShuffle) { + this.enableSparkShuffle = enableSparkShuffle; + } + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. @SuppressWarnings("checkstyle:Indentation") diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index bd8c43622d1f608..d0527e07b7a9e21 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -864,6 +864,7 @@ struct THashJoinNode { 13: optional list mark_join_conjuncts // use_specific_projections true, if output exprssions is denoted by srcExprList represents, o.w. 
PlanNode.projections 14: optional bool use_specific_projections + 15: optional Partitions.THashType hash_type } struct TNestedLoopJoinNode { @@ -1169,6 +1170,7 @@ struct TExchangeNode { 3: optional i64 offset // Shuffle partition type 4: optional Partitions.TPartitionType partition_type + 5: optional Partitions.THashType hash_type } struct TOlapRewriteNode { diff --git a/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy b/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy index bf7f5c1794a96fe..0751d1f6e65c482 100644 --- a/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy @@ -51,6 +51,7 @@ suite("test_hive_spark_clustered_table", "p0,external,hive,external_docker,exter sql """use `${catalog_name}`.`default`""" sql """set enable_fallback_to_original_planner=false;""" + sql """SET enable_spark_bucket_shuffle=true""" q01() diff --git a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy index dd6a00b3123e74c..e85a2bbc088964a 100644 --- a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy +++ b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy @@ -20,6 +20,7 @@ suite("bucket-shuffle-join") { sql "SET enable_fallback_to_original_planner=false" sql 'SET be_number_for_test=1' sql 'SET parallel_pipeline_task_num=1' + sql 'SET enable_spark_bucket_shuffle=true' order_qt_test_bucket """ select * from test_bucket_shuffle_join where rectime="2021-12-01 00:00:00" and id in (select k1 from test_join where k1 in (1,2)) """
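
The sketches that follow are illustrative companions to the patch rather than part of it. First, the Thrift round trip added to DistributionSpecHash.StorageBucketHashType: toThrift maps each constant to its THashType counterpart and falls back to CRC32 for anything unrecognized, and fromThrift mirrors it in the other direction. A self-contained restatement (the nested HashTypeThrift enum is a stand-in invented here for the generated org.apache.doris.thrift.THashType, so the example compiles on its own):

// Minimal sketch of the StorageBucketHashType <-> Thrift mapping in this patch.
// HashTypeThrift is a local stand-in for the generated THashType enum.
public class HashTypeMappingSketch {
    enum HashTypeThrift { CRC32, XXHASH64, SPARK_MURMUR32 }

    enum StorageBucketHashType {
        STORAGE_BUCKET_CRC32,
        STORAGE_BUCKET_XXHASH64,
        // hash function used by Spark for bucketed Hive tables
        STORAGE_BUCKET_SPARK_MURMUR32;

        HashTypeThrift toThrift() {
            switch (this) {
                case STORAGE_BUCKET_CRC32:
                    return HashTypeThrift.CRC32;
                case STORAGE_BUCKET_SPARK_MURMUR32:
                    return HashTypeThrift.SPARK_MURMUR32;
                case STORAGE_BUCKET_XXHASH64:
                    return HashTypeThrift.XXHASH64;
                default:
                    // unknown values degrade to CRC32, the historical default
                    return HashTypeThrift.CRC32;
            }
        }

        static StorageBucketHashType fromThrift(HashTypeThrift t) {
            switch (t) {
                case SPARK_MURMUR32:
                    return STORAGE_BUCKET_SPARK_MURMUR32;
                case XXHASH64:
                    return STORAGE_BUCKET_XXHASH64;
                case CRC32:
                default:
                    return STORAGE_BUCKET_CRC32;
            }
        }
    }

    public static void main(String[] args) {
        for (StorageBucketHashType t : StorageBucketHashType.values()) {
            System.out.println(t + " -> " + t.toThrift() + " -> "
                    + StorageBucketHashType.fromThrift(t.toThrift()));
        }
    }
}

Falling back to CRC32 keeps an unknown value from failing the query, at the cost of a silent behaviour change if a new hash type is ever added without updating this mapping.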
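
The new rule logic in LogicalFileScanToPhysicalFileScan only reports a hash distribution for a file scan when several conditions line up: the table is a HIVE-type HMS table or a Spark-bucketed one, its default distribution is hash-based, the scan effectively reads at most one partition, and enable_spark_bucket_shuffle is on; when they do, the spec is a NATURAL shuffle tagged with STORAGE_BUCKET_SPARK_MURMUR32. The predicate below restates those conditions in isolation; HiveTableInfo and its fields are stand-ins invented for this sketch, while the real rule reads the same facts from HMSExternalTable, the scan's SelectedPartitions, and ConnectContext.

// Standalone restatement of the gate in convertDistribution(). All types here
// are invented for the sketch; only the condition structure mirrors the patch.
public class SparkBucketDistributionGate {
    record HiveTableInfo(
            boolean isHiveDlaTable,          // stands in for getDlaType() == HIVE
            boolean isSparkBucketedTable,    // table was written with Spark bucketing
            boolean isPartitionedTable,
            int partitionCount,              // total partitions on the table
            int selectedPartitionCount,      // partitions actually selected by the scan
            boolean hasHashDistributionInfo  // default distribution is hash-based
    ) {}

    static boolean useSparkMurmurDistribution(HiveTableInfo t, boolean enableSparkShuffleSessionVar) {
        // bail out early unless this is a HIVE-type table or a Spark-bucketed one
        if (!t.isHiveDlaTable() && !t.isSparkBucketedTable()) {
            return false;
        }
        // the patch only claims the distribution when the scan effectively
        // touches a single partition (or the table is unpartitioned)
        boolean selectsOnePartitionAtMost = !t.isPartitionedTable()
                || t.partitionCount() == 1
                || t.selectedPartitionCount() == 1;
        return t.hasHashDistributionInfo()
                && selectsOnePartitionAtMost
                && enableSparkShuffleSessionVar;
    }

    public static void main(String[] args) {
        HiveTableInfo bucketed = new HiveTableInfo(true, true, true, 10, 1, true);
        System.out.println(useSparkMurmurDistribution(bucketed, true));  // true
        System.out.println(useSparkMurmurDistribution(bucketed, false)); // false: session variable off
    }
}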
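
SPARK_MURMUR32 exists so that rows can be routed to the same buckets Spark used when it wrote the table. As a point of reference only, the sketch below derives a bucket id the way Spark bucketing is usually described: Murmur3 x86_32 of the value with seed 42 (Spark's default), followed by a positive modulo over the bucket count. It is written from the public Murmur3 algorithm for the single-int case, not copied from the BE's Murmur32HashPartitioner, and Spark hashes each data type in its own way, so treat it as an illustration rather than a compatibility guarantee.

// Illustrative only: Murmur3 x86_32 of a single int plus the positive-modulo
// step that turns a (possibly negative) hash into a bucket id.
public class SparkBucketIdSketch {
    private static final int SPARK_DEFAULT_SEED = 42; // Spark's Murmur3Hash default seed

    /** Murmur3 x86_32 of a single 4-byte int block. */
    static int murmur3HashInt(int input, int seed) {
        int c1 = 0xcc9e2d51;
        int c2 = 0x1b873593;

        int k1 = input * c1;
        k1 = Integer.rotateLeft(k1, 15);
        k1 *= c2;

        int h1 = seed ^ k1;
        h1 = Integer.rotateLeft(h1, 13);
        h1 = h1 * 5 + 0xe6546b64;

        // finalization mix; the length fed in is 4 bytes
        h1 ^= 4;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    /** pmod: keep the bucket id non-negative even when the hash is negative. */
    static int bucketId(int hash, int numBuckets) {
        return ((hash % numBuckets) + numBuckets) % numBuckets;
    }

    public static void main(String[] args) {
        int numBuckets = 8;
        for (int key : new int[] {1, 42, 123456789, -7}) {
            int hash = murmur3HashInt(key, SPARK_DEFAULT_SEED);
            System.out.printf("key=%d hash=%d bucket=%d%n", key, hash, bucketId(hash, numBuckets));
        }
    }
}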
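
On the Coordinator side, apart from the buckend -> backend renames, the per-bucket placement in getExecHostPortForFragmentIDAndBucketSeq (now also reachable for Hive bucketed scans) is a small greedy loop: among the replica backends of a bucket, pick the one that currently holds the fewest buckets, and break ties by the host with the fewest remaining replicas. A reduced, self-contained version of that selection, with a simple record standing in for TScanRangeLocation:

// Reduced sketch of the greedy bucket-to-backend assignment. A replica is
// simplified to (backendId, replicasLeftOnHost); the alive-check against
// SimpleScheduler that the real code performs is omitted.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketBackendPicker {
    record Replica(long backendId, long replicasLeftOnHost) {}

    static long pickBackend(List<Replica> replicas, Map<Long, Integer> backendIdToBucketCount) {
        int minBuckets = Integer.MAX_VALUE;
        long chosenBackend = Long.MAX_VALUE;
        long minReplicasLeft = Long.MAX_VALUE;
        for (Replica r : replicas) {
            int assigned = backendIdToBucketCount.getOrDefault(r.backendId(), 0);
            if (assigned < minBuckets) {
                minBuckets = assigned;
                chosenBackend = r.backendId();
                minReplicasLeft = r.replicasLeftOnHost();
            } else if (assigned == minBuckets && r.replicasLeftOnHost() < minReplicasLeft) {
                chosenBackend = r.backendId();
                minReplicasLeft = r.replicasLeftOnHost();
            }
        }
        // record the assignment so later buckets see the updated count
        // (the real code does this only after confirming the backend is alive)
        backendIdToBucketCount.merge(chosenBackend, 1, Integer::sum);
        return chosenBackend;
    }

    public static void main(String[] args) {
        Map<Long, Integer> counts = new HashMap<>();
        List<Replica> bucketReplicas = List.of(new Replica(1L, 3L), new Replica(2L, 5L));
        System.out.println(pickBackend(bucketReplicas, counts)); // 1: both empty, backend 1's host has fewer replicas left
        System.out.println(pickBackend(bucketReplicas, counts)); // 2: backend 1 already owns a bucket
    }
}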
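
One small cleanup in assignScanRanges replaces the containsKey/put/get dance with Map.computeIfAbsent. Worth noting: the patch passes ArrayList::new as the mapping function, which for an Integer key resolves to new ArrayList<>(key), i.e. the key quietly becomes the list's initial capacity. That is harmless for non-negative plan-node ids, but the explicit lambda shown last avoids the surprise; this is an observation about the idiom, not a change the patch makes.

// Before/after shape of the assignScanRanges cleanup, reduced to plain
// collections, plus the capacity quirk of ArrayList::new as a mapping function.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentSketch {
    public static void main(String[] args) {
        int planNodeId = 3;
        List<String> newRanges = List.of("range-a", "range-b");

        // before: explicit containsKey/put, then get
        Map<Integer, List<String>> before = new HashMap<>();
        if (!before.containsKey(planNodeId)) {
            before.put(planNodeId, new ArrayList<>());
        }
        before.get(planNodeId).addAll(newRanges);

        // after (as in the patch): ArrayList::new resolves to new ArrayList<>(planNodeId),
        // so the key is used as the list's initial capacity
        Map<Integer, List<String>> after = new HashMap<>();
        after.computeIfAbsent(planNodeId, ArrayList::new).addAll(newRanges);

        // same behaviour without the capacity surprise
        Map<Integer, List<String>> explicit = new HashMap<>();
        explicit.computeIfAbsent(planNodeId, k -> new ArrayList<>()).addAll(newRanges);

        System.out.println(before + " " + after + " " + explicit);
    }
}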