Skip to content

Commit

Permalink
update role state system
Browse files Browse the repository at this point in the history
  • Loading branch information
BiteTheDDDDt committed Feb 12, 2025
1 parent 0cf1271 commit 6f9bce6
Show file tree
Hide file tree
Showing 20 changed files with 401 additions and 377 deletions.
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
#ifndef NDEBUG
if (state->fuzzy_disable_runtime_filter_in_be()) {
if ((_parent->operator_id() + random()) % 2 == 0) {
RETURN_IF_ERROR(_runtime_filter_slots->disable_all_filters(state, _finish_dependency));
RETURN_IF_ERROR(
_runtime_filter_slots->skip_runtime_filters_process(state, _finish_dependency));
}
}
#endif
Expand Down
15 changes: 4 additions & 11 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ Status RuntimeFilterMgr::register_local_merger_filter(
if (!iter->second.merger) {
std::shared_ptr<RuntimeFilterMerger> merge_filter;
RETURN_IF_ERROR(RuntimeFilterMerger::create(_state, &desc, &merge_filter));
merge_filter->set_ignored();
iter->second.merger = merge_filter;
}
iter->second.merge_time++;
Expand Down Expand Up @@ -198,7 +197,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
cnt_val->targetv2_info = targetv2_info;
RETURN_IF_ERROR(RuntimeFilterMerger::create(_state, runtime_filter_desc, &cnt_val->filter));
auto filter_id = runtime_filter_desc->filter_id;
cnt_val->filter->set_ignored();

std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
_filter_map.emplace(filter_id, cnt_val);
Expand Down Expand Up @@ -330,9 +328,9 @@ Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que
if (cnt_val->arrive_id.size() == 1 && cnt_val->runtime_filter_desc.is_broadcast_join) {
return Status::OK();
}
std::shared_ptr<RuntimeFilterMerger> tmp_filter;
std::shared_ptr<RuntimeFilterProducer> tmp_filter;
RETURN_IF_ERROR(
RuntimeFilterMerger::create(_state, &cnt_val->runtime_filter_desc, &tmp_filter));
RuntimeFilterProducer::create(_state, &cnt_val->runtime_filter_desc, &tmp_filter));

RETURN_IF_ERROR(tmp_filter->assign_data_into_wrapper(*request, attach_data));

Expand All @@ -359,13 +357,8 @@ Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que
void* data = nullptr;
int len = 0;
bool has_attachment = false;
if (!cnt_val->filter->get_ignored() && !cnt_val->filter->get_disabled()) {
RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len));
} else {
apply_request.set_ignored(cnt_val->filter->get_ignored());
apply_request.set_disabled(cnt_val->filter->get_disabled());
apply_request.set_filter_type(PFilterType::UNKNOW_FILTER);
}

RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len));

if (data != nullptr && len > 0) {
void* allocated = malloc(len);
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ class RuntimeFilterConsumer;
class MemTracker;
class MemTrackerLimiter;
class RuntimeState;
enum class RuntimeFilterRole;
class RuntimePredicateWrapper;
class RuntimeFilterWrapper;
class QueryContext;
struct RuntimeFilterParamsContext;
class ExecEnv;
Expand Down
30 changes: 6 additions & 24 deletions be/src/runtime_filter/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,7 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress
merge_filter_callback->cntl_->ignore_eovercrowded();
}

if (get_ignored() || get_disabled()) {
merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
merge_filter_request->set_ignored(get_ignored());
merge_filter_request->set_disabled(get_disabled());
} else {
RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len));
}
RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len));

if (len > 0) {
DCHECK(data != nullptr);
Expand All @@ -94,20 +88,8 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress
return Status::OK();
}

void RuntimeFilter::set_ignored() {
_wrapper->set_ignored();
}

bool RuntimeFilter::get_ignored() {
return _wrapper->is_ignored();
}

void RuntimeFilter::set_disabled() {
_wrapper->set_disabled();
}

bool RuntimeFilter::get_disabled() const {
return _wrapper->is_disabled();
RuntimeFilterWrapper::State RuntimeFilter::_get_wrapper_state() {
return _wrapper->get_state();
}

Status RuntimeFilter::_init_with_desc(const TRuntimeFilterDesc* desc,
Expand Down Expand Up @@ -157,7 +139,7 @@ Status RuntimeFilter::_init_with_desc(const TRuntimeFilterDesc* desc,
}
}

_wrapper = std::make_shared<RuntimePredicateWrapper>(&params);
_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
return Status::OK();
}

Expand All @@ -168,12 +150,12 @@ std::string RuntimeFilter::_debug_string() const {

void RuntimeFilter::_to_protobuf(PInFilter* filter) {
filter->set_column_type(to_proto(_wrapper->column_type()));
_wrapper->_context->hybrid_set->to_pb(filter);
_wrapper->_hybrid_set->to_pb(filter);
}

void RuntimeFilter::_to_protobuf(PMinMaxFilter* filter) {
filter->set_column_type(to_proto(_wrapper->column_type()));
_wrapper->_context->minmax_func->to_pb(filter);
_wrapper->_minmax_func->to_pb(filter);
}

RuntimeFilterType RuntimeFilter::get_real_type() {
Expand Down
24 changes: 10 additions & 14 deletions be/src/runtime_filter/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "runtime_filter/utils.h"

namespace doris {
class RuntimePredicateWrapper;
class RuntimeFilterWrapper;
class RuntimeProfile;

/// The runtimefilter is built in the join node.
Expand All @@ -41,15 +41,6 @@ class RuntimeFilter {

bool has_local_target() const { return _has_local_target; }

RuntimeFilterState current_state() const { return _rf_state.load(std::memory_order_acquire); }

void set_ignored();

bool get_ignored();

void set_disabled();
bool get_disabled() const;

RuntimeFilterType get_real_type();

int filter_id() const { return _wrapper->_filter_id; }
Expand All @@ -66,6 +57,12 @@ class RuntimeFilter {
request->set_filter_type(get_type(real_runtime_filter_type));
request->set_contain_null(_wrapper->contain_null());

auto state = _wrapper->get_state();
if (state != RuntimeFilterWrapper::State::READY) {
request->set_ignored(state == RuntimeFilterWrapper::State::IGNORED);
request->set_disabled(state == RuntimeFilterWrapper::State::DISABLED);
}

if (real_runtime_filter_type == RuntimeFilterType::IN_FILTER) {
auto in_filter = request->mutable_in_filter();
_to_protobuf(in_filter);
Expand All @@ -90,7 +87,6 @@ class RuntimeFilter {
: _state(state),
_has_remote_target(desc->has_remote_targets),
_has_local_target(desc->has_local_targets),
_rf_state(RuntimeFilterState::NOT_READY),
_runtime_filter_type(get_runtime_filter_type(desc)) {
DCHECK_NE(_has_remote_target, _has_local_target);
}
Expand All @@ -106,16 +102,16 @@ class RuntimeFilter {

std::string _debug_string() const;

RuntimeFilterWrapper::State _get_wrapper_state();

RuntimeFilterParamsContext* _state = nullptr;
// _wrapper is a runtime filter function wrapper
std::shared_ptr<RuntimePredicateWrapper> _wrapper;
std::shared_ptr<RuntimeFilterWrapper> _wrapper;

// will apply to remote node
bool _has_remote_target;
// will apply to local node
bool _has_local_target;
// filter is ready for consumer
std::atomic<RuntimeFilterState> _rf_state;

// runtime filter type
RuntimeFilterType _runtime_filter_type;
Expand Down
34 changes: 15 additions & 19 deletions be/src/runtime_filter/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@

namespace doris {

Status RuntimeFilterConsumer::get_push_expr_ctxs(
Status RuntimeFilterConsumer::apply_ready_expr(
std::list<vectorized::VExprContextSPtr>& probe_ctxs,
std::vector<vectorized::VRuntimeFilterPtr>& push_exprs, bool is_late_arrival) {
auto origin_size = push_exprs.size();
if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) {
_is_push_down = !is_late_arrival;
if (_wrapper->get_state() == RuntimeFilterWrapper::State::READY) {
RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr));
}
_profile->add_info_string("Info", formatted_state());
Expand All @@ -38,16 +37,20 @@ Status RuntimeFilterConsumer::get_push_expr_ctxs(
push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter,
always_true_counter);
}
if (is_late_arrival) {
_rf_state = State::LATE_APPLIED;
} else {
_rf_state = State::APPLIED;
}
return Status::OK();
}

std::string RuntimeFilterConsumer::formatted_state() const {
return fmt::format(
"[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]",
filter_id(), _is_push_down, to_string(_rf_state), _has_remote_target, _has_local_target,
_wrapper->_context->ignored, _wrapper->_context->disabled, _wrapper->get_real_type(),
_rf_wait_time_ms);
"[Id = {}, State = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}, Wrapper = [{}], WaitTimeMS = {}]",
filter_id(), _to_string(_rf_state), _has_remote_target, _has_local_target,
_wrapper->debug_string(), _rf_wait_time_ms);
}

void RuntimeFilterConsumer::init_profile(RuntimeProfile* parent_profile) {
Expand All @@ -62,14 +65,8 @@ void RuntimeFilterConsumer::init_profile(RuntimeProfile* parent_profile) {
}

void RuntimeFilterConsumer::signal() {
if (!_wrapper->is_ignored() && !_wrapper->is_disabled() && _wrapper->is_bloomfilter() &&
!_wrapper->get_bloomfilter()->inited()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored/disabled, rf: {}",
debug_string());
}

COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS));
_rf_state.store(RuntimeFilterState::READY);
_rf_state = State::READY;
if (!_filter_timer.empty()) {
for (auto& timer : _filter_timer) {
timer->call_ready();
Expand Down Expand Up @@ -116,13 +113,12 @@ void RuntimeFilterConsumer::update_state() {
int64_t wait_times_ms = _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER
? execution_timeout
: runtime_filter_wait_time_ms;
auto expected = _rf_state.load(std::memory_order_acquire);
// In pipelineX, runtime filters will be ready or timeout before open phase.
if (expected == RuntimeFilterState::NOT_READY) {

if (!is_applied()) {
DCHECK(MonotonicMillis() - _registration_time >= wait_times_ms);
COUNTER_SET(_wait_timer,
int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS));
_rf_state = RuntimeFilterState::TIMEOUT;
_rf_state = State::TIMEOUT;
}
}

Expand Down
48 changes: 43 additions & 5 deletions be/src/runtime_filter/runtime_filter_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ class RuntimeFilterConsumer : public RuntimeFilter {

int node_id() const { return _node_id; }

Status get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
bool is_late_arrival);
Status apply_ready_expr(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
bool is_late_arrival);

std::string formatted_state() const;

void init_profile(RuntimeProfile* parent_profile);
Expand All @@ -59,6 +60,19 @@ class RuntimeFilterConsumer : public RuntimeFilter {
std::to_string((double)local_merge_time / NANOS_PER_SEC) + " s");
}

bool is_ready() { return _rf_state == State::READY; }
bool is_applied() { return _rf_state == State::APPLIED; }

enum class State {
IGNORED,
DISABLED,
NOT_READY,
READY,
TIMEOUT,
APPLIED,
LATE_APPLIED,
};

private:
RuntimeFilterConsumer(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
int node_id)
Expand All @@ -68,16 +82,38 @@ class RuntimeFilterConsumer : public RuntimeFilter {
_profile(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})",
desc->filter_id,
to_string(_runtime_filter_type)))),
_registration_time(MonotonicMillis()) {
_registration_time(MonotonicMillis()),
_rf_state(State::NOT_READY) {
// If bitmap filter is not applied, it will cause the query result to be incorrect
bool wait_infinitely = _state->get_query_ctx()->runtime_filter_wait_infinitely() ||
_runtime_filter_type == RuntimeFilterType::BITMAP_FILTER;
_rf_wait_time_ms = wait_infinitely ? _state->get_query_ctx()->execution_timeout() * 1000
: _state->get_query_ctx()->runtime_filter_wait_time_ms();
}

static std::string _to_string(const State& state) {
switch (state) {
case State::IGNORED:
return "IGNORED";
case State::DISABLED:
return "DISABLED";
case State::NOT_READY:
return "NOT_READY";
case State::READY:
return "READY";
case State::TIMEOUT:
return "TIMEOUT";
case State::APPLIED:
return "APPLIED";
case State::LATE_APPLIED:
return "LATE_APPLIED";
default:
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, "Invalid State {}",
int(state));
}
}

int _node_id;
bool _is_push_down = false;

TExpr _probe_expr;

Expand All @@ -92,6 +128,8 @@ class RuntimeFilterConsumer : public RuntimeFilter {
int32_t _rf_wait_time_ms;
const int64_t _registration_time;

std::atomic<State> _rf_state;

friend class RuntimeFilterProducer;
};

Expand Down
29 changes: 0 additions & 29 deletions be/src/runtime_filter/runtime_filter_definitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,6 @@ enum class RuntimeFilterType {
MAX_FILTER // only max
};

enum class RuntimeFilterRole { PRODUCER, CONSUMER, MERGER };

enum class RuntimeFilterState {
WAITING_FOR_SEND_SIZE,
WAITING_FOR_SYNCED_SIZE,
WAITING_FOR_DATA,
READY_TO_PUBLISH,
PUBLISHED,
IGNORED,
DISABLED,
NOT_READY,
READY,
TIMEOUT,
APPLIED,
WAITING_FOR_PRODUCT,
};

struct RuntimeFilterParams {
RuntimeFilterType filter_type {};
PrimitiveType column_return_type {};
Expand Down Expand Up @@ -92,18 +75,6 @@ class HybridSetBase;
class BloomFilterFuncBase;
class BitmapFilterFuncBase;

struct RuntimeFilterContext {
std::shared_ptr<MinMaxFuncBase> minmax_func;
std::shared_ptr<HybridSetBase> hybrid_set;
std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func;
bool ignored = false;
bool disabled = false;
std::string err_msg;
};

using RuntimeFilterContextSPtr = std::shared_ptr<RuntimeFilterContext>;

class RuntimeState;
class QueryContext;
class RuntimeFilterMgr;
Expand Down
Loading

0 comments on commit 6f9bce6

Please sign in to comment.