Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions be/src/common/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ namespace BeConsts {
const std::string CSV = "csv";
const std::string CSV_WITH_NAMES = "csv_with_names";
const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__";
const std::string BLOCK_TEMP_COLUMN_SCANNER_FILTERED = "__TEMP__scanner_filtered";
const std::string ROWID_COL = "__DORIS_ROWID_COL__";
const std::string GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__";
const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
Expand Down
16 changes: 11 additions & 5 deletions be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
}
// Merge partial blocks
vectorized::Block partial_block;
RETURN_IF_ERROR(partial_block.deserialize(resp.block()));
[[maybe_unused]] size_t uncompressed_size = 0;
[[maybe_unused]] int64_t uncompressed_time = 0;

RETURN_IF_ERROR(
partial_block.deserialize(resp.block(), &uncompressed_size, &uncompressed_time));
if (partial_block.is_empty_column()) {
return Status::OK();
}
Expand Down Expand Up @@ -493,9 +497,10 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
<< ", be_exec_version:" << request.be_exec_version();
[[maybe_unused]] size_t compressed_size = 0;
[[maybe_unused]] size_t uncompressed_size = 0;
[[maybe_unused]] int64_t compress_time = 0;
int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0;
RETURN_IF_ERROR(result_block.serialize(be_exec_version, response->mutable_block(),
&uncompressed_size, &compressed_size,
&uncompressed_size, &compressed_size, &compress_time,
segment_v2::CompressionTypePB::LZ4));
}

Expand Down Expand Up @@ -600,10 +605,11 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request,

[[maybe_unused]] size_t compressed_size = 0;
[[maybe_unused]] size_t uncompressed_size = 0;
[[maybe_unused]] int64_t compress_time = 0;
int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0;
RETURN_IF_ERROR(result_blocks[i].serialize(be_exec_version, pblock->mutable_block(),
&uncompressed_size, &compressed_size,
segment_v2::CompressionTypePB::LZ4));
RETURN_IF_ERROR(result_blocks[i].serialize(
be_exec_version, pblock->mutable_block(), &uncompressed_size, &compressed_size,
&compress_time, segment_v2::CompressionTypePB::LZ4));
}

// Build file type statistics string
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -941,11 +941,10 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t

const signed char* BaseTablet::get_delete_sign_column_data(const vectorized::Block& block,
size_t rows_at_least) {
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
block.try_get_by_name(DELETE_SIGN);
delete_sign_column != nullptr) {
if (int pos = block.get_position_by_name(DELETE_SIGN); pos != -1) {
const vectorized::ColumnWithTypeAndName& delete_sign_column = block.get_by_position(pos);
const auto& delete_sign_col =
reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
assert_cast<const vectorized::ColumnInt8&>(*(delete_sign_column.column));
if (delete_sign_col.size() >= rows_at_least) {
return delete_sign_col.get_data().data();
}
Expand Down
28 changes: 16 additions & 12 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,7 @@ Status FixedReadPlan::read_columns_by_plan(
const signed char* __restrict cur_delete_signs) const {
if (force_read_old_delete_signs) {
// always read delete sign column from historical data
if (const vectorized::ColumnWithTypeAndName* old_delete_sign_column =
block.try_get_by_name(DELETE_SIGN);
old_delete_sign_column == nullptr) {
if (block.get_position_by_name(DELETE_SIGN) == -1) {
auto del_col_cid = tablet_schema.field_index(DELETE_SIGN);
cids_to_read.emplace_back(del_col_cid);
block.swap(tablet_schema.create_block_by_cids(cids_to_read));
Expand Down Expand Up @@ -384,7 +382,10 @@ Status FixedReadPlan::fill_missing_columns(
old_value_block, &read_index, true, nullptr));

const auto* old_delete_signs = BaseTablet::get_delete_sign_column_data(old_value_block);
DCHECK(old_delete_signs != nullptr);
if (old_delete_signs == nullptr) {
return Status::InternalError("old delete signs column not found, block: {}",
old_value_block.dump_structure());
}
// build default value columns
auto default_value_block = old_value_block.clone_empty();
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
Expand Down Expand Up @@ -420,27 +421,30 @@ Status FixedReadPlan::fill_missing_columns(
}

if (should_use_default) {
// clang-format off
if (tablet_column.has_default_value()) {
missing_col->insert_from(*mutable_default_value_columns[i], 0);
} else if (tablet_column.is_nullable()) {
auto* nullable_column = assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(missing_col.get());
auto* nullable_column =
assert_cast<vectorized::ColumnNullable*>(missing_col.get());
nullable_column->insert_many_defaults(1);
} else if (tablet_schema.auto_increment_column() == tablet_column.name()) {
const auto& column = *DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name()));
const auto& column =
*DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name()));
DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
auto* auto_inc_column =
assert_cast<vectorized::ColumnInt64*, TypeCheckOnRelease::DISABLE>(missing_col.get());
auto_inc_column->insert(vectorized::Field::create_field<TYPE_BIGINT>(
assert_cast<const vectorized::ColumnInt64*, TypeCheckOnRelease::DISABLE>(
block->get_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL).column.get())->get_element(idx)));
assert_cast<vectorized::ColumnInt64*>(missing_col.get());
int pos = block->get_position_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL);
if (pos == -1) {
return Status::InternalError("auto increment column not found in block {}",
block->dump_structure());
}
auto_inc_column->insert_from(*block->get_by_position(pos).column.get(), idx);
} else {
// If the control flow reaches this branch, the column neither has default value
// nor is nullable. It means that the row's delete sign is marked, and the value
// columns are useless and won't be read. So we can just put arbitrary values in the cells
missing_col->insert(tablet_column.get_vec_type()->get_default());
}
// clang-format on
} else {
missing_col->insert_from(*old_value_block.get_by_position(i).column,
pos_in_old_block);
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,10 @@ Status PushBrokerReader::_cast_to_input_block() {
if (slot_desc->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
continue;
}
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
// remove nullable here, let the get_function decide whether nullable
auto return_type = slot_desc->get_data_type_ptr();
idx = _src_block_name_to_idx[slot_desc->col_name()];
auto& arg = _src_block_ptr->get_by_position(idx);
// bitmap convert:src -> to_base64 -> bitmap_from_base64
if (slot_desc->type()->get_primitive_type() == TYPE_BITMAP) {
auto base64_return_type = vectorized::DataTypeFactory::instance().create_data_type(
Expand All @@ -491,7 +491,7 @@ Status PushBrokerReader::_cast_to_input_block() {
RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(base64_return_type);
auto& arg_base64 = _src_block_ptr->get_by_name(slot_desc->col_name());
auto& arg_base64 = _src_block_ptr->get_by_position(idx);
auto func_bitmap_from_base64 =
vectorized::SimpleFunctionFactory::instance().get_function(
"bitmap_from_base64", {arg_base64}, return_type);
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,6 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
void set_skip_write_index_on_load(bool skip) { _skip_write_index_on_load = skip; }
bool skip_write_index_on_load() const { return _skip_write_index_on_load; }
int32_t delete_sign_idx() const { return _delete_sign_idx; }
void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; }
bool has_sequence_col() const { return _sequence_col_idx != -1; }
int32_t sequence_col_idx() const { return _sequence_col_idx; }
void set_version_col_idx(int32_t version_col_idx) { _version_col_idx = version_col_idx; }
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/materialization_opertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ Status MaterializationSharedState::merge_multi_response() {
for (int i = 0; i < block_order_results.size(); ++i) {
for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
vectorized::Block partial_block;
size_t uncompressed_size = 0;
int64_t uncompressed_time = 0;
DCHECK(rpc_struct.response.blocks_size() > i);
RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block()));
RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(),
&uncompressed_size, &uncompressed_time));
if (rpc_struct.response.blocks(i).has_profile()) {
auto response_profile =
RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile());
Expand Down
7 changes: 0 additions & 7 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1318,13 +1318,6 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
// in inverted index apply logic, in order to optimize query performance,
// we built some temporary columns into block, these columns only used in scan node level,
// remove them when query leave scan node to avoid other nodes use block->columns() to make a wrong decision
Defer drop_block_temp_column {[&]() {
std::unique_lock l(local_state._block_lock);
block->erase_tmp_columns();
}};

if (state->is_cancelled()) {
if (local_state._scanner_ctx) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
int j = 0;
for (; j < columns_desc.size(); ++j) {
if (boost::iequals(_dest_tuple_desc->slots()[i]->col_name(), columns_desc[j].name)) {
_slot_offsets[i] = j;
break;
}
}
Expand Down Expand Up @@ -250,11 +251,10 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
if (src_block.rows()) {
// block->check_number_of_rows();
for (int i = 0; i < _slot_num; ++i) {
auto* dest_slot_desc = _dest_tuple_desc->slots()[i];
vectorized::MutableColumnPtr column_ptr =
std::move(*block->get_by_position(i).column).mutate();
column_ptr->insert_range_from(
*src_block.get_by_name(dest_slot_desc->col_name()).column, 0,
*src_block.safe_get_by_position(_slot_offsets[i]).column, 0,
src_block.rows());
}
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block,
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/schema_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <stdint.h>

#include <unordered_map>

#include "common/status.h"
#include "exec/schema_scanner.h"
#include "operator.h"
Expand Down Expand Up @@ -85,6 +87,9 @@ class SchemaScanOperatorX final : public OperatorX<SchemaScanLocalState> {
int _tuple_idx;
// slot num need to fill in and return
int _slot_num;

// slot index mapping to src column index
std::unordered_map<int, int> _slot_offsets;
};

#include "common/compile_check_end.h"
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,9 @@ Status BaseTabletsChannel::_write_block_data(
std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
PTabletWriterAddBlockResult* response) {
vectorized::Block send_data;
RETURN_IF_ERROR(send_data.deserialize(request.block()));
[[maybe_unused]] size_t uncompressed_size = 0;
[[maybe_unused]] int64_t uncompressed_time = 0;
RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time));
CHECK(send_data.rows() == request.tablet_ids_size())
<< "block rows: " << send_data.rows()
<< ", tablet_ids_size: " << request.tablet_ids_size();
Expand Down
5 changes: 4 additions & 1 deletion be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,10 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() {
{
SCOPED_ATOMIC_TIMER(&_deserialize_block_timer);
_block = vectorized::Block::create_shared();
st = _block->deserialize(callback->response_->block());
[[maybe_unused]] size_t uncompressed_size = 0;
[[maybe_unused]] int64_t uncompressed_time = 0;
st = _block->deserialize(callback->response_->block(), &uncompressed_size,
&uncompressed_time);
ARROW_RETURN_NOT_OK(to_arrow_status(st));
break;
}
Expand Down
8 changes: 6 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ struct AggregateFunctionSortData {
PBlock pblock;
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
int64_t compressed_time = 0;
auto st = block.serialize(state->be_exec_version(), &pblock, &uncompressed_bytes,
&compressed_bytes, segment_v2::CompressionTypePB::NO_COMPRESSION);
&compressed_bytes, &compressed_time,
segment_v2::CompressionTypePB::NO_COMPRESSION);
if (!st.ok()) {
throw doris::Exception(st);
}
Expand All @@ -87,7 +89,9 @@ struct AggregateFunctionSortData {

PBlock pblock;
pblock.ParseFromString(data);
auto st = block.deserialize(pblock);
[[maybe_unused]] size_t uncompressed_size = 0;
[[maybe_unused]] int64_t uncompressed_time = 0;
auto st = block.deserialize(pblock, &uncompressed_size, &uncompressed_time);
// If memory allocate failed during deserialize, st is not ok, throw exception here to
// stop the query.
if (!st.ok()) {
Expand Down
Loading
Loading