Skip to content

Commit

Permalink
[enhance](runtime filter) impl partition pruning in runtime filter (#4…
Browse files Browse the repository at this point in the history
…7025)

This PR implements partition pruning through runtime filters. When
executing a SQL query like:
```sql
SELECT count(*) 
FROM int_partition_table 
WHERE partition_col = (
    SELECT partition_col 
    FROM int_partition_table 
    GROUP BY partition_col 
    HAVING count(*) > 0 
    ORDER BY partition_col DESC 
    LIMIT 1
)
```
During execution, the backend (BE) will receive a dynamic runtime filter
condition `partition_col = xxx`. Since partition_col is a partitioning
column, we can use its value to determine if the partition can be
pruned.

Additionally, this mechanism also supports filtering queries like:
```sql
SELECT count(*) 
FROM int_partition_table 
WHERE func(partition_col) = xxx
```
If func cannot be evaluated at the frontend (FE), the frontend will not
perform partition pruning. However, since the backend can compute func,
this mechanism allows us to handle pruning scenarios that are not
possible at the frontend, providing a more efficient pruning process on
the backend side.
  • Loading branch information
suxiaogang223 authored Feb 10, 2025
1 parent 74e82fd commit e91468b
Show file tree
Hide file tree
Showing 21 changed files with 398 additions and 11 deletions.
4 changes: 2 additions & 2 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ void Block::initialize_index_by_name() {
void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
if (position > data.size()) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"invalid input position, position={}, data.size{}, names={}", position,
"invalid input position, position={}, data.size={}, names={}", position,
data.size(), dump_names());
}

Expand All @@ -164,7 +164,7 @@ void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
void Block::insert(size_t position, ColumnWithTypeAndName&& elem) {
if (position > data.size()) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"invalid input position, position={}, data.size{}, names={}", position,
"invalid input position, position={}, data.size={}, names={}", position,
data.size(), dump_names());
}

Expand Down
162 changes: 156 additions & 6 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,29 @@
#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <iterator>
#include <map>
#include <ranges>
#include <tuple>
#include <unordered_map>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block_file_cache_profile.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/runtime_profile.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -67,6 +74,7 @@
#include "vec/exec/scan/vscan_node.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/function.h"
#include "vec/functions/function_string.h"
Expand Down Expand Up @@ -130,12 +138,17 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
_convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerConvertOuputBlockTime", 1);
_runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
_local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
_empty_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
_not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"NotFoundFileNum", TUnit::UNIT, 1);
_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
_runtime_filter_partition_pruned_range_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);

_file_cache_statistics.reset(new io::FileCacheStatistics());
_io_ctx.reset(new io::IOContext());
Expand Down Expand Up @@ -174,6 +187,113 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
return Status::OK();
}

// Decides whether `expr` is usable for partition pruning.
//
// The expression tree is walked recursively and accepted when every leaf is
// either a literal or a slot reference whose slot id is present in
// _partition_slot_index_map. A node that is neither a slot reference nor a
// literal is accepted only if all of its children are accepted (a node with
// no children is therefore accepted).
bool VFileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
    if (expr->is_slot_ref()) {
        // A slot reference qualifies only when it refers to a known partition slot.
        auto* partition_candidate = static_cast<VSlotRef*>(expr.get());
        return _partition_slot_index_map.count(partition_candidate->slot_id()) != 0;
    }

    if (expr->is_literal()) {
        return true;
    }

    // Composite node: reject as soon as one child does not qualify.
    for (const auto& sub_expr : expr->children()) {
        if (!_check_partition_prune_expr(sub_expr)) {
            return false;
        }
    }
    return true;
}

void VFileScanner::_init_runtime_filter_partition_prune_ctxs() {
_runtime_filter_partition_prune_ctxs.clear();
for (auto& conjunct : _conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, which means this a conjuncts from runtime filter.
auto expr = impl ? impl : conjunct->root();
if (_check_partition_prune_expr(expr)) {
_runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
}
}
}

void VFileScanner::_init_runtime_filter_partition_prune_block() {
// init block with empty column
for (auto const* slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->need_materialize()) {
// should be ignored from reading
continue;
}
_runtime_filter_partition_prune_block.insert(
ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}
}

// Evaluates the partition-only conjuncts held in
// _runtime_filter_partition_prune_ctxs against the partition values of the
// current range.
//
// @param can_filter_all  forwarded to VExprContext::execute_conjuncts as its
//                        output flag. It is left untouched when there is
//                        nothing to evaluate (no prune contexts or no
//                        partition column descriptors).
// @return Status::OK() on success, otherwise the first error reported by the
//         partition value deserialization or by the conjunct execution.
Status VFileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
        return Status::OK();
    }

    // Every partition value is materialized as a column with this many rows.
    size_t rows_per_value = 1;

    // Step 1: deserialize each partition value into a column of the slot's
    // data type, keyed by slot id.
    std::unordered_map<SlotId, MutableColumnPtr> columns_by_slot_id;
    for (auto const& desc_entry : _partition_col_descs) {
        const auto& [raw_value, value_slot] = desc_entry.second;
        auto value_type = value_slot->get_data_type_ptr();
        auto serde = value_type->get_serde();
        auto value_column = value_type->create_column();
        auto* raw_column = static_cast<IColumn*>(value_column.get());
        Slice value_slice(raw_value.data(), raw_value.size());
        int rows_deserialized = 0;
        RETURN_IF_ERROR(serde->deserialize_column_from_fixed_json(
                *raw_column, value_slice, rows_per_value, &rows_deserialized, {}));
        columns_by_slot_id[value_slot->id()] = std::move(value_column);
    }

    // Step 2.1: place the partition columns into the prune block at the
    // position of their slot among the materialized slots, so that the
    // conjuncts find them where they expect them.
    // NOTE(review): insert(position, ...) is issued on every call; confirm
    // against Block::insert that the block does not accumulate columns
    // across ranges.
    size_t position = 0;
    bool first_column_filled = false;
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
        if (!slot_desc->need_materialize()) {
            // Slot is not read at all, so it occupies no position in the block.
            continue;
        }

        if (auto found = columns_by_slot_id.find(slot_desc->id());
            found != columns_by_slot_id.end()) {
            auto slot_type = slot_desc->get_data_type_ptr();
            auto value_column = std::move(found->second);
            if (!slot_type->is_nullable()) {
                _runtime_filter_partition_prune_block.insert(
                        position, ColumnWithTypeAndName(std::move(value_column), slot_type,
                                                        slot_desc->col_name()));
            } else {
                // Nullable slot: wrap the value column together with a null
                // map of the same row count, initialized to 0.
                _runtime_filter_partition_prune_block.insert(
                        position,
                        ColumnWithTypeAndName(
                                ColumnNullable::create(std::move(value_column),
                                                       ColumnUInt8::create(rows_per_value, 0)),
                                slot_type, slot_desc->col_name()));
            }
            first_column_filled = first_column_filled || position == 0;
        }
        ++position;
    }

    // Step 2.2: run the conjuncts.
    if (!first_column_filled) {
        // Per the original author's note, VExprContext only applies the
        // filtering when block->rows() > 0, so the first column is resized to
        // give the block rows. Acknowledged there as tricky and potentially
        // time-consuming, but no alternative was available.
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
                rows_per_value);
    }
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
                                                    &_runtime_filter_partition_prune_block,
                                                    &result_filter, &can_filter_all));
    return Status::OK();
}

Status VFileScanner::_process_conjuncts_for_dict_filter() {
_slot_id_to_filter_conjuncts.clear();
_not_single_slot_filter_conjuncts.clear();
Expand Down Expand Up @@ -237,6 +357,11 @@ Status VFileScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
if (_first_scan_range) {
RETURN_IF_ERROR(_init_expr_ctxes());
if (_state->query_options().enable_runtime_filter_partition_prune &&
!_partition_slot_index_map.empty()) {
_init_runtime_filter_partition_prune_ctxs();
_init_runtime_filter_partition_prune_block();
}
} else {
// there's no scan range in split source. stop scanner directly.
_scanner_eof = true;
Expand Down Expand Up @@ -752,6 +877,29 @@ Status VFileScanner::_get_next_reader() {
const TFileRangeDesc& range = _current_range;
_current_range_path = range.path;

if (!_partition_slot_descs.empty()) {
// we need get partition columns first for runtime filter partition pruning
RETURN_IF_ERROR(_generate_parititon_columns());

if (_state->query_options().enable_runtime_filter_partition_prune) {
// if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
// by runtime filter partition prune
if (_push_down_conjuncts.size() < _conjuncts.size()) {
// there are new runtime filters, need to re-init runtime filter partition pruning ctxs
_init_runtime_filter_partition_prune_ctxs();
}

bool can_filter_all = false;
RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
if (can_filter_all) {
// this range can be filtered out by runtime filter partition pruning
// so we need to skip this range
COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
continue;
}
}
}

// create reader for specific format
Status init_status;
// for compatibility, if format_type is not set in range, use the format type of params
Expand Down Expand Up @@ -1012,7 +1160,8 @@ Status VFileScanner::_get_next_reader() {
_missing_cols.clear();
RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
RETURN_IF_ERROR(_generate_fill_columns());
RETURN_IF_ERROR(_generate_missing_columns());
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
Expand Down Expand Up @@ -1042,10 +1191,8 @@ Status VFileScanner::_get_next_reader() {
return Status::OK();
}

Status VFileScanner::_generate_fill_columns() {
Status VFileScanner::_generate_parititon_columns() {
_partition_col_descs.clear();
_missing_col_descs.clear();

const TFileRangeDesc& range = _current_range;
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
Expand All @@ -1066,7 +1213,11 @@ Status VFileScanner::_generate_fill_columns() {
}
}
}
return Status::OK();
}

Status VFileScanner::_generate_missing_columns() {
_missing_col_descs.clear();
if (!_missing_cols.empty()) {
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
Expand All @@ -1084,8 +1235,7 @@ Status VFileScanner::_generate_fill_columns() {
_missing_col_descs.emplace(slot_desc->col_name(), it->second);
}
}

return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
return Status::OK();
}

Status VFileScanner::_init_expr_ctxes() {
Expand Down
12 changes: 11 additions & 1 deletion be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "vec/core/block.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/scan/vscanner.h"
#include "vec/exprs/vexpr_fwd.h"

namespace doris {
class RuntimeState;
Expand Down Expand Up @@ -163,6 +164,8 @@ class VFileScanner : public VScanner {
Block _src_block;

VExprContextSPtrs _push_down_conjuncts;
VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
Block _runtime_filter_partition_prune_block;

std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::IOContext> _io_ctx;
Expand All @@ -183,9 +186,11 @@ class VFileScanner : public VScanner {
RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _not_found_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;

const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
// single slot filter conjuncts
Expand Down Expand Up @@ -213,7 +218,12 @@ class VFileScanner : public VScanner {
Status _convert_to_output_block(Block* block);
Status _truncate_char_or_varchar_columns(Block* block);
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
Status _generate_fill_columns();
Status _generate_parititon_columns();
Status _generate_missing_columns();
bool _check_partition_prune_expr(const VExprSPtr& expr);
void _init_runtime_filter_partition_prune_ctxs();
void _init_runtime_filter_partition_prune_block();
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
Status _process_conjuncts_for_dict_filter();
Status _process_late_arrival_conjuncts();
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,3 @@ INSERT INTO employees VALUES


msck repair table employees;


Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Fixture script for the partition_tables database.
-- Defines four Parquet tables that share the same data columns
-- (id, name, value) and differ only in the type of their partition column:
-- DECIMAL(10, 2), INT, STRING and DATE.
create database if not exists partition_tables;
use partition_tables;

-- Table partitioned by a DECIMAL(10, 2) column.
CREATE TABLE decimal_partition_table (
id INT,
name STRING,
value FLOAT
)
PARTITIONED BY (partition_col DECIMAL(10, 2))
STORED AS PARQUET
LOCATION '/user/doris/preinstalled_data/partition_tables/decimal_partition_table';

-- Table partitioned by an INT column.
CREATE TABLE int_partition_table (
id INT,
name STRING,
value FLOAT
)
PARTITIONED BY (partition_col INT)
STORED AS PARQUET
LOCATION '/user/doris/preinstalled_data/partition_tables/int_partition_table';

-- Table partitioned by a STRING column.
CREATE TABLE string_partition_table (
id INT,
name STRING,
value FLOAT
)
PARTITIONED BY (partition_col STRING)
STORED AS PARQUET
LOCATION '/user/doris/preinstalled_data/partition_tables/string_partition_table';

-- Table partitioned by a DATE column.
CREATE TABLE date_partition_table (
id INT,
name STRING,
value FLOAT
)
PARTITIONED BY (partition_col DATE)
STORED AS PARQUET
LOCATION '/user/doris/preinstalled_data/partition_tables/date_partition_table';


-- Repair each table so that partitions already present under its LOCATION
-- are registered (presumably the data files are preinstalled there; verify
-- against the fixture setup).
msck repair table decimal_partition_table;
msck repair table int_partition_table;
msck repair table string_partition_table;
msck repair table date_partition_table;
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit e91468b

Please sign in to comment.