diff --git a/be/src/common/consts.h b/be/src/common/consts.h index 4618ffe7d749ea..ca4662c839e5f1 100644 --- a/be/src/common/consts.h +++ b/be/src/common/consts.h @@ -24,8 +24,6 @@ namespace BeConsts { const std::string CSV = "csv"; const std::string CSV_WITH_NAMES = "csv_with_names"; const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types"; -const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__"; -const std::string BLOCK_TEMP_COLUMN_SCANNER_FILTERED = "__TEMP__scanner_filtered"; const std::string ROWID_COL = "__DORIS_ROWID_COL__"; const std::string GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__"; const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__"; diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index 0f4619e130325b..4e09c9ac36c3b8 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -182,7 +182,11 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request, } // Merge partial blocks vectorized::Block partial_block; - RETURN_IF_ERROR(partial_block.deserialize(resp.block())); + [[maybe_unused]] size_t uncompressed_size = 0; + [[maybe_unused]] int64_t uncompressed_time = 0; + + RETURN_IF_ERROR( + partial_block.deserialize(resp.block(), &uncompressed_size, &uncompressed_time)); if (partial_block.is_empty_column()) { return Status::OK(); } @@ -493,9 +497,10 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request, << ", be_exec_version:" << request.be_exec_version(); [[maybe_unused]] size_t compressed_size = 0; [[maybe_unused]] size_t uncompressed_size = 0; + [[maybe_unused]] int64_t compress_time = 0; int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0; RETURN_IF_ERROR(result_block.serialize(be_exec_version, response->mutable_block(), - &uncompressed_size, &compressed_size, + &uncompressed_size, &compressed_size, &compress_time, segment_v2::CompressionTypePB::LZ4)); } @@ -600,10 +605,11 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request, [[maybe_unused]] size_t compressed_size = 0; [[maybe_unused]] size_t uncompressed_size = 0; + [[maybe_unused]] int64_t compress_time = 0; int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0; - RETURN_IF_ERROR(result_blocks[i].serialize(be_exec_version, pblock->mutable_block(), - &uncompressed_size, &compressed_size, - segment_v2::CompressionTypePB::LZ4)); + RETURN_IF_ERROR(result_blocks[i].serialize( + be_exec_version, pblock->mutable_block(), &uncompressed_size, &compressed_size, + &compress_time, segment_v2::CompressionTypePB::LZ4)); } // Build file type statistics string diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 9cde2bbdf45bdc..2c858e557bb14b 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -941,11 +941,10 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t const signed char* BaseTablet::get_delete_sign_column_data(const vectorized::Block& block, size_t rows_at_least) { - if (const vectorized::ColumnWithTypeAndName* delete_sign_column = - block.try_get_by_name(DELETE_SIGN); - delete_sign_column != nullptr) { + if (int pos = block.get_position_by_name(DELETE_SIGN); pos != -1) { + const vectorized::ColumnWithTypeAndName& delete_sign_column = block.get_by_position(pos); const auto& delete_sign_col = - reinterpret_cast(*(delete_sign_column->column)); + assert_cast(*(delete_sign_column.column)); if (delete_sign_col.size() >= rows_at_least) { return delete_sign_col.get_data().data(); } diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index 5e6082c5c13938..928ae80b38fc8f 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -311,9 +311,7 @@ Status FixedReadPlan::read_columns_by_plan( const signed char* __restrict cur_delete_signs) const { if (force_read_old_delete_signs) { // always read delete sign column from historical data - if (const vectorized::ColumnWithTypeAndName* old_delete_sign_column = - block.try_get_by_name(DELETE_SIGN); - old_delete_sign_column == nullptr) { + if (block.get_position_by_name(DELETE_SIGN) == -1) { auto del_col_cid = tablet_schema.field_index(DELETE_SIGN); cids_to_read.emplace_back(del_col_cid); block.swap(tablet_schema.create_block_by_cids(cids_to_read)); @@ -384,7 +382,10 @@ Status FixedReadPlan::fill_missing_columns( old_value_block, &read_index, true, nullptr)); const auto* old_delete_signs = BaseTablet::get_delete_sign_column_data(old_value_block); - DCHECK(old_delete_signs != nullptr); + if (old_delete_signs == nullptr) { + return Status::InternalError("old delete signs column not found, block: {}", + old_value_block.dump_structure()); + } // build default value columns auto default_value_block = old_value_block.clone_empty(); RETURN_IF_ERROR(BaseTablet::generate_default_value_block( @@ -420,27 +421,30 @@ Status FixedReadPlan::fill_missing_columns( } if (should_use_default) { - // clang-format off if (tablet_column.has_default_value()) { missing_col->insert_from(*mutable_default_value_columns[i], 0); } else if (tablet_column.is_nullable()) { - auto* nullable_column = assert_cast(missing_col.get()); + auto* nullable_column = + assert_cast(missing_col.get()); nullable_column->insert_many_defaults(1); } else if (tablet_schema.auto_increment_column() == tablet_column.name()) { - const auto& column = *DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name())); + const auto& column = + *DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name())); DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT); auto* auto_inc_column = - assert_cast(missing_col.get()); - auto_inc_column->insert(vectorized::Field::create_field( -assert_cast( -block->get_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL).column.get())->get_element(idx))); + assert_cast(missing_col.get()); + int pos = block->get_position_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL); + if (pos == -1) { + return Status::InternalError("auto increment column not found in block {}", + block->dump_structure()); + } + auto_inc_column->insert_from(*block->get_by_position(pos).column.get(), idx); } else { // If the control flow reaches this branch, the column neither has default value // nor is nullable. It means that the row's delete sign is marked, and the value // columns are useless and won't be read. So we can just put arbitary values in the cells missing_col->insert(tablet_column.get_vec_type()->get_default()); } - // clang-format on } else { missing_col->insert_from(*old_value_block.get_by_position(i).column, pos_in_old_block); diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 07586acc5a2b8a..e48585b1aacdf2 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -478,10 +478,10 @@ Status PushBrokerReader::_cast_to_input_block() { if (slot_desc->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { continue; } - auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name()); // remove nullable here, let the get_function decide whether nullable auto return_type = slot_desc->get_data_type_ptr(); idx = _src_block_name_to_idx[slot_desc->col_name()]; + auto& arg = _src_block_ptr->get_by_position(idx); // bitmap convert:src -> to_base64 -> bitmap_from_base64 if (slot_desc->type()->get_primitive_type() == TYPE_BITMAP) { auto base64_return_type = vectorized::DataTypeFactory::instance().create_data_type( @@ -491,7 +491,7 @@ Status PushBrokerReader::_cast_to_input_block() { RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size())); _src_block_ptr->get_by_position(idx).type = std::move(base64_return_type); - auto& arg_base64 = _src_block_ptr->get_by_name(slot_desc->col_name()); + auto& arg_base64 = _src_block_ptr->get_by_position(idx); auto func_bitmap_from_base64 = vectorized::SimpleFunctionFactory::instance().get_function( "bitmap_from_base64", {arg_base64}, return_type); diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 57463316cdf038..b839231464b0a8 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -463,7 +463,6 @@ class TabletSchema : public MetadataAdder { void set_skip_write_index_on_load(bool skip) { _skip_write_index_on_load = skip; } bool skip_write_index_on_load() const { return _skip_write_index_on_load; } int32_t delete_sign_idx() const { return _delete_sign_idx; } - void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; } bool has_sequence_col() const { return _sequence_col_idx != -1; } int32_t sequence_col_idx() const { return _sequence_col_idx; } void set_version_col_idx(int32_t version_col_idx) { _version_col_idx = version_col_idx; } diff --git a/be/src/pipeline/exec/materialization_opertor.cpp b/be/src/pipeline/exec/materialization_opertor.cpp index 63fcd7b47a6040..689d345dd0a91a 100644 --- a/be/src/pipeline/exec/materialization_opertor.cpp +++ b/be/src/pipeline/exec/materialization_opertor.cpp @@ -59,8 +59,11 @@ Status MaterializationSharedState::merge_multi_response() { for (int i = 0; i < block_order_results.size(); ++i) { for (auto& [backend_id, rpc_struct] : rpc_struct_map) { vectorized::Block partial_block; + size_t uncompressed_size = 0; + int64_t uncompressed_time = 0; DCHECK(rpc_struct.response.blocks_size() > i); - RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block())); + RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(), + &uncompressed_size, &uncompressed_time)); if (rpc_struct.response.blocks(i).has_profile()) { auto response_profile = RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile()); diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 60536cbbbc9183..61e42c81503fc3 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1318,13 +1318,6 @@ Status ScanOperatorX::get_block(RuntimeState* state, vectorized: bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - // in inverted index apply logic, in order to optimize query performance, - // we built some temporary columns into block, these columns only used in scan node level, - // remove them when query leave scan node to avoid other nodes use block->columns() to make a wrong decision - Defer drop_block_temp_column {[&]() { - std::unique_lock l(local_state._block_lock); - block->erase_tmp_columns(); - }}; if (state->is_cancelled()) { if (local_state._scanner_ctx) { diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index e57fbd75573877..79987c001de112 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -179,6 +179,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { int j = 0; for (; j < columns_desc.size(); ++j) { if (boost::iequals(_dest_tuple_desc->slots()[i]->col_name(), columns_desc[j].name)) { + _slot_offsets[i] = j; break; } } @@ -250,11 +251,10 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl if (src_block.rows()) { // block->check_number_of_rows(); for (int i = 0; i < _slot_num; ++i) { - auto* dest_slot_desc = _dest_tuple_desc->slots()[i]; vectorized::MutableColumnPtr column_ptr = std::move(*block->get_by_position(i).column).mutate(); column_ptr->insert_range_from( - *src_block.get_by_name(dest_slot_desc->col_name()).column, 0, + *src_block.safe_get_by_position(_slot_offsets[i]).column, 0, src_block.rows()); } RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, diff --git a/be/src/pipeline/exec/schema_scan_operator.h b/be/src/pipeline/exec/schema_scan_operator.h index 6846d9b59af240..0634b9d569330a 100644 --- a/be/src/pipeline/exec/schema_scan_operator.h +++ b/be/src/pipeline/exec/schema_scan_operator.h @@ -19,6 +19,8 @@ #include +#include + #include "common/status.h" #include "exec/schema_scanner.h" #include "operator.h" @@ -85,6 +87,9 @@ class SchemaScanOperatorX final : public OperatorX { int _tuple_idx; // slot num need to fill in and return int _slot_num; + + // slot index mapping to src column index + std::unordered_map _slot_offsets; }; #include "common/compile_check_end.h" diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index acab5e12935ae3..460879978cd33d 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -569,7 +569,9 @@ Status BaseTabletsChannel::_write_block_data( std::unordered_map>& tablet_to_rowidxs, PTabletWriterAddBlockResult* response) { vectorized::Block send_data; - RETURN_IF_ERROR(send_data.deserialize(request.block())); + [[maybe_unused]] size_t uncompressed_size = 0; + [[maybe_unused]] int64_t uncompressed_time = 0; + RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time)); CHECK(send_data.rows() == request.tablet_ids_size()) << "block rows: " << send_data.rows() << ", tablet_ids_size: " << request.tablet_ids_size(); diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp index 07e46cfcfed5c3..24fc977d02729a 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp @@ -268,7 +268,10 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() { { SCOPED_ATOMIC_TIMER(&_deserialize_block_timer); _block = vectorized::Block::create_shared(); - st = _block->deserialize(callback->response_->block()); + [[maybe_unused]] size_t uncompressed_size = 0; + [[maybe_unused]] int64_t uncompressed_time = 0; + st = _block->deserialize(callback->response_->block(), &uncompressed_size, + &uncompressed_time); ARROW_RETURN_NOT_OK(to_arrow_status(st)); break; } diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h index 79fdf4979b1453..3d8bc2480c2a10 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sort.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h @@ -72,8 +72,10 @@ struct AggregateFunctionSortData { PBlock pblock; size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; + int64_t compressed_time = 0; auto st = block.serialize(state->be_exec_version(), &pblock, &uncompressed_bytes, - &compressed_bytes, segment_v2::CompressionTypePB::NO_COMPRESSION); + &compressed_bytes, &compressed_time, + segment_v2::CompressionTypePB::NO_COMPRESSION); if (!st.ok()) { throw doris::Exception(st); } @@ -87,7 +89,9 @@ struct AggregateFunctionSortData { PBlock pblock; pblock.ParseFromString(data); - auto st = block.deserialize(pblock); + [[maybe_unused]] size_t uncompressed_size = 0; + [[maybe_unused]] int64_t uncompressed_time = 0; + auto st = block.deserialize(pblock, &uncompressed_size, &uncompressed_time); // If memory allocate failed during deserialize, st is not ok, throw exception here to // stop the query. if (!st.ok()) { diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index c09a7188d3ef70..dbaf8ee023b835 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -82,13 +82,9 @@ template void clear_blocks(moodycamel::ConcurrentQueue&, template void clear_blocks(moodycamel::ConcurrentQueue&, RuntimeProfile::Counter* memory_used_counter); -Block::Block(std::initializer_list il) : data {il} { - initialize_index_by_name(); -} +Block::Block(std::initializer_list il) : data {il} {} -Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} { - initialize_index_by_name(); -} +Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {} Block::Block(const std::vector& slots, size_t block_size, bool ignore_trivial_slot) { @@ -112,7 +108,8 @@ Block::Block(const std::vector& slots, size_t block_size, *this = Block(slot_ptrs, block_size, ignore_trivial_slot); } -Status Block::deserialize(const PBlock& pblock) { +Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes, + int64_t* decompress_time) { swap(Block()); int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0; RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); @@ -121,7 +118,7 @@ Status Block::deserialize(const PBlock& pblock) { std::string compression_scratch; if (pblock.compressed()) { // Decompress - SCOPED_RAW_TIMER(&_decompress_time_ns); + SCOPED_RAW_TIMER(decompress_time); const char* compressed_data = pblock.column_values().c_str(); size_t compressed_size = pblock.column_values().size(); size_t uncompressed_size = 0; @@ -144,7 +141,7 @@ Status Block::deserialize(const PBlock& pblock) { compression_scratch.data()); DCHECK(success) << "snappy::RawUncompress failed"; } - _decompressed_bytes = uncompressed_size; + *uncompressed_bytes = uncompressed_size; buf = compression_scratch.data(); } else { buf = pblock.column_values().data(); @@ -158,22 +155,14 @@ Status Block::deserialize(const PBlock& pblock) { buf = type->deserialize(buf, &data_column, pblock.be_exec_version())); data.emplace_back(data_column->get_ptr(), type, pcol_meta.name()); } - initialize_index_by_name(); return Status::OK(); } void Block::reserve(size_t count) { - index_by_name.reserve(count); data.reserve(count); } -void Block::initialize_index_by_name() { - for (size_t i = 0, size = data.size(); i < size; ++i) { - index_by_name[data[i].name] = i; - } -} - void Block::insert(size_t position, const ColumnWithTypeAndName& elem) { if (position > data.size()) { throw Exception(ErrorCode::INTERNAL_ERROR, @@ -181,13 +170,6 @@ void Block::insert(size_t position, const ColumnWithTypeAndName& elem) { data.size(), dump_names()); } - for (auto& name_pos : index_by_name) { - if (name_pos.second >= position) { - ++name_pos.second; - } - } - - index_by_name.emplace(elem.name, position); data.emplace(data.begin() + position, elem); } @@ -198,30 +180,20 @@ void Block::insert(size_t position, ColumnWithTypeAndName&& elem) { data.size(), dump_names()); } - for (auto& name_pos : index_by_name) { - if (name_pos.second >= position) { - ++name_pos.second; - } - } - - index_by_name.emplace(elem.name, position); data.emplace(data.begin() + position, std::move(elem)); } void Block::clear_names() { - index_by_name.clear(); for (auto& entry : data) { entry.name.clear(); } } void Block::insert(const ColumnWithTypeAndName& elem) { - index_by_name.emplace(elem.name, data.size()); data.emplace_back(elem); } void Block::insert(ColumnWithTypeAndName&& elem) { - index_by_name.emplace(elem.name, data.size()); data.emplace_back(std::move(elem)); } @@ -235,13 +207,6 @@ void Block::erase_tail(size_t start) { DCHECK(start <= data.size()) << fmt::format( "Position out of bound in Block::erase(), max position = {}", data.size()); data.erase(data.begin() + start, data.end()); - for (auto it = index_by_name.begin(); it != index_by_name.end();) { - if (it->second >= start) { - index_by_name.erase(it++); - } else { - ++it; - } - } } void Block::erase(size_t position) { @@ -253,36 +218,7 @@ void Block::erase(size_t position) { } void Block::erase_impl(size_t position) { - bool need_maintain_index_by_name = true; - if (position + 1 == data.size()) { - index_by_name.erase(data.back().name); - need_maintain_index_by_name = false; - } - data.erase(data.begin() + position); - - if (need_maintain_index_by_name) { - for (auto it = index_by_name.begin(); it != index_by_name.end();) { - if (it->second == position) { - index_by_name.erase(it++); - } else { - if (it->second > position) { - --it->second; - } - ++it; - } - } - } -} - -void Block::erase(const String& name) { - auto index_it = index_by_name.find(name); - if (index_it == index_by_name.end()) { - throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, name={}, block_names={}", - name, dump_names()); - } - - erase_impl(index_it->second); } ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) { @@ -303,54 +239,13 @@ const ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) const return data[position]; } -ColumnWithTypeAndName& Block::get_by_name(const std::string& name) { - auto it = index_by_name.find(name); - if (index_by_name.end() == it) { - throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, name={}, block_names={}", - name, dump_names()); - } - - return data[it->second]; -} - -const ColumnWithTypeAndName& Block::get_by_name(const std::string& name) const { - auto it = index_by_name.find(name); - if (index_by_name.end() == it) { - throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, name={}, block_names={}", - name, dump_names()); - } - - return data[it->second]; -} - -ColumnWithTypeAndName* Block::try_get_by_name(const std::string& name) { - auto it = index_by_name.find(name); - if (index_by_name.end() == it) { - return nullptr; - } - return &data[it->second]; -} - -const ColumnWithTypeAndName* Block::try_get_by_name(const std::string& name) const { - auto it = index_by_name.find(name); - if (index_by_name.end() == it) { - return nullptr; - } - return &data[it->second]; -} - -bool Block::has(const std::string& name) const { - return index_by_name.end() != index_by_name.find(name); -} - -size_t Block::get_position_by_name(const std::string& name) const { - auto it = index_by_name.find(name); - if (index_by_name.end() == it) { - throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, name={}, block_names={}", - name, dump_names()); +int Block::get_position_by_name(const std::string& name) const { + for (int i = 0; i < data.size(); i++) { + if (data[i].name == name) { + return i; + } } - - return it->second; + return -1; } void Block::check_number_of_rows(bool allow_null_columns) const { @@ -768,7 +663,6 @@ DataTypes Block::get_data_types() const { void Block::clear() { data.clear(); - index_by_name.clear(); } void Block::clear_column_data(int64_t column_size) noexcept { @@ -789,15 +683,6 @@ void Block::clear_column_data(int64_t column_size) noexcept { } } -void Block::erase_tmp_columns() noexcept { - auto all_column_names = get_names(); - for (auto& name : all_column_names) { - if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) { - erase(name); - } - } -} - void Block::clear_column_mem_not_keep(const std::vector& column_keep_flags, bool need_keep_first) { if (data.size() >= column_keep_flags.size()) { @@ -819,17 +704,14 @@ void Block::clear_column_mem_not_keep(const std::vector& column_keep_flags void Block::swap(Block& other) noexcept { SCOPED_SKIP_MEMORY_CHECK(); data.swap(other.data); - index_by_name.swap(other.index_by_name); } void Block::swap(Block&& other) noexcept { SCOPED_SKIP_MEMORY_CHECK(); data = std::move(other.data); - index_by_name = std::move(other.index_by_name); } void Block::shuffle_columns(const std::vector& result_column_ids) { - index_by_name.clear(); Container tmp_data; tmp_data.reserve(result_column_ids.size()); for (const int result_column_id : result_column_ids) { @@ -958,7 +840,8 @@ Status Block::filter_block(Block* block, size_t filter_column_id, size_t column_ Status Block::serialize(int be_exec_version, PBlock* pblock, /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes, - size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, + size_t* compressed_bytes, int64_t* compress_time, + segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data) const { RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); pblock->set_be_exec_version(be_exec_version); @@ -998,7 +881,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, // compress if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { - SCOPED_RAW_TIMER(&_compress_time_ns); + SCOPED_RAW_TIMER(compress_time); pblock->set_compression_type(compression_type); pblock->set_uncompressed_size(serialize_bytes); @@ -1030,24 +913,6 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, return Status::OK(); } -MutableBlock::MutableBlock(const std::vector& tuple_descs, int reserve_size, - bool ignore_trivial_slot) { - for (auto* const tuple_desc : tuple_descs) { - for (auto* const slot_desc : tuple_desc->slots()) { - if (ignore_trivial_slot && !slot_desc->is_materialized()) { - continue; - } - _data_types.emplace_back(slot_desc->get_data_type_ptr()); - _columns.emplace_back(_data_types.back()->create_column()); - if (reserve_size != 0) { - _columns.back()->reserve(reserve_size); - } - _names.push_back(slot_desc->col_name()); - } - } - initialize_index_by_name(); -} - size_t MutableBlock::rows() const { for (const auto& column : _columns) { if (column) { @@ -1063,7 +928,6 @@ void MutableBlock::swap(MutableBlock& another) noexcept { _columns.swap(another._columns); _data_types.swap(another._data_types); _names.swap(another._names); - index_by_name.swap(another.index_by_name); } void MutableBlock::add_row(const Block* block, int row) { @@ -1126,31 +990,6 @@ Status MutableBlock::add_rows(const Block* block, const std::vector& ro return Status::OK(); } -void MutableBlock::erase(const String& name) { - auto index_it = index_by_name.find(name); - if (index_it == index_by_name.end()) { - throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, name={}, block_names={}", - name, dump_names()); - } - - auto position = index_it->second; - - _columns.erase(_columns.begin() + position); - _data_types.erase(_data_types.begin() + position); - _names.erase(_names.begin() + position); - - for (auto it = index_by_name.begin(); it != index_by_name.end();) { - if (it->second == position) { - index_by_name.erase(it++); - } else { - if (it->second > position) { - --it->second; - } - ++it; - } - } -} - Block MutableBlock::to_block(int start_column) { return to_block(start_column, (int)_columns.size()); } @@ -1290,26 +1129,6 @@ void MutableBlock::clear_column_data() noexcept { } } -void MutableBlock::initialize_index_by_name() { - for (size_t i = 0, size = _names.size(); i < size; ++i) { - index_by_name[_names[i]] = i; - } -} - -bool MutableBlock::has(const std::string& name) const { - return index_by_name.end() != index_by_name.find(name); -} - -size_t MutableBlock::get_position_by_name(const std::string& name) const { - auto it = index_by_name.find(name); - if (index_by_name.end() == it) { - throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, name={}, block_names={}", - name, dump_names()); - } - - return it->second; -} - std::string MutableBlock::dump_names() const { std::string out; for (auto it = _names.begin(); it != _names.end(); ++it) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index b3ecb58ebed9e1..b99f2ec09ada41 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -73,14 +73,7 @@ class Block { private: using Container = ColumnsWithTypeAndName; - using IndexByName = phmap::flat_hash_map; Container data; - IndexByName index_by_name; - - int64_t _decompress_time_ns = 0; - int64_t _decompressed_bytes = 0; - - mutable int64_t _compress_time_ns = 0; public: Block() = default; @@ -113,8 +106,6 @@ class Block { void erase_tail(size_t start); /// remove the columns at the specified positions void erase(const std::set& positions); - /// remove the column with the specified name - void erase(const String& name); // T was std::set, std::vector, std::list template void erase_not_in(const T& container) { @@ -125,7 +116,13 @@ class Block { std::swap(data, new_data); } - void initialize_index_by_name(); + std::unordered_map get_name_to_pos_map() const { + std::unordered_map name_to_index_map; + for (uint32_t i = 0; i < data.size(); ++i) { + name_to_index_map[data[i].name] = i; + } + return name_to_index_map; + } /// References are invalidated after calling functions above. ColumnWithTypeAndName& get_by_position(size_t position) { @@ -151,13 +148,6 @@ class Block { ColumnWithTypeAndName& safe_get_by_position(size_t position); const ColumnWithTypeAndName& safe_get_by_position(size_t position) const; - ColumnWithTypeAndName& get_by_name(const std::string& name); - const ColumnWithTypeAndName& get_by_name(const std::string& name) const; - - // return nullptr when no such column name - ColumnWithTypeAndName* try_get_by_name(const std::string& name); - const ColumnWithTypeAndName* try_get_by_name(const std::string& name) const; - Container::iterator begin() { return data.begin(); } Container::iterator end() { return data.end(); } Container::const_iterator begin() const { return data.begin(); } @@ -165,9 +155,9 @@ class Block { Container::const_iterator cbegin() const { return data.cbegin(); } Container::const_iterator cend() const { return data.cend(); } - bool has(const std::string& name) const; - - size_t get_position_by_name(const std::string& name) const; + // Get position of column by name. Returns -1 if there is no column with that name. + // ATTN: this method is O(N). better maintain name -> position map in caller if you need to call it frequently. + int get_position_by_name(const std::string& name) const; const ColumnsWithTypeAndName& get_columns_with_type_and_name() const; @@ -291,10 +281,11 @@ class Block { // serialize block to PBlock Status serialize(int be_exec_version, PBlock* pblock, size_t* uncompressed_bytes, - size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, + size_t* compressed_bytes, int64_t* compress_time, + segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data = false) const; - Status deserialize(const PBlock& pblock); + Status deserialize(const PBlock& pblock, size_t* uncompressed_bytes, int64_t* decompress_time); std::unique_ptr create_same_struct_block(size_t size, bool is_reserve = false) const; @@ -362,15 +353,6 @@ class Block { // for String type or Array type void shrink_char_type_column_suffix_zero(const std::vector& char_type_idx); - int64_t get_decompress_time() const { return _decompress_time_ns; } - int64_t get_decompressed_bytes() const { return _decompressed_bytes; } - int64_t get_compress_time() const { return _compress_time_ns; } - - // remove tmp columns in block - // in inverted index apply logic, in order to optimize query performance, - // we built some temporary columns into block - void erase_tmp_columns() noexcept; - void clear_column_mem_not_keep(const std::vector& column_keep_flags, bool need_keep_first); @@ -391,9 +373,6 @@ class MutableBlock { DataTypes _data_types; std::vector _names; - using IndexByName = phmap::flat_hash_map; - IndexByName index_by_name; - public: static MutableBlock build_mutable_block(Block* block) { return block == nullptr ? MutableBlock() : MutableBlock(block); @@ -401,27 +380,19 @@ class MutableBlock { MutableBlock() = default; ~MutableBlock() = default; - MutableBlock(const std::vector& tuple_descs, int reserve_size = 0, - bool igore_trivial_slot = false); - MutableBlock(Block* block) : _columns(block->mutate_columns()), _data_types(block->get_data_types()), - _names(block->get_names()) { - initialize_index_by_name(); - } + _names(block->get_names()) {} MutableBlock(Block&& block) : _columns(block.mutate_columns()), _data_types(block.get_data_types()), - _names(block.get_names()) { - initialize_index_by_name(); - } + _names(block.get_names()) {} void operator=(MutableBlock&& m_block) { _columns = std::move(m_block._columns); _data_types = std::move(m_block._data_types); _names = std::move(m_block._names); - initialize_index_by_name(); } size_t rows() const; @@ -552,7 +523,6 @@ class MutableBlock { _columns[i] = _data_types[i]->create_column(); } } - initialize_index_by_name(); } else { if (_columns.size() != block.columns()) { return Status::Error( @@ -599,9 +569,6 @@ class MutableBlock { Status add_rows(const Block* block, size_t row_begin, size_t length); Status add_rows(const Block* block, const std::vector& rows); - /// remove the column with the specified name - void erase(const String& name); - std::string dump_data(size_t row_limit = 100) const; std::string dump_data_json(size_t row_limit = 100) const; @@ -627,15 +594,8 @@ class MutableBlock { std::vector& get_names() { return _names; } - bool has(const std::string& name) const; - - size_t get_position_by_name(const std::string& name) const; - /** Get a list of column names separated by commas. */ std::string dump_names() const; - -private: - void initialize_index_by_name(); }; struct IteratorRowRef { diff --git a/be/src/vec/core/sort_block.cpp b/be/src/vec/core/sort_block.cpp index 20bf1f952af47a..75aa9d85a12fd2 100644 --- a/be/src/vec/core/sort_block.cpp +++ b/be/src/vec/core/sort_block.cpp @@ -32,10 +32,7 @@ ColumnsWithSortDescriptions get_columns_with_sort_description(const Block& block for (size_t i = 0; i < size; ++i) { const IColumn* column = - !description[i].column_name.empty() - ? block.get_by_name(description[i].column_name).column.get() - : block.safe_get_by_position(description[i].column_number).column.get(); - + block.safe_get_by_position(description[i].column_number).column.get(); res.emplace_back(column, description[i]); } @@ -53,9 +50,7 @@ void sort_block(Block& src_block, Block& dest_block, const SortDescription& desc bool reverse = description[0].direction == -1; const IColumn* column = - !description[0].column_name.empty() - ? src_block.get_by_name(description[0].column_name).column.get() - : src_block.safe_get_by_position(description[0].column_number).column.get(); + src_block.safe_get_by_position(description[0].column_number).column.get(); IColumn::Permutation perm; column->get_permutation(reverse, limit, description[0].nulls_direction, perm); diff --git a/be/src/vec/core/sort_description.h b/be/src/vec/core/sort_description.h index cdee17f7651737..4d1543d690419f 100644 --- a/be/src/vec/core/sort_description.h +++ b/be/src/vec/core/sort_description.h @@ -20,20 +20,15 @@ #pragma once -#include "cstddef" -#include "memory" -#include "string" -#include "vec/core/field.h" #include "vector" namespace doris::vectorized { /// Description of the sorting rule by one column. struct SortColumnDescription { - std::string column_name; /// The name of the column. - int column_number; /// Column number (used if no name is given). - int direction; /// 1 - ascending, -1 - descending. - int nulls_direction; /// 1 - NULLs and NaNs are greater, -1 - less. + int column_number; /// Column number (used if no name is given). + int direction; /// 1 - ascending, -1 - descending. + int nulls_direction; /// 1 - NULLs and NaNs are greater, -1 - less. SortColumnDescription(int column_number_, int direction_, int nulls_direction_) : column_number(column_number_), diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp index def1645f3232f1..dfd4cdcfc91d16 100644 --- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp +++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp @@ -103,12 +103,17 @@ Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool* auto num_columns = batch.num_columns(); for (int c = 0; c < num_columns; ++c) { arrow::Array* column = batch.column(c).get(); - std::string column_name = batch.schema()->field(c)->name(); try { const vectorized::ColumnWithTypeAndName& column_with_name = - block->get_by_name(column_name); + block->safe_get_by_position(c); + + if (column_with_name.name != column_name) { + return Status::InternalError("Column name mismatch: expected {}, got {}", + column_with_name.name, column_name); + } + RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow( column_with_name.column->assume_mutable_ref(), column, 0, num_rows, _ctzz)); } catch (Exception& e) { diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 7da93423dcaebc..d72439e3c349a2 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -1277,9 +1277,10 @@ Status OrcReader::_fill_partition_columns( const std::unordered_map>& partition_columns) { DataTypeSerDe::FormatOptions _text_formatOptions; + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); for (const auto& kv : partition_columns) { - auto doris_column = block->get_by_name(kv.first).column; - auto* col_ptr = const_cast(doris_column.get()); + auto col_ptr = block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable(); const auto& [value, slot_desc] = kv.second; auto _text_serde = slot_desc->get_data_type_ptr()->get_serde(); Slice slice(value.data(), value.size()); @@ -1305,10 +1306,18 @@ Status OrcReader::_fill_partition_columns( Status OrcReader::_fill_missing_columns( Block* block, uint64_t rows, const std::unordered_map& missing_columns) { + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); + std::set positions_to_erase; for (const auto& kv : missing_columns) { + if (!name_to_pos_map.contains(kv.first)) { + return Status::InternalError("Failed to find missing column: {}, block: {}", kv.first, + block->dump_structure()); + } if (kv.second == nullptr) { // no default column, fill with null - auto mutable_column = block->get_by_name(kv.first).column->assume_mutable(); + auto mutable_column = + block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable(); auto* nullable_column = static_cast(mutable_column.get()); nullable_column->insert_many_defaults(rows); } else { @@ -1328,15 +1337,16 @@ Status OrcReader::_fill_missing_columns( mutable_column->resize(rows); // result_column_ptr maybe a ColumnConst, convert it to a normal column result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); - auto origin_column_type = block->get_by_name(kv.first).type; + auto origin_column_type = block->get_by_position(name_to_pos_map[kv.first]).type; bool is_nullable = origin_column_type->is_nullable(); block->replace_by_position( - block->get_position_by_name(kv.first), + name_to_pos_map[kv.first], is_nullable ? make_nullable(result_column_ptr) : result_column_ptr); - block->erase(result_column_id); + positions_to_erase.insert(result_column_id); } } } + block->erase(positions_to_erase); return Status::OK(); } @@ -2009,8 +2019,10 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo std::vector batch_vec; _fill_batch_vec(batch_vec, _batch.get(), 0); + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); for (auto& col_name : _lazy_read_ctx.lazy_read_columns) { - auto& column_with_type_and_name = block->get_by_name(col_name); + auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name); @@ -2076,10 +2088,17 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo } } + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) { for (auto& dict_filter_cols : _dict_filter_cols) { MutableColumnPtr dict_col_ptr = ColumnInt32::create(); - size_t pos = block->get_position_by_name(dict_filter_cols.first); + if (!name_to_pos_map.contains(dict_filter_cols.first)) { + return Status::InternalError( + "Failed to find dict filter column '{}' in block {}", + dict_filter_cols.first, block->dump_structure()); + } + auto pos = name_to_pos_map[dict_filter_cols.first]; auto& column_with_type_and_name = block->get_by_position(pos); auto& column_type = column_with_type_and_name.type; if (column_type->is_nullable()) { @@ -2101,7 +2120,7 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo _fill_batch_vec(batch_vec, _batch.get(), 0); for (auto& col_name : _lazy_read_ctx.all_read_columns) { - auto& column_with_type_and_name = block->get_by_name(col_name); + auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name); @@ -2212,19 +2231,27 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) { if (_delete_rows != nullptr) { _delete_rows_filter_ptr = std::make_unique(rows, 1); auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data(); + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); const auto& original_transaction_column = assert_cast(*remove_nullable( - block->get_by_name(TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE).column)); - const auto& bucket_id_column = assert_cast( - *remove_nullable(block->get_by_name(TransactionalHive::BUCKET_LOWER_CASE).column)); - const auto& row_id_column = assert_cast( - *remove_nullable(block->get_by_name(TransactionalHive::ROW_ID_LOWER_CASE).column)); + block->get_by_position( + name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE]) + .column)); + const auto& bucket_id_column = assert_cast(*remove_nullable( + block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE]) + .column)); + const auto& row_id_column = assert_cast(*remove_nullable( + block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE]) + .column)); for (int i = 0; i < rows; ++i) { auto original_transaction = original_transaction_column.get_int(i); auto bucket_id = bucket_id_column.get_int(i); auto row_id = row_id_column.get_int(i); - TransactionalHiveReader::AcidRowID transactional_row_id = {original_transaction, - bucket_id, row_id}; + TransactionalHiveReader::AcidRowID transactional_row_id = { + .original_transaction = original_transaction, + .bucket = bucket_id, + .row_id = row_id}; if (_delete_rows->contains(transactional_row_id)) { _pos_delete_filter_data[i] = 0; } @@ -2238,9 +2265,15 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s size_t origin_column_num = block->columns(); if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) { + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); for (auto& dict_filter_cols : _dict_filter_cols) { + if (!name_to_pos_map.contains(dict_filter_cols.first)) { + return Status::InternalError("Failed to find dict filter column '{}' in block {}", + dict_filter_cols.first, block->dump_structure()); + } MutableColumnPtr dict_col_ptr = ColumnInt32::create(); - size_t pos = block->get_position_by_name(dict_filter_cols.first); + auto pos = name_to_pos_map[dict_filter_cols.first]; auto& column_with_type_and_name = block->get_by_position(pos); auto& column_type = column_with_type_and_name.type; if (column_type->is_nullable()) { @@ -2266,8 +2299,10 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end()); } + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); for (auto& table_col_name : table_col_names) { - auto& column_with_type_and_name = block->get_by_name(table_col_name); + auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[table_col_name]); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; auto file_column_name = _table_info_node_ptr->children_file_column_name(table_col_name); @@ -2319,13 +2354,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s if (can_filter_all) { for (auto& col : table_col_names) { // clean block to read predicate columns and acid columns - block->get_by_name(col).column->assume_mutable()->clear(); + block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear(); } for (auto& col : _lazy_read_ctx.predicate_partition_columns) { - block->get_by_name(col.first).column->assume_mutable()->clear(); + block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear(); } for (auto& col : _lazy_read_ctx.predicate_missing_columns) { - block->get_by_name(col.first).column->assume_mutable()->clear(); + block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear(); } Block::erase_useless_column(block, origin_column_num); RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr)); @@ -2639,8 +2674,14 @@ Status OrcReader::_convert_dict_cols_to_string_cols( return Status::OK(); } if (!_dict_filter_cols.empty()) { + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); for (auto& dict_filter_cols : _dict_filter_cols) { - size_t pos = block->get_position_by_name(dict_filter_cols.first); + if (!name_to_pos_map.contains(dict_filter_cols.first)) { + return Status::InternalError("Failed to find dict filter column '{}' in block {}", + dict_filter_cols.first, block->dump_structure()); + } + auto pos = name_to_pos_map[dict_filter_cols.first]; ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos); const ColumnPtr& column = column_with_type_and_name.column; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 96feeceea8d6cf..c0a4b2a17046e0 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -25,6 +25,7 @@ #include #include +#include #include #include "common/config.h" @@ -392,25 +393,32 @@ Status RowGroupReader::_read_column_data(Block* block, FilterMap& filter_map) { size_t batch_read_rows = 0; bool has_eof = false; + // todo: maybe do not need to build name to index map every time + auto name_to_idx = block->get_name_to_pos_map(); for (auto& read_col_name : table_columns) { - auto& column_with_type_and_name = block->get_by_name(read_col_name); + auto& column_with_type_and_name = block->safe_get_by_position(name_to_idx[read_col_name]); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; bool is_dict_filter = false; for (auto& _dict_filter_col : _dict_filter_cols) { if (_dict_filter_col.first == read_col_name) { MutableColumnPtr dict_column = ColumnInt32::create(); - size_t pos = block->get_position_by_name(read_col_name); + if (!name_to_idx.contains(read_col_name)) { + return Status::InternalError( + "Wrong read column '{}' in parquet file, block: {}", read_col_name, + block->dump_structure()); + } if (column_type->is_nullable()) { - block->get_by_position(pos).type = + block->get_by_position(name_to_idx[read_col_name]).type = std::make_shared(std::make_shared()); block->replace_by_position( - pos, + name_to_idx[read_col_name], ColumnNullable::create(std::move(dict_column), ColumnUInt8::create(dict_column->size(), 0))); } else { - block->get_by_position(pos).type = std::make_shared(); - block->replace_by_position(pos, std::move(dict_column)); + block->get_by_position(name_to_idx[read_col_name]).type = + std::make_shared(); + block->replace_by_position(name_to_idx[read_col_name], std::move(dict_column)); } is_dict_filter = true; break; @@ -511,20 +519,25 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re } const uint8_t* __restrict filter_map_data = result_filter.data(); - filter_map_ptr.reset(new FilterMap()); + filter_map_ptr = std::make_unique(); RETURN_IF_ERROR(filter_map_ptr->init(filter_map_data, pre_read_rows, can_filter_all)); if (filter_map_ptr->filter_all()) { { SCOPED_RAW_TIMER(&_predicate_filter_time); - for (auto& col : _lazy_read_ctx.predicate_columns.first) { + auto name_to_idx = block->get_name_to_pos_map(); + for (const auto& col : _lazy_read_ctx.predicate_columns.first) { // clean block to read predicate columns - block->get_by_name(col).column->assume_mutable()->clear(); + block->get_by_position(name_to_idx[col]).column->assume_mutable()->clear(); } - for (auto& col : _lazy_read_ctx.predicate_partition_columns) { - block->get_by_name(col.first).column->assume_mutable()->clear(); + for (const auto& col : _lazy_read_ctx.predicate_partition_columns) { + block->get_by_position(name_to_idx[col.first]) + .column->assume_mutable() + ->clear(); } - for (auto& col : _lazy_read_ctx.predicate_missing_columns) { - block->get_by_name(col.first).column->assume_mutable()->clear(); + for (const auto& col : _lazy_read_ctx.predicate_missing_columns) { + block->get_by_position(name_to_idx[col.first]) + .column->assume_mutable() + ->clear(); } if (_row_id_column_iterator_pair.first != nullptr) { block->get_by_position(_row_id_column_iterator_pair.second) @@ -660,10 +673,12 @@ Status RowGroupReader::_fill_partition_columns( const std::unordered_map>& partition_columns) { DataTypeSerDe::FormatOptions _text_formatOptions; - for (auto& kv : partition_columns) { - auto doris_column = block->get_by_name(kv.first).column; - IColumn* col_ptr = const_cast(doris_column.get()); - auto& [value, slot_desc] = kv.second; + auto name_to_idx = block->get_name_to_pos_map(); + for (const auto& kv : partition_columns) { + auto doris_column = block->get_by_position(name_to_idx[kv.first]).column; + // obtained from block*, it is a mutable object. + auto* col_ptr = const_cast(doris_column.get()); + const auto& [value, slot_desc] = kv.second; auto _text_serde = slot_desc->get_data_type_ptr()->get_serde(); Slice slice(value.data(), value.size()); uint64_t num_deserialized = 0; @@ -688,15 +703,23 @@ Status RowGroupReader::_fill_partition_columns( Status RowGroupReader::_fill_missing_columns( Block* block, size_t rows, const std::unordered_map& missing_columns) { - for (auto& kv : missing_columns) { + // todo: maybe do not need to build name to index map every time + auto name_to_idx = block->get_name_to_pos_map(); + std::set positions_to_erase; + for (const auto& kv : missing_columns) { + if (!name_to_idx.contains(kv.first)) { + return Status::InternalError("Missing column: {} not found in block {}", kv.first, + block->dump_structure()); + } if (kv.second == nullptr) { // no default column, fill with null - auto mutable_column = block->get_by_name(kv.first).column->assume_mutable(); + auto mutable_column = + block->get_by_position(name_to_idx[kv.first]).column->assume_mutable(); auto* nullable_column = assert_cast(mutable_column.get()); nullable_column->insert_many_defaults(rows); } else { // fill with default value - auto& ctx = kv.second; + const auto& ctx = kv.second; auto origin_column_num = block->columns(); int result_column_id = -1; // PT1 => dest primitive type @@ -711,15 +734,16 @@ Status RowGroupReader::_fill_missing_columns( mutable_column->resize(rows); // result_column_ptr maybe a ColumnConst, convert it to a normal column result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); - auto origin_column_type = block->get_by_name(kv.first).type; + auto origin_column_type = block->get_by_position(name_to_idx[kv.first]).type; bool is_nullable = origin_column_type->is_nullable(); block->replace_by_position( - block->get_position_by_name(kv.first), + name_to_idx[kv.first], is_nullable ? make_nullable(result_column_ptr) : result_column_ptr); - block->erase(result_column_id); + positions_to_erase.insert(result_column_id); } } } + block->erase(positions_to_erase); return Status::OK(); } @@ -1071,13 +1095,20 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, } void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { + // todo: maybe do not need to build name to index map every time + auto name_to_idx = block->get_name_to_pos_map(); for (auto& dict_filter_cols : _dict_filter_cols) { - size_t pos = block->get_position_by_name(dict_filter_cols.first); - ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos); + if (!name_to_idx.contains(dict_filter_cols.first)) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "Wrong read column '{}' in parquet file, block: {}", + dict_filter_cols.first, block->dump_structure()); + } + ColumnWithTypeAndName& column_with_type_and_name = + block->get_by_position(name_to_idx[dict_filter_cols.first]); const ColumnPtr& column = column_with_type_and_name.column; - if (auto* nullable_column = check_and_get_column(*column)) { + if (const auto* nullable_column = check_and_get_column(*column)) { const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); - const ColumnInt32* dict_column = assert_cast(nested_column.get()); + const auto* dict_column = assert_cast(nested_column.get()); DCHECK(dict_column); MutableColumnPtr string_column = @@ -1087,16 +1118,18 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { column_with_type_and_name.type = std::make_shared(std::make_shared()); block->replace_by_position( - pos, ColumnNullable::create(std::move(string_column), - nullable_column->get_null_map_column_ptr())); + name_to_idx[dict_filter_cols.first], + ColumnNullable::create(std::move(string_column), + nullable_column->get_null_map_column_ptr())); } else { - const ColumnInt32* dict_column = assert_cast(column.get()); + const auto* dict_column = assert_cast(column.get()); MutableColumnPtr string_column = _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column( dict_column); column_with_type_and_name.type = std::make_shared(); - block->replace_by_position(pos, std::move(string_column)); + block->replace_by_position(name_to_idx[dict_filter_cols.first], + std::move(string_column)); } } } diff --git a/be/src/vec/exec/format/table/equality_delete.cpp b/be/src/vec/exec/format/table/equality_delete.cpp index 7f8452f4b18378..48914a021441d8 100644 --- a/be/src/vec/exec/format/table/equality_delete.cpp +++ b/be/src/vec/exec/format/table/equality_delete.cpp @@ -45,15 +45,16 @@ Status SimpleEqualityDelete::_build_set() { Status SimpleEqualityDelete::filter_data_block(Block* data_block) { SCOPED_TIMER(equality_delete_time); - auto* column_and_type = data_block->try_get_by_name(_delete_column_name); - if (column_and_type == nullptr) { - return Status::InternalError("Can't find the delete column '{}' in data file", - _delete_column_name); + int pos = data_block->get_position_by_name(_delete_column_name); + if (pos == -1) { + return Status::InternalError("Column '{}' not found in data block: {}", _delete_column_name, + data_block->dump_structure()); } - if (column_and_type->type->get_primitive_type() != _delete_column_type) { + auto column_and_type = data_block->get_by_position(pos); + if (column_and_type.type->get_primitive_type() != _delete_column_type) { return Status::InternalError( "Not support type change in column '{}', src type: {}, target type: {}", - _delete_column_name, column_and_type->type->get_name(), (int)_delete_column_type); + _delete_column_name, column_and_type.type->get_name(), (int)_delete_column_type); } size_t rows = data_block->rows(); // _filter: 1 => in _hybrid_set; 0 => not in _hybrid_set @@ -64,12 +65,12 @@ Status SimpleEqualityDelete::filter_data_block(Block* data_block) { _filter->assign(rows, UInt8(0)); } - if (column_and_type->column->is_nullable()) { + if (column_and_type.column->is_nullable()) { const NullMap& null_map = - reinterpret_cast(column_and_type->column.get()) + reinterpret_cast(column_and_type.column.get()) ->get_null_map_data(); _hybrid_set->find_batch_nullable( - remove_nullable(column_and_type->column)->assume_mutable_ref(), rows, null_map, + remove_nullable(column_and_type.column)->assume_mutable_ref(), rows, null_map, *_filter); if (_hybrid_set->contain_null()) { auto* filter_data = _filter->data(); @@ -78,7 +79,7 @@ Status SimpleEqualityDelete::filter_data_block(Block* data_block) { } } } else { - _hybrid_set->find_batch(column_and_type->column->assume_mutable_ref(), rows, *_filter); + _hybrid_set->find_batch(column_and_type.column->assume_mutable_ref(), rows, *_filter); } // should reverse _filter auto* filter_data = _filter->data(); @@ -108,19 +109,22 @@ Status MultiEqualityDelete::_build_set() { Status MultiEqualityDelete::filter_data_block(Block* data_block) { SCOPED_TIMER(equality_delete_time); size_t column_index = 0; - for (std::string column_name : _delete_block->get_names()) { - auto* column_and_type = data_block->try_get_by_name(column_name); - if (column_and_type == nullptr) { - return Status::InternalError("Can't find the delete column '{}' in data file", - column_name); + + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = data_block->get_name_to_pos_map(); + for (auto delete_col : _delete_block->get_columns_with_type_and_name()) { + const std::string& column_name = delete_col.name; + auto column_and_type = data_block->safe_get_by_position(name_to_pos_map[column_name]); + if (name_to_pos_map.contains(column_name) == false) { + return Status::InternalError("Column '{}' not found in data block: {}", column_name, + data_block->dump_structure()); } - if (!_delete_block->get_by_name(column_name).type->equals(*column_and_type->type)) { + if (!delete_col.type->equals(*column_and_type.type)) { return Status::InternalError( "Not support type change in column '{}', src type: {}, target type: {}", - column_name, _delete_block->get_by_name(column_name).type->get_name(), - column_and_type->type->get_name()); + column_name, delete_col.type->get_name(), column_and_type.type->get_name()); } - _data_column_index[column_index++] = data_block->get_position_by_name(column_name); + _data_column_index[column_index++] = name_to_pos_map[column_name]; } size_t rows = data_block->rows(); _data_hashes.clear(); diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index a9640e353b103b..9b78f11a17742a 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -223,20 +223,32 @@ void IcebergTableReader::_generate_equality_delete_block( } Status IcebergTableReader::_expand_block_if_need(Block* block) { + std::set names; + auto block_names = block->get_names(); + names.insert(block_names.begin(), block_names.end()); for (auto& col : _expand_columns) { col.column->assume_mutable()->clear(); - if (block->try_get_by_name(col.name)) { + if (names.contains(col.name)) { return Status::InternalError("Wrong expand column '{}'", col.name); } + names.insert(col.name); block->insert(col); } return Status::OK(); } Status IcebergTableReader::_shrink_block_if_need(Block* block) { + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); + std::set positions_to_erase; for (const std::string& expand_col : _expand_col_names) { - block->erase(expand_col); + if (!name_to_pos_map.contains(expand_col)) { + return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, + block->dump_names()); + } + positions_to_erase.emplace(name_to_pos_map[expand_col]); } + block->erase(positions_to_erase); return Status::OK(); } @@ -383,9 +395,11 @@ void IcebergTableReader::_sort_delete_rows(std::vector*>& d void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete, size_t read_rows, bool file_path_column_dictionary_coded) { - ColumnPtr path_column = block.get_by_name(ICEBERG_FILE_PATH).column; + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block.get_name_to_pos_map(); + ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column; DCHECK_EQ(path_column->size(), read_rows); - ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column; + ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column; using ColumnType = typename PrimitiveTypeTraits::ColumnType; const int64_t* src_data = assert_cast(*pos_column).get_data().data(); IcebergTableReader::PositionDeleteRange range; diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index bb9b4318e9c906..2390f210d3e250 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -60,7 +60,9 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { _wal_path); } Block src_block; - RETURN_IF_ERROR(src_block.deserialize(pblock)); + size_t uncompressed_size = 0; + int64_t uncompressed_time = 0; + RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size, &uncompressed_time)); //convert to dst block Block dst_block; int index = 0; diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 91457cb7718da6..7168639c59a0ea 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -324,8 +324,10 @@ Status JniConnector::_fill_block(Block* block, size_t num_rows) { SCOPED_RAW_TIMER(&_fill_block_watcher); JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + // todo: maybe do not need to build name to index map every time + auto name_to_pos_map = block->get_name_to_pos_map(); for (int i = 0; i < _column_names.size(); ++i) { - auto& column_with_type_and_name = block->get_by_name(_column_names[i]); + auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[_column_names[i]]); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; RETURN_IF_ERROR(_fill_column(_table_meta, column_ptr, column_type, num_rows)); diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 116fcbd33e7c1e..476eae26ebe2fa 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -543,6 +543,9 @@ Status FileScanner::_check_output_block_types() { Status FileScanner::_init_src_block(Block* block) { if (!_is_load) { _src_block_ptr = block; + + // todo: maybe do not need to build name to index map every time + _src_block_name_to_idx = block->get_name_to_pos_map(); return Status::OK(); } RETURN_IF_ERROR(_check_output_block_types()); @@ -609,7 +612,7 @@ Status FileScanner::_cast_to_input_block(Block* block) { // skip variant type continue; } - auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name()); + auto& arg = _src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]); auto return_type = slot_desc->get_data_type_ptr(); // remove nullable here, let the get_function decide whether nullable auto data_type = get_data_type_with_default_argument(remove_nullable(return_type)); @@ -637,7 +640,9 @@ Status FileScanner::_fill_columns_from_path(size_t rows) { } DataTypeSerDe::FormatOptions _text_formatOptions; for (auto& kv : _partition_col_descs) { - auto doris_column = _src_block_ptr->get_by_name(kv.first).column; + auto doris_column = + _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).column; + // _src_block_ptr points to a mutable block created by this class itself, so const_cast can be used here. IColumn* col_ptr = const_cast(doris_column.get()); auto& [value, slot_desc] = kv.second; auto _text_serde = slot_desc->get_data_type_ptr()->get_serde(); @@ -669,7 +674,8 @@ Status FileScanner::_fill_missing_columns(size_t rows) { for (auto& kv : _missing_col_descs) { if (kv.second == nullptr) { // no default column, fill with null - auto mutable_column = _src_block_ptr->get_by_name(kv.first).column->assume_mutable(); + auto mutable_column = _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]) + .column->assume_mutable(); auto* nullable_column = static_cast(mutable_column.get()); nullable_column->insert_many_defaults(rows); } else { @@ -689,10 +695,15 @@ Status FileScanner::_fill_missing_columns(size_t rows) { mutable_column->resize(rows); // result_column_ptr maybe a ColumnConst, convert it to a normal column result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); - auto origin_column_type = _src_block_ptr->get_by_name(kv.first).type; + auto origin_column_type = + _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).type; bool is_nullable = origin_column_type->is_nullable(); + if (!_src_block_name_to_idx.contains(kv.first)) { + return Status::InternalError("Column {} not found in src block {}", kv.first, + _src_block_ptr->dump_structure()); + } _src_block_ptr->replace_by_position( - _src_block_ptr->get_position_by_name(kv.first), + _src_block_name_to_idx[kv.first], is_nullable ? make_nullable(result_column_ptr) : result_column_ptr); _src_block_ptr->erase(result_column_id); } diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index 3f56d9e5be8c38..c0c0c35f0be0f1 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -427,8 +427,8 @@ Status OlapScanner::_init_tablet_reader_params( DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK); if (!_state->skip_storage_engine_merge()) { - TOlapScanNode& olap_scan_node = - ((pipeline::OlapScanLocalState*)_local_state)->olap_scan_node(); + auto* olap_scan_local_state = (pipeline::OlapScanLocalState*)_local_state; + TOlapScanNode& olap_scan_node = olap_scan_local_state->olap_scan_node(); // order by table keys optimization for topn // will only read head/tail of data file since it's already sorted by keys if (olap_scan_node.__isset.sort_info && !olap_scan_node.sort_info.is_asc_order.empty()) { @@ -440,16 +440,20 @@ Status OlapScanner::_init_tablet_reader_params( _tablet_reader_params.read_orderby_key_num_prefix_columns = olap_scan_node.sort_info.is_asc_order.size(); _tablet_reader_params.read_orderby_key_limit = _limit; - _tablet_reader_params.filter_block_conjuncts = _conjuncts; + + if (_tablet_reader_params.read_orderby_key_limit > 0 && + olap_scan_local_state->_storage_no_merge()) { + _tablet_reader_params.filter_block_conjuncts = _conjuncts; + _conjuncts.clear(); + } } // set push down topn filter _tablet_reader_params.topn_filter_source_node_ids = - ((pipeline::OlapScanLocalState*)_local_state) - ->get_topn_filter_source_node_ids(_state, true); + olap_scan_local_state->get_topn_filter_source_node_ids(_state, true); if (!_tablet_reader_params.topn_filter_source_node_ids.empty()) { _tablet_reader_params.topn_filter_target_node_id = - ((pipeline::OlapScanLocalState*)_local_state)->parent()->node_id(); + olap_scan_local_state->parent()->node_id(); } } diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp index c46c4f4a90f162..c9391960c8e018 100644 --- a/be/src/vec/exec/scan/scanner.cpp +++ b/be/src/vec/exec/scan/scanner.cpp @@ -113,8 +113,6 @@ Status Scanner::get_block(RuntimeState* state, Block* block, bool* eof) { RETURN_IF_ERROR(_get_block_impl(state, block, eof)); if (*eof) { DCHECK(block->rows() == 0); - // clear TEMP columns to avoid column align problem - block->erase_tmp_columns(); break; } _num_rows_read += block->rows(); @@ -145,11 +143,6 @@ Status Scanner::get_block(RuntimeState* state, Block* block, bool* eof) { } Status Scanner::_filter_output_block(Block* block) { - Defer clear_tmp_block([&]() { block->erase_tmp_columns(); }); - if (block->has(BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED)) { - // scanner filter_block is already done (only by _topn_next currently), just skip it - return Status::OK(); - } auto old_rows = block->rows(); Status st = VExprContext::filter_block(_conjuncts, block, block->columns()); _counter.num_rows_unselected += old_rows - block->rows(); diff --git a/be/src/vec/functions/function_helpers.cpp b/be/src/vec/functions/function_helpers.cpp index 6862e9addb9844..877e47b5318fbe 100644 --- a/be/src/vec/functions/function_helpers.cpp +++ b/be/src/vec/functions/function_helpers.cpp @@ -97,14 +97,6 @@ std::tuple create_block_with_nested_columns(const Block& b } } - // TODO: only support match function, rethink the logic - for (const auto& ctn : block) { - if (ctn.name.size() > BeConsts::BLOCK_TEMP_COLUMN_PREFIX.size() && - starts_with(ctn.name, BeConsts::BLOCK_TEMP_COLUMN_PREFIX)) { - res.insert(ctn); - } - } - return {std::move(res), std::move(res_args)}; } diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 7f51bf7c1f00f5..a3186d2d982cc6 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -411,7 +411,6 @@ Status BlockReader::_unique_key_next_block(Block* block, bool* eof) { block->insert(column_with_type_and_name); RETURN_IF_ERROR(Block::filter_block(block, target_columns_size, target_columns_size)); _stats.rows_del_filtered += target_block_row - block->rows(); - DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") == nullptr); if (UNLIKELY(_reader_context.record_rowids)) { DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count); } diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 3f375eb2c823f2..deb384de0ba9bc 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -93,6 +93,7 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo _topn_limit = _reader->_reader_context.read_orderby_key_limit; } else { _topn_limit = 0; + DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0); } } @@ -259,8 +260,6 @@ Status VCollectIterator::_topn_next(Block* block) { return Status::Error(""); } - // clear TEMP columns to avoid column align problem - block->erase_tmp_columns(); auto clone_block = block->clone_empty(); /* select id, "${tR2}", @@ -316,8 +315,6 @@ Status VCollectIterator::_topn_next(Block* block) { if (status.is()) { eof = true; if (block->rows() == 0) { - // clear TEMP columns to avoid column align problem in segment iterator - block->erase_tmp_columns(); break; } } else { @@ -328,8 +325,6 @@ Status VCollectIterator::_topn_next(Block* block) { // filter block RETURN_IF_ERROR(VExprContext::filter_block( _reader->_reader_context.filter_block_conjuncts, block, block->columns())); - // clear TMPE columns to avoid column align problem in mutable_block.add_rows bellow - block->erase_tmp_columns(); // update read rows read_rows += block->rows(); @@ -452,12 +447,6 @@ Status VCollectIterator::_topn_next(Block* block) { << " sorted_row_pos.size()=" << sorted_row_pos.size() << " mutable_block.rows()=" << mutable_block.rows(); *block = mutable_block.to_block(); - // append a column to indicate scanner filter_block is already done - auto filtered_datatype = std::make_shared(); - auto filtered_column = filtered_datatype->create_column_const( - block->rows(), Field::create_field(1)); - block->insert( - {filtered_column, filtered_datatype, BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED}); _topn_eof = true; return block->rows() > 0 ? Status::OK() : Status::Error(""); @@ -894,8 +883,6 @@ Status VCollectIterator::Level1Iterator::_normal_next(Block* block) { while (res.is() && !_children.empty()) { _cur_child = std::move(*(_children.begin())); _children.pop_front(); - // clear TEMP columns to avoid column align problem - block->erase_tmp_columns(); res = _cur_child->next(block); } diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 5f6e376367d79c..24bdf4e87a6e21 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -515,7 +515,6 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) { RETURN_IF_ERROR( Block::filter_block(block, target_columns.size(), target_columns.size())); _stats.rows_del_filtered += block_rows - block->rows(); - DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") == nullptr); if (UNLIKELY(_reader_context.record_rowids)) { DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count); } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 9cd8d4bb8d993a..e6ff6f1a8a8f38 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -101,8 +101,8 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { RETURN_IF_ERROR(block_item.get_block(next_block)); size_t block_byte_size = block_item.block_byte_size(); COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time()); - COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time()); - COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); + COUNTER_UPDATE(_recvr->_decompress_timer, block_item.decompress_time()); + COUNTER_UPDATE(_recvr->_decompress_bytes, block_item.decompress_bytes()); _recvr->_memory_used_counter->update(-(int64_t)block_byte_size); INJECT_MOCK_SLEEP(std::lock_guard l(_lock)); sub_blocks_memory_usage(block_byte_size); diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 1d752b1bad89b3..1a1c84f3e67265 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -271,7 +271,8 @@ class VDataStreamRecvr::SenderQueue { DCHECK(_pblock); SCOPED_RAW_TIMER(&_deserialize_time); _block = Block::create_unique(); - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_block->deserialize(*_pblock)); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION( + _block->deserialize(*_pblock, &_decompress_bytes, &_decompress_time)); } block.swap(_block); _block.reset(); @@ -280,6 +281,8 @@ class VDataStreamRecvr::SenderQueue { size_t block_byte_size() const { return _block_byte_size; } int64_t deserialize_time() const { return _deserialize_time; } + int64_t decompress_time() const { return _decompress_time; } + size_t decompress_bytes() const { return _decompress_bytes; } BlockItem() = default; BlockItem(BlockUPtr&& block, size_t block_byte_size) : _block(std::move(block)), _block_byte_size(block_byte_size) {} @@ -292,6 +295,8 @@ class VDataStreamRecvr::SenderQueue { std::unique_ptr _pblock; size_t _block_byte_size = 0; int64_t _deserialize_time = 0; + int64_t _decompress_time = 0; + size_t _decompress_bytes = 0; }; std::list _block_queue; diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp b/be/src/vec/sink/varrow_flight_result_writer.cpp index 9105ba9e057dec..b828078554624c 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.cpp +++ b/be/src/vec/sink/varrow_flight_result_writer.cpp @@ -47,10 +47,12 @@ Status GetArrowResultBatchCtx::on_data(const std::shared_ptr& if (_result != nullptr) { auto* arrow_buffer = assert_cast(buffer); size_t uncompressed_bytes = 0, compressed_bytes = 0; + int64_t compressed_time = 0; SCOPED_TIMER(arrow_buffer->_serialize_batch_ns_timer); - RETURN_IF_ERROR(block->serialize( - arrow_buffer->_be_exec_version, _result->mutable_block(), &uncompressed_bytes, - &compressed_bytes, arrow_buffer->_fragment_transmission_compression_type, false)); + RETURN_IF_ERROR(block->serialize(arrow_buffer->_be_exec_version, _result->mutable_block(), + &uncompressed_bytes, &compressed_bytes, &compressed_time, + arrow_buffer->_fragment_transmission_compression_type, + false)); COUNTER_UPDATE(arrow_buffer->_uncompressed_bytes_counter, uncompressed_bytes); COUNTER_UPDATE(arrow_buffer->_compressed_bytes_counter, compressed_bytes); _result->set_packet_seq(packet_seq); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index b4af185adb341f..a05ce66ff98704 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -347,12 +347,13 @@ Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, size_t n SCOPED_TIMER(_parent->_serialize_batch_timer); dest->Clear(); size_t uncompressed_bytes = 0, compressed_bytes = 0; + int64_t compress_time = 0; RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest, &uncompressed_bytes, - &compressed_bytes, _parent->compression_type(), + &compressed_bytes, &compress_time, _parent->compression_type(), _parent->transfer_large_data_by_brpc())); COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers); COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); - COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time()); + COUNTER_UPDATE(_parent->_compress_timer, compress_time); #ifndef BE_TEST _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_bytes( compressed_bytes * num_receivers); diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index e720b5f694eaa7..5bb8f2cbe0f742 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -689,10 +689,6 @@ Status OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si Status OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(vectorized::Block* block, size_t rows) { - // avoid duplicate PARTIAL_UPDATE_AUTO_INC_COL - if (block->has(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL)) { - return Status::OK(); - } auto dst_column = vectorized::ColumnInt64::create(); vectorized::ColumnInt64::Container& dst_values = dst_column->get_data(); size_t null_value_count = rows; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index d7a269287af4ec..fe51df4dcb22b6 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -866,8 +866,9 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { if (block.rows() > 0) { SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); size_t uncompressed_bytes = 0, compressed_bytes = 0; + int64_t compressed_time = 0; Status st = block.serialize(state->be_exec_version(), request->mutable_block(), - &uncompressed_bytes, &compressed_bytes, + &uncompressed_bytes, &compressed_bytes, &compressed_time, state->fragement_transmission_compression_type(), _parent->_transfer_large_data_by_brpc); TEST_INJECTION_POINT_CALLBACK("VNodeChannel::try_send_block", &st); diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index 5a64b6f7c43a6b..35436da82f1113 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -73,8 +73,9 @@ Status VWalWriter::write_wal(vectorized::Block* block) { { return Status::InternalError("Failed to write wal!"); }); PBlock pblock; size_t uncompressed_bytes = 0, compressed_bytes = 0; + int64_t compressed_time = 0; RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes, - &compressed_bytes, + &compressed_bytes, &compressed_time, segment_v2::CompressionTypePB::NO_COMPRESSION)); RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); return Status::OK(); diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp index de3eea6b625848..bb74d20ec33436 100644 --- a/be/src/vec/spill/spill_reader.cpp +++ b/be/src/vec/spill/spill_reader.cpp @@ -142,7 +142,9 @@ Status SpillReader::read(Block* block, bool* eos) { if (!pb_block_.ParseFromArray(result.data, cast_set(result.size))) { return Status::InternalError("Failed to read spilled block"); } - RETURN_IF_ERROR(block->deserialize(pb_block_)); + size_t uncompressed_size = 0; + int64_t uncompressed_time = 0; + RETURN_IF_ERROR(block->deserialize(pb_block_, &uncompressed_size, &uncompressed_time)); } COUNTER_UPDATE(_read_block_data_size, block->bytes()); COUNTER_UPDATE(_read_rows_count, block->rows()); diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp index a43ba83ccb9667..a43162f43ceeff 100644 --- a/be/src/vec/spill/spill_writer.cpp +++ b/be/src/vec/spill/spill_writer.cpp @@ -115,9 +115,10 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { { PBlock pblock; SCOPED_TIMER(_serialize_timer); + int64_t compressed_time = 0; status = block.serialize( BeExecVersionManager::get_newest_version(), &pblock, &uncompressed_bytes, - &compressed_bytes, + &compressed_bytes, &compressed_time, segment_v2::CompressionTypePB::ZSTD); // ZSTD for better compression ratio RETURN_IF_ERROR(status); int64_t pblock_mem = pblock.ByteSizeLong(); diff --git a/be/test/olap/wal/wal_reader_writer_test.cpp b/be/test/olap/wal/wal_reader_writer_test.cpp index 94fc8ff91a2596..d443cdfa84921b 100644 --- a/be/test/olap/wal/wal_reader_writer_test.cpp +++ b/be/test/olap/wal/wal_reader_writer_test.cpp @@ -64,8 +64,10 @@ void covert_block_to_pb( segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; - Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, - &uncompressed_bytes, &compressed_bytes, compression_type); + int64_t compressed_time = 0; + Status st = + block.serialize(BeExecVersionManager::get_newest_version(), pblock, &uncompressed_bytes, + &compressed_bytes, &compressed_time, compression_type); EXPECT_TRUE(st.ok()); EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); EXPECT_EQ(compressed_bytes, pblock->column_values().size()); @@ -132,7 +134,9 @@ TEST_F(WalReaderWriterTest, TestWriteAndRead1) { break; } vectorized::Block block; - EXPECT_TRUE(block.deserialize(pblock).ok()); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + EXPECT_TRUE(block.deserialize(pblock, &uncompress_size, &uncompressed_time).ok()); EXPECT_EQ(block_rows, block.rows()); } static_cast(wal_reader.finalize()); diff --git a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp index bed8be3e4f3495..0bf6691d7d0bc5 100644 --- a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp +++ b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp @@ -318,8 +318,9 @@ class MockClosure : public google::protobuf::Closure { void to_pblock(Block& block, PBlock* pblock) { size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; + int64_t compressed_time = 0; EXPECT_TRUE(block.serialize(BeExecVersionManager::get_newest_version(), pblock, - &uncompressed_bytes, &compressed_bytes, + &uncompressed_bytes, &compressed_bytes, &compressed_time, segment_v2::CompressionTypePB::NO_COMPRESSION)); } diff --git a/be/test/pipeline/operator/materialization_shared_state_test.cpp b/be/test/pipeline/operator/materialization_shared_state_test.cpp index 51c097207f1cef..d96653b2dfe326 100644 --- a/be/test/pipeline/operator/materialization_shared_state_test.cpp +++ b/be/test/pipeline/operator/materialization_shared_state_test.cpp @@ -119,8 +119,9 @@ TEST_F(MaterializationSharedStateTest, TestMergeMultiResponse) { auto serialized_block = response_.add_blocks()->mutable_block(); size_t uncompressed_size = 0; size_t compressed_size = 0; + int64_t compress_time = 0; auto s = resp_block1.serialize(0, serialized_block, &uncompressed_size, &compressed_size, - CompressionTypePB::LZ4); + &compress_time, CompressionTypePB::LZ4); EXPECT_TRUE(s.ok()); _shared_state->rpc_struct_map[_backend_id1].response = std::move(response_); @@ -141,8 +142,9 @@ TEST_F(MaterializationSharedStateTest, TestMergeMultiResponse) { auto serialized_block = response_.add_blocks()->mutable_block(); size_t uncompressed_size = 0; size_t compressed_size = 0; + int64_t compress_time = 0; auto s = resp_block2.serialize(0, serialized_block, &uncompressed_size, &compressed_size, - CompressionTypePB::LZ4); + &compress_time, CompressionTypePB::LZ4); EXPECT_TRUE(s.ok()); _shared_state->rpc_struct_map[_backend_id2].response = std::move(response_); @@ -219,8 +221,9 @@ TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseMultiBlocks) { auto serialized_block = response_.add_blocks()->mutable_block(); size_t uncompressed_size = 0; size_t compressed_size = 0; + int64_t compress_time = 0; auto s = resp_block1.serialize(0, serialized_block, &uncompressed_size, &compressed_size, - CompressionTypePB::LZ4); + &compress_time, CompressionTypePB::LZ4); EXPECT_TRUE(s.ok()); _shared_state->rpc_struct_map[_backend_id1].response = std::move(response_); @@ -240,8 +243,9 @@ TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseMultiBlocks) { auto serialized_block = response_.add_blocks()->mutable_block(); size_t uncompressed_size = 0; size_t compressed_size = 0; + int64_t compress_time = 0; auto s = resp_block2.serialize(0, serialized_block, &uncompressed_size, &compressed_size, - CompressionTypePB::LZ4); + &compress_time, CompressionTypePB::LZ4); EXPECT_TRUE(s.ok()); _shared_state->rpc_struct_map[_backend_id2].response = std::move(response_); @@ -260,8 +264,9 @@ TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseMultiBlocks) { _shared_state->rpc_struct_map[_backend_id1].response.add_blocks()->mutable_block(); size_t uncompressed_size = 0; size_t compressed_size = 0; + int64_t compress_time = 0; auto s = resp_block1.serialize(0, serialized_block, &uncompressed_size, &compressed_size, - CompressionTypePB::LZ4); + &compress_time, CompressionTypePB::LZ4); EXPECT_TRUE(s.ok()); _shared_state->response_blocks[1] = resp_block1.clone_empty(); } @@ -278,8 +283,9 @@ TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseMultiBlocks) { _shared_state->rpc_struct_map[_backend_id2].response.add_blocks()->mutable_block(); size_t uncompressed_size = 0; size_t compressed_size = 0; + int64_t compress_time = 0; auto s = resp_block2.serialize(0, serialized_block, &uncompressed_size, &compressed_size, - CompressionTypePB::LZ4); + &compress_time, CompressionTypePB::LZ4); EXPECT_TRUE(s.ok()); } diff --git a/be/test/testutil/mock/mock_data_stream_sender.h b/be/test/testutil/mock/mock_data_stream_sender.h index dc9d2542beb8fb..cd076cca521af3 100644 --- a/be/test/testutil/mock/mock_data_stream_sender.h +++ b/be/test/testutil/mock/mock_data_stream_sender.h @@ -54,7 +54,10 @@ struct MockChannel : public Channel { return Status::OK(); } Block nblock; - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(nblock.deserialize(*_pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + RETURN_IF_ERROR_OR_CATCH_EXCEPTION( + nblock.deserialize(*_pblock, &uncompress_size, &uncompressed_time)); if (!nblock.empty()) { RETURN_IF_ERROR(_send_block.merge(std::move(nblock))); } @@ -64,7 +67,10 @@ struct MockChannel : public Channel { Status send_broadcast_block(std::shared_ptr& block, bool eos = false) override { Block nblock; - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(nblock.deserialize(*block->get_block())); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + RETURN_IF_ERROR_OR_CATCH_EXCEPTION( + nblock.deserialize(*block->get_block(), &uncompress_size, &uncompressed_time)); if (!nblock.empty()) { RETURN_IF_ERROR(_send_block.merge(std::move(nblock))); } diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 07e4254c7fc847..9b316baca605c3 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -74,8 +74,10 @@ void block_to_pb( segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; - Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, - &uncompressed_bytes, &compressed_bytes, compression_type); + int64_t compress_time = 0; + Status st = + block.serialize(BeExecVersionManager::get_newest_version(), pblock, &uncompressed_bytes, + &compressed_bytes, &compress_time, compression_type); EXPECT_TRUE(st.ok()); // const column maybe uncompressed_bytes(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -169,7 +173,9 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -192,7 +198,9 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -219,7 +227,9 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -240,7 +250,9 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -263,7 +275,9 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -287,7 +301,9 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -303,7 +319,9 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -328,7 +346,9 @@ void serialize_and_deserialize_test_one() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4); std::string s2 = pblock2.DebugString(); @@ -354,7 +374,9 @@ void serialize_and_deserialize_test_int() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4); std::string s2 = pblock2.DebugString(); @@ -378,7 +400,9 @@ void serialize_and_deserialize_test_int() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4); std::string s2 = pblock2.DebugString(); @@ -403,7 +427,9 @@ void serialize_and_deserialize_test_long() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4); std::string s2 = pblock2.DebugString(); @@ -427,7 +453,9 @@ void serialize_and_deserialize_test_long() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4); std::string s2 = pblock2.DebugString(); @@ -452,7 +480,9 @@ void serialize_and_deserialize_test_string() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY); std::string s2 = pblock2.DebugString(); @@ -477,7 +507,9 @@ void serialize_and_deserialize_test_string() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY); std::string s2 = pblock2.DebugString(); @@ -505,7 +537,9 @@ void serialize_and_deserialize_test_nullable() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4); std::string s2 = pblock2.DebugString(); @@ -530,7 +564,9 @@ void serialize_and_deserialize_test_nullable() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4); std::string s2 = pblock2.DebugString(); @@ -554,7 +590,9 @@ void serialize_and_deserialize_test_nullable() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY); std::string s2 = pblock2.DebugString(); @@ -576,7 +614,9 @@ void serialize_and_deserialize_test_nullable() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY); std::string s2 = pblock2.DebugString(); @@ -602,7 +642,9 @@ void serialize_and_deserialize_test_decimal() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4); std::string s2 = pblock2.DebugString(); @@ -626,7 +668,9 @@ void serialize_and_deserialize_test_decimal() { std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4); std::string s2 = pblock2.DebugString(); @@ -658,7 +702,9 @@ void serialize_and_deserialize_test_bitmap() { std::string s1 = pblock.DebugString(); std::string bb1 = block.dump_data(0, 1024); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); std::string bb2 = block2.dump_data(0, 1024); EXPECT_EQ(bb1, bb2); PBlock pblock2; @@ -688,7 +734,9 @@ void serialize_and_deserialize_test_bitmap() { std::string s1 = pblock.DebugString(); std::string bb1 = block.dump_data(0, 1024); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); std::string bb2 = block2.dump_data(0, 1024); EXPECT_EQ(bb1, bb2); EXPECT_EQ(block.dump_data_json(0, 1024), block2.dump_data_json(0, 1024)); @@ -709,7 +757,9 @@ void serialize_and_deserialize_test_array() { block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY); std::string s1 = pblock.DebugString(); vectorized::Block block2; - static_cast(block2.deserialize(pblock)); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + static_cast(block2.deserialize(pblock, &uncompress_size, &uncompressed_time)); PBlock pblock2; block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY); std::string s2 = pblock2.DebugString(); @@ -1075,26 +1125,6 @@ TEST(BlockTest, ctor) { ASSERT_EQ(block.columns(), 2); ASSERT_EQ(block.get_by_position(0).type->get_primitive_type(), TYPE_INT); ASSERT_TRUE(block.get_by_position(1).type->is_nullable()); - - { - auto mutable_block = - vectorized::MutableBlock::create_unique(tbl->get_tuple_descs(), 10, false); - ASSERT_EQ(mutable_block->columns(), 2); - auto mutable_block2 = vectorized::MutableBlock::create_unique(); - mutable_block->swap(*mutable_block2); - ASSERT_EQ(mutable_block->columns(), 0); - ASSERT_EQ(mutable_block2->columns(), 2); - } - - { - auto mutable_block = - vectorized::MutableBlock::create_unique(tbl->get_tuple_descs(), 10, true); - ASSERT_EQ(mutable_block->columns(), 1); - auto mutable_block2 = vectorized::MutableBlock::create_unique(); - mutable_block->swap(*mutable_block2); - ASSERT_EQ(mutable_block->columns(), 0); - ASSERT_EQ(mutable_block2->columns(), 1); - } } TEST(BlockTest, insert_erase) { @@ -1125,39 +1155,20 @@ TEST(BlockTest, insert_erase) { block.erase_tail(0); ASSERT_EQ(block.columns(), 0); - EXPECT_ANY_THROW(block.erase("column")); column_with_name = vectorized::ColumnHelper::create_column_with_name({}); block.insert(0, column_with_name); - EXPECT_NO_THROW(block.erase("column")); - ASSERT_EQ(block.columns(), 0); - - EXPECT_ANY_THROW(block.safe_get_by_position(0)); + ASSERT_EQ(block.columns(), 1); - ASSERT_EQ(block.try_get_by_name("column"), nullptr); - EXPECT_ANY_THROW(block.get_by_name("column")); - EXPECT_ANY_THROW(block.get_position_by_name("column")); block.insert(0, column_with_name); - EXPECT_NO_THROW(auto item = block.get_by_name("column")); - ASSERT_NE(block.try_get_by_name("column"), nullptr); EXPECT_EQ(block.get_position_by_name("column"), 0); - block.insert({nullptr, nullptr, BeConsts::BLOCK_TEMP_COLUMN_PREFIX}); - EXPECT_NO_THROW(auto item = block.get_by_name(BeConsts::BLOCK_TEMP_COLUMN_PREFIX)); - - block.erase_tmp_columns(); - ASSERT_EQ(block.try_get_by_name(BeConsts::BLOCK_TEMP_COLUMN_PREFIX), nullptr); - { // test const block const auto const_block = block; - EXPECT_EQ(const_block.try_get_by_name("column2"), nullptr); - EXPECT_ANY_THROW(const_block.get_by_name("column2")); - EXPECT_ANY_THROW(const_block.get_position_by_name("column2")); + EXPECT_EQ(const_block.get_position_by_name("column2"), -1); - EXPECT_NO_THROW(auto item = const_block.get_by_name("column")); - ASSERT_NE(const_block.try_get_by_name("column"), nullptr); EXPECT_EQ(const_block.get_position_by_name("column"), 0); } @@ -1166,14 +1177,7 @@ TEST(BlockTest, insert_erase) { block.insert({nullptr, std::make_shared(), "col2"}); - vectorized::MutableBlock mutable_block(&block); - mutable_block.erase("col1"); - ASSERT_EQ(mutable_block.columns(), 2); - - EXPECT_ANY_THROW(mutable_block.erase("col1")); - ASSERT_EQ(mutable_block.columns(), 2); - mutable_block.erase("col2"); - ASSERT_EQ(mutable_block.columns(), 1); + ASSERT_EQ(block.columns(), 3); } TEST(BlockTest, check_number_of_rows) { @@ -1352,8 +1356,6 @@ TEST(BlockTest, others) { mutable_block.clear_column_data(); ASSERT_EQ(mutable_block.get_column_by_position(0)->size(), 0); - ASSERT_TRUE(mutable_block.has("column")); - ASSERT_EQ(mutable_block.get_position_by_name("column"), 0); auto dumped_names = mutable_block.dump_names(); ASSERT_TRUE(dumped_names.find("column") != std::string::npos); diff --git a/be/test/vec/data_types/common_data_type_test.h b/be/test/vec/data_types/common_data_type_test.h index 4425dd31c94d11..745b3d19155f11 100644 --- a/be/test/vec/data_types/common_data_type_test.h +++ b/be/test/vec/data_types/common_data_type_test.h @@ -252,13 +252,16 @@ class CommonDataTypeTest : public ::testing::Test { auto pblock = std::make_unique(); size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; + int64_t compress_time = 0; segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::ZSTD; Status st = block->serialize(be_exec_version, pblock.get(), &uncompressed_bytes, - &compressed_bytes, compression_type); + &compressed_bytes, &compress_time, compression_type); ASSERT_EQ(st.ok(), true); // deserialize auto block_1 = std::make_shared(); - st = block_1->deserialize(*pblock); + size_t uncompress_size = 0; + int64_t uncompressed_time = 0; + st = block_1->deserialize(*pblock, &uncompress_size, &uncompressed_time); ASSERT_EQ(st.ok(), true); // check block_1 and block is same for (auto col_idx = 0; col_idx < block->columns(); ++col_idx) { diff --git a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp index 9eec232f75f453..08864a86fef400 100644 --- a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp +++ b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp @@ -172,8 +172,8 @@ static void read_parquet_lines(std::vector numeric_types, bool eof = false; size_t read_row = 0; static_cast(p_reader->get_next_block(block.get(), &read_row, &eof)); - auto row_id_string_column = - static_cast(*block->get_by_name("row_id").column.get()); + auto row_id_string_column = static_cast( + *block->get_by_position(block->get_position_by_name("row_id")).column.get()); auto read_lines_tmp = read_lines; for (auto i = 0; i < row_id_string_column.size(); i++) { GlobalRowLoacationV2 info = @@ -184,7 +184,7 @@ static void read_parquet_lines(std::vector numeric_types, EXPECT_EQ(info.backend_id, BackendOptions::get_backend_id()); EXPECT_EQ(info.version, IdManager::ID_VERSION); } - block->erase("row_id"); + block->erase(block->get_position_by_name("row_id")); EXPECT_EQ(block->dump_data(), block_dump); std::cout << block->dump_data(); diff --git a/be/test/vec/exec/orc/orc_read_lines.cpp b/be/test/vec/exec/orc/orc_read_lines.cpp index 7bdd529c7bdbb9..c73d6604b06073 100644 --- a/be/test/vec/exec/orc/orc_read_lines.cpp +++ b/be/test/vec/exec/orc/orc_read_lines.cpp @@ -157,8 +157,8 @@ static void read_orc_line(int64_t line, std::string block_dump) { bool eof = false; size_t read_row = 0; static_cast(reader->get_next_block(block.get(), &read_row, &eof)); - auto row_id_string_column = - static_cast(*block->get_by_name("row_id").column.get()); + auto row_id_string_column = static_cast( + *block->get_by_position(block->get_position_by_name("row_id")).column.get()); for (auto i = 0; i < row_id_string_column.size(); i++) { GlobalRowLoacationV2 info = *((GlobalRowLoacationV2*)row_id_string_column.get_data_at(i).data); @@ -167,7 +167,7 @@ static void read_orc_line(int64_t line, std::string block_dump) { EXPECT_EQ(info.backend_id, BackendOptions::get_backend_id()); EXPECT_EQ(info.version, IdManager::ID_VERSION); } - block->erase("row_id"); + block->erase(block->get_position_by_name("row_id")); std::cout << block->dump_data(); EXPECT_EQ(block->dump_data(), block_dump);