Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance](runtime filter) impl partition pruning in runtime filter #47025

Merged
merged 21 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ void Block::initialize_index_by_name() {
void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
if (position > data.size()) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"invalid input position, position={}, data.size{}, names={}", position,
"invalid input position, position={}, data.size={}, names={}", position,
data.size(), dump_names());
}

Expand All @@ -164,7 +164,7 @@ void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
void Block::insert(size_t position, ColumnWithTypeAndName&& elem) {
if (position > data.size()) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"invalid input position, position={}, data.size{}, names={}", position,
"invalid input position, position={}, data.size={}, names={}", position,
data.size(), dump_names());
}

Expand Down
162 changes: 156 additions & 6 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,29 @@
#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <iterator>
#include <map>
#include <ranges>
#include <tuple>
#include <unordered_map>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block_file_cache_profile.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/runtime_profile.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -67,6 +74,7 @@
#include "vec/exec/scan/vscan_node.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/function.h"
#include "vec/functions/function_string.h"
Expand Down Expand Up @@ -130,12 +138,17 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
_convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerConvertOuputBlockTime", 1);
_runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
_local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
_empty_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
_not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"NotFoundFileNum", TUnit::UNIT, 1);
_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
_runtime_filter_partition_pruned_range_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);

_file_cache_statistics.reset(new io::FileCacheStatistics());
_io_ctx.reset(new io::IOContext());
Expand Down Expand Up @@ -174,6 +187,113 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
return Status::OK();
}

// Decide whether `expr` is usable for partition pruning.
//
// The expression tree is walked recursively:
//   * a slot reference qualifies only if its slot id is registered in
//     _partition_slot_index_map;
//   * a literal always qualifies;
//   * any other node qualifies when every one of its children qualifies
//     (a node without children therefore qualifies vacuously).
bool VFileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
    if (expr->is_slot_ref()) {
        auto* slot = static_cast<VSlotRef*>(expr.get());
        auto found = _partition_slot_index_map.find(slot->slot_id());
        return found != _partition_slot_index_map.end();
    }
    if (expr->is_literal()) {
        return true;
    }
    // Composite node: reject as soon as one child is not partition-only.
    for (const auto& child : expr->children()) {
        if (!_check_partition_prune_expr(child)) {
            return false;
        }
    }
    return true;
}

void VFileScanner::_init_runtime_filter_partition_prune_ctxs() {
_runtime_filter_partition_prune_ctxs.clear();
for (auto& conjunct : _conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, which means this a conjuncts from runtime filter.
auto expr = impl ? impl : conjunct->root();
if (_check_partition_prune_expr(expr)) {
_runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
}
}
}

// Populate _runtime_filter_partition_prune_block with one empty column per
// slot of _real_tuple_desc that needs to be materialized. Each column takes
// the slot's data type and column name; slots that do not need
// materialization are left out because they are not read.
void VFileScanner::_init_runtime_filter_partition_prune_block() {
    for (auto const* slot : _real_tuple_desc->slots()) {
        if (slot->need_materialize()) {
            _runtime_filter_partition_prune_block.insert(ColumnWithTypeAndName(
                    slot->get_empty_mutable_column(), slot->get_data_type_ptr(),
                    slot->col_name()));
        }
    }
}

// Evaluate the partition-only conjuncts against the partition values of the
// current range.
//
// @param can_filter_all  Out-parameter handed to VExprContext::execute_conjuncts.
//                        It is left untouched when there is nothing to evaluate
//                        (no pruning contexts or no partition column descriptors).
// @return Status::OK() on success, or the first error reported while
//         deserializing a partition value or executing the conjuncts.
Status VFileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
    if (_partition_col_descs.empty() || _runtime_filter_partition_prune_ctxs.empty()) {
        return Status::OK();
    }
    // Every partition column is materialized with exactly this many rows.
    size_t row_count = 1;

    // Step 1: deserialize each partition value into a column of the slot's type,
    // keyed by slot id.
    std::unordered_map<SlotId, MutableColumnPtr> columns_by_slot_id;
    for (auto const& entry : _partition_col_descs) {
        const auto& [raw_value, part_slot_desc] = entry.second;
        auto serde = part_slot_desc->get_data_type_ptr()->get_serde();
        auto value_column = part_slot_desc->get_data_type_ptr()->create_column();
        auto* raw_column = static_cast<IColumn*>(value_column.get());
        Slice value_slice(raw_value.data(), raw_value.size());
        int num_deserialized = 0;
        RETURN_IF_ERROR(serde->deserialize_column_from_fixed_json(
                *raw_column, value_slice, row_count, &num_deserialized, {}));
        columns_by_slot_id[part_slot_desc->id()] = std::move(value_column);
    }

    // Step 2: place the partition columns into the prune block at the position of
    // their slot among the materialized slots, so the conjuncts see them where
    // they expect them.
    size_t position = 0;
    bool first_column_filled = false;
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
        if (!slot_desc->need_materialize()) {
            // Not read, hence not part of the block layout.
            continue;
        }
        auto found = columns_by_slot_id.find(slot_desc->id());
        if (found != columns_by_slot_id.end()) {
            auto data_type = slot_desc->get_data_type_ptr();
            auto value_column = std::move(found->second);
            if (!data_type->is_nullable()) {
                _runtime_filter_partition_prune_block.insert(
                        position, ColumnWithTypeAndName(std::move(value_column), data_type,
                                                        slot_desc->col_name()));
            } else {
                // Wrap the values with an all-zero null map of matching length.
                _runtime_filter_partition_prune_block.insert(
                        position,
                        ColumnWithTypeAndName(
                                ColumnNullable::create(std::move(value_column),
                                                       ColumnUInt8::create(row_count, 0)),
                                data_type, slot_desc->col_name()));
            }
            first_column_filled = first_column_filled || (position == 0);
        }
        ++position;
    }

    // Step 3: run the conjuncts.
    if (!first_column_filled) {
        // VExprContext.execute only performs the filtering when block->rows() > 0,
        // so the first column is resized to make the block report a non-zero row
        // count. This may be tricky and time-consuming, but there is no other way.
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
                row_count);
    }
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
                                                    &_runtime_filter_partition_prune_block,
                                                    &result_filter, &can_filter_all));
    return Status::OK();
}

Status VFileScanner::_process_conjuncts_for_dict_filter() {
_slot_id_to_filter_conjuncts.clear();
_not_single_slot_filter_conjuncts.clear();
Expand Down Expand Up @@ -237,6 +357,11 @@ Status VFileScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
if (_first_scan_range) {
RETURN_IF_ERROR(_init_expr_ctxes());
if (_state->query_options().enable_runtime_filter_partition_prune &&
!_partition_slot_index_map.empty()) {
_init_runtime_filter_partition_prune_ctxs();
_init_runtime_filter_partition_prune_block();
}
} else {
// there's no scan range in split source. stop scanner directly.
_scanner_eof = true;
Expand Down Expand Up @@ -752,6 +877,29 @@ Status VFileScanner::_get_next_reader() {
const TFileRangeDesc& range = _current_range;
_current_range_path = range.path;

if (!_partition_slot_descs.empty()) {
// we need get partition columns first for runtime filter partition pruning
RETURN_IF_ERROR(_generate_parititon_columns());

if (_state->query_options().enable_runtime_filter_partition_prune) {
// if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
// by runtime filter partition prune
if (_push_down_conjuncts.size() < _conjuncts.size()) {
// there are new runtime filters, need to re-init runtime filter partition pruning ctxs
_init_runtime_filter_partition_prune_ctxs();
}

bool can_filter_all = false;
RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
if (can_filter_all) {
// this range can be filtered out by runtime filter partition pruning
// so we need to skip this range
COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
continue;
}
}
}

// create reader for specific format
Status init_status;
// for compatibility, if format_type is not set in range, use the format type of params
Expand Down Expand Up @@ -1012,7 +1160,8 @@ Status VFileScanner::_get_next_reader() {
_missing_cols.clear();
RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
RETURN_IF_ERROR(_generate_fill_columns());
RETURN_IF_ERROR(_generate_missing_columns());
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
Expand Down Expand Up @@ -1042,10 +1191,8 @@ Status VFileScanner::_get_next_reader() {
return Status::OK();
}

Status VFileScanner::_generate_fill_columns() {
Status VFileScanner::_generate_parititon_columns() {
_partition_col_descs.clear();
_missing_col_descs.clear();

const TFileRangeDesc& range = _current_range;
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
Expand All @@ -1066,7 +1213,11 @@ Status VFileScanner::_generate_fill_columns() {
}
}
}
return Status::OK();
}

Status VFileScanner::_generate_missing_columns() {
_missing_col_descs.clear();
if (!_missing_cols.empty()) {
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
Expand All @@ -1084,8 +1235,7 @@ Status VFileScanner::_generate_fill_columns() {
_missing_col_descs.emplace(slot_desc->col_name(), it->second);
}
}

return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
return Status::OK();
}

Status VFileScanner::_init_expr_ctxes() {
Expand Down
12 changes: 11 additions & 1 deletion be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "vec/core/block.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/scan/vscanner.h"
#include "vec/exprs/vexpr_fwd.h"

namespace doris {
class RuntimeState;
Expand Down Expand Up @@ -163,6 +164,8 @@ class VFileScanner : public VScanner {
Block _src_block;

VExprContextSPtrs _push_down_conjuncts;
VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
Block _runtime_filter_partition_prune_block;

std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::IOContext> _io_ctx;
Expand All @@ -183,9 +186,11 @@ class VFileScanner : public VScanner {
RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _not_found_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;

const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
// single slot filter conjuncts
Expand Down Expand Up @@ -213,7 +218,12 @@ class VFileScanner : public VScanner {
Status _convert_to_output_block(Block* block);
Status _truncate_char_or_varchar_columns(Block* block);
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
Status _generate_fill_columns();
Status _generate_parititon_columns();
Status _generate_missing_columns();
bool _check_partition_prune_expr(const VExprSPtr& expr);
void _init_runtime_filter_partition_prune_ctxs();
void _init_runtime_filter_partition_prune_block();
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
Status _process_conjuncts_for_dict_filter();
Status _process_late_arrival_conjuncts();
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,3 @@ INSERT INTO employees VALUES


msck repair table employees;


Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Sets up the `partition_tables` database with four Parquet tables.
-- All tables share the same data columns (id, name, value) and differ only in
-- the type of their single partition column `partition_col`:
-- DECIMAL(10, 2), INT, STRING and DATE.
create database if not exists partition_tables;
use partition_tables;

-- Partitioned by a DECIMAL(10, 2) column.
CREATE TABLE decimal_partition_table (
id INT,
name STRING,
value FLOAT
)
PARTITIONED BY (partition_col DECIMAL(10, 2))
STORED AS PARQUET
LOCATION '/user/doris/preinstalled_data/partition_tables/decimal_partition_table';

-- Partitioned by an INT column.
CREATE TABLE int_partition_table (
id INT,
name STRING,
value FLOAT
)
PARTITIONED BY (partition_col INT)
STORED AS PARQUET
LOCATION '/user/doris/preinstalled_data/partition_tables/int_partition_table';

-- Partitioned by a STRING column.
CREATE TABLE string_partition_table (
id INT,
name STRING,
value FLOAT
)
PARTITIONED BY (partition_col STRING)
STORED AS PARQUET
LOCATION '/user/doris/preinstalled_data/partition_tables/string_partition_table';

-- Partitioned by a DATE column.
CREATE TABLE date_partition_table (
id INT,
name STRING,
value FLOAT
)
PARTITIONED BY (partition_col DATE)
STORED AS PARQUET
LOCATION '/user/doris/preinstalled_data/partition_tables/date_partition_table';


-- Repair each table after creation; presumably this registers the partition
-- directories already present under each LOCATION (the preinstalled data files
-- are not visible here -- confirm against the data set).
msck repair table decimal_partition_table;
msck repair table int_partition_table;
msck repair table string_partition_table;
msck repair table date_partition_table;
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading
Loading