9 changes: 6 additions & 3 deletions be/src/olap/partial_update_info.cpp
@@ -433,9 +433,12 @@ Status FixedReadPlan::fill_missing_columns(
DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
auto* auto_inc_column =
assert_cast<vectorized::ColumnInt64*>(missing_col.get());
auto_inc_column->insert_from(
*block->get_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL).column.get(),
idx);
int pos = block->get_position_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL);
if (pos == -1) {
return Status::InternalError("auto increment column not found in block {}",
block->dump_structure());
}
auto_inc_column->insert_from(*block->get_by_position(pos).column.get(), idx);
} else {
// If the control flow reaches this branch, the column neither has default value
// nor is nullable. It means that the row's delete sign is marked, and the value
18 changes: 0 additions & 18 deletions be/src/vec/core/block.cpp
@@ -241,24 +241,6 @@ const ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) const
return data[position];
}

ColumnWithTypeAndName& Block::get_by_name(const std::string& name) {
int pos = get_position_by_name(name);
if (pos == -1) {
throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, name={}, block_names={}",
name, dump_names());
}
return data[pos];
}

const ColumnWithTypeAndName& Block::get_by_name(const std::string& name) const {
int pos = get_position_by_name(name);
if (pos == -1) {
throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, name={}, block_names={}",
name, dump_names());
}
return data[pos];
}

int Block::get_position_by_name(const std::string& name) const {
for (int i = 0; i < data.size(); i++) {
if (data[i].name == name) {
15 changes: 7 additions & 8 deletions be/src/vec/core/block.h
@@ -116,9 +116,13 @@ class Block {
std::swap(data, new_data);
}

// Use this method only when you are certain index_by_name will not be used
// This is a temporary compromise; index_by_name may be removed in the future
void simple_insert(const ColumnWithTypeAndName& elem) { data.emplace_back(elem); }
std::unordered_map<std::string, uint32_t> get_name_to_pos_map() const {
std::unordered_map<std::string, uint32_t> name_to_index_map;
for (uint32_t i = 0; i < data.size(); ++i) {
name_to_index_map[data[i].name] = i;
}
return name_to_index_map;
}

/// References are invalidated after calling functions above.
ColumnWithTypeAndName& get_by_position(size_t position) {
@@ -144,11 +148,6 @@
ColumnWithTypeAndName& safe_get_by_position(size_t position);
const ColumnWithTypeAndName& safe_get_by_position(size_t position) const;

// Get column by name. Throws an exception if there is no column with that name.
// ATTN: this method is O(N). better maintain name -> position map in caller if you need to call it frequently.
ColumnWithTypeAndName& get_by_name(const std::string& name);
const ColumnWithTypeAndName& get_by_name(const std::string& name) const;

Container::iterator begin() { return data.begin(); }
Container::iterator end() { return data.end(); }
Container::const_iterator begin() const { return data.begin(); }
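
Note: a minimal usage sketch of the new get_name_to_pos_map() helper, mirroring the caller pattern adopted in the reader changes below — build the name-to-position map once per block, then look columns up by position. The required_columns variable is a hypothetical placeholder; Status, Block::get_by_position, and dump_structure are used as they appear elsewhere in this diff.

    // Illustrative sketch only (not part of this PR): one O(N) map build per block,
    // then constant-time lookups instead of repeated O(N) get_position_by_name scans.
    auto name_to_pos_map = block->get_name_to_pos_map();
    for (const auto& col_name : required_columns) { // required_columns: hypothetical input
        if (!name_to_pos_map.contains(col_name)) {
            return Status::InternalError("Failed to find column: {}, block: {}", col_name,
                                         block->dump_structure());
        }
        auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
        // ... read or mutate column_with_type_and_name.column / .type as needed ...
    }
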
92 changes: 57 additions & 35 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -1283,11 +1283,10 @@ Status OrcReader::_fill_partition_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns) {
DataTypeSerDe::FormatOptions _text_formatOptions;
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (const auto& kv : partition_columns) {
auto doris_column = block->get_by_name(kv.first).column;
// block is a Block*, and get_by_name returns a ColumnPtr,
// which is a const pointer. Therefore, using const_cast is permissible.
auto* col_ptr = const_cast<IColumn*>(doris_column.get());
auto col_ptr = block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
const auto& [value, slot_desc] = kv.second;
auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
@@ -1312,10 +1311,18 @@
Status OrcReader::_fill_missing_columns(
Block* block, uint64_t rows,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
std::set<size_t> positions_to_erase;
for (const auto& kv : missing_columns) {
if (!name_to_pos_map.contains(kv.first)) {
return Status::InternalError("Failed to find missing column: {}, block: {}", kv.first,
block->dump_structure());
}
if (kv.second == nullptr) {
// no default column, fill with null
auto mutable_column = block->get_by_name(kv.first).column->assume_mutable();
auto mutable_column =
block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
auto* nullable_column = static_cast<vectorized::ColumnNullable*>(mutable_column.get());
nullable_column->insert_many_defaults(rows);
} else {
@@ -1335,19 +1342,16 @@
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a normal column
result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
auto origin_column_type = block->get_by_name(kv.first).type;
auto origin_column_type = block->get_by_position(name_to_pos_map[kv.first]).type;
bool is_nullable = origin_column_type->is_nullable();
int pos = block->get_position_by_name(kv.first);
if (pos == -1) {
return Status::InternalError("Failed to find column: {}, block: {}", kv.first,
block->dump_structure());
}
block->replace_by_position(
pos, is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
block->erase(result_column_id);
name_to_pos_map[kv.first],
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
positions_to_erase.insert(result_column_id);
}
}
}
block->erase(positions_to_erase);
return Status::OK();
}

@@ -1988,8 +1992,10 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);

// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
auto& column_with_type_and_name = block->get_by_name(col_name);
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
@@ -2055,15 +2061,17 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
}
}

// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
for (auto& dict_filter_cols : _dict_filter_cols) {
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
int pos = block->get_position_by_name(dict_filter_cols.first);
if (pos == -1) {
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
return Status::InternalError(
"Failed to find dict filter column '{}' in block {}",
dict_filter_cols.first, block->dump_structure());
}
auto pos = name_to_pos_map[dict_filter_cols.first];
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
@@ -2085,7 +2093,7 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
_fill_batch_vec(batch_vec, _batch.get(), 0);

for (auto& col_name : _lazy_read_ctx.all_read_columns) {
auto& column_with_type_and_name = block->get_by_name(col_name);
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
@@ -2196,19 +2204,27 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
if (_delete_rows != nullptr) {
_delete_rows_filter_ptr = std::make_unique<IColumn::Filter>(rows, 1);
auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data();
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
const auto& original_transaction_column = assert_cast<const ColumnInt64&>(*remove_nullable(
block->get_by_name(TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE).column));
const auto& bucket_id_column = assert_cast<const ColumnInt32&>(
*remove_nullable(block->get_by_name(TransactionalHive::BUCKET_LOWER_CASE).column));
const auto& row_id_column = assert_cast<const ColumnInt64&>(
*remove_nullable(block->get_by_name(TransactionalHive::ROW_ID_LOWER_CASE).column));
block->get_by_position(
name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
.column));
const auto& bucket_id_column = assert_cast<const ColumnInt32&>(*remove_nullable(
block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE])
.column));
const auto& row_id_column = assert_cast<const ColumnInt64&>(*remove_nullable(
block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE])
.column));
for (int i = 0; i < rows; ++i) {
auto original_transaction = original_transaction_column.get_int(i);
auto bucket_id = bucket_id_column.get_int(i);
auto row_id = row_id_column.get_int(i);

TransactionalHiveReader::AcidRowID transactional_row_id = {original_transaction,
bucket_id, row_id};
TransactionalHiveReader::AcidRowID transactional_row_id = {
.original_transaction = original_transaction,
.bucket = bucket_id,
.row_id = row_id};
if (_delete_rows->contains(transactional_row_id)) {
_pos_delete_filter_data[i] = 0;
}
@@ -2222,13 +2238,15 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
size_t origin_column_num = block->columns();

if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& dict_filter_cols : _dict_filter_cols) {
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
int pos = block->get_position_by_name(dict_filter_cols.first);
if (pos == -1) {
return Status::InternalError("Wrong read column '{}' in orc file, block: {}",
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
dict_filter_cols.first, block->dump_structure());
}
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
auto pos = name_to_pos_map[dict_filter_cols.first];
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
@@ -2254,8 +2272,10 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
}
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& table_col_name : table_col_names) {
auto& column_with_type_and_name = block->get_by_name(table_col_name);
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[table_col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name = _table_info_node_ptr->children_file_column_name(table_col_name);
@@ -2307,13 +2327,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
if (can_filter_all) {
for (auto& col : table_col_names) {
// clean block to read predicate columns and acid columns
block->get_by_name(col).column->assume_mutable()->clear();
block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
block->get_by_name(col.first).column->assume_mutable()->clear();
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
block->get_by_name(col.first).column->assume_mutable()->clear();
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
}
Block::erase_useless_column(block, origin_column_num);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2627,12 +2647,14 @@ Status OrcReader::_convert_dict_cols_to_string_cols(
return Status::OK();
}
if (!_dict_filter_cols.empty()) {
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& dict_filter_cols : _dict_filter_cols) {
int pos = block->get_position_by_name(dict_filter_cols.first);
if (pos == -1) {
return Status::InternalError("Wrong read column '{}' in orc file, block: {}",
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
dict_filter_cols.first, block->dump_structure());
}
auto pos = name_to_pos_map[dict_filter_cols.first];
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
const ColumnPtr& column = column_with_type_and_name.column;
