diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 4c6e108a871aef..2594221e15305d 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -441,7 +441,7 @@ Status PartitionedAggSinkLocalState::revoke_memory( status = Status::InternalError( "fault_inject partitioned_agg_sink " "revoke_memory canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); Defer defer {[&]() { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 4869a02dc56d7d..9adff22d52fa85 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -302,7 +302,7 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b auto st = Status::InternalError( "fault_inject partitioned_agg_source " "merge spill data canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st); + state->get_query_ctx()->cancel(st); return st; }); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 40b13e77b591b5..223a7f24013500 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -231,12 +231,12 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat return Status::OK(); }; - auto exception_catch_func = [query_id, spill_func]() { + auto exception_catch_func = [query_id, state, spill_func]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel", { auto status = Status::InternalError( "fault_inject partitioned_hash_join_probe " "spill_probe_blocks canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); @@ -347,12 +347,13 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim return status; }; - auto exception_catch_func = [read_func, query_id]() { + auto exception_catch_func = [read_func, state, query_id]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel", { auto status = Status::InternalError( "fault_inject partitioned_hash_join_probe " "recover_build_blocks canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + + state->get_query_ctx()->cancel(status); return status; }); @@ -451,12 +452,12 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim return st; }; - auto exception_catch_func = [read_func, query_id]() { + auto exception_catch_func = [read_func, state, query_id]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel", { auto status = Status::InternalError( "fault_inject partitioned_hash_join_probe " "recover_probe_blocks canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index a2c1b7cefc2408..ae3ef2c4d57978 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -368,12 +368,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( SpillSinkRunnable spill_runnable( state, nullptr, operator_profile(), - [this, query_id] { + [this, state, query_id] { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { auto status = Status::InternalError( "fault_inject partitioned_hash_join_sink " "revoke_memory canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); SCOPED_TIMER(_spill_build_timer); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index a2308ce415fff0..bac215e3f3c544 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -264,12 +264,12 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, return Status::OK(); }; - auto exception_catch_func = [query_id, spill_func]() { + auto exception_catch_func = [query_id, state, spill_func]() { DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", { auto status = Status::InternalError( "fault_inject spill_sort_sink " "revoke_memory canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e7f3ca3426c0f2..0e72f3f4f5da2c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -912,6 +912,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) { return; } } + SCOPED_ATTACH_TASK(query_ctx->resource_ctx()); query_ctx->cancel(reason); remove_query_context(query_id); LOG(INFO) << "Query " << print_id(query_id) diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 9577b87a4f1a5a..7e8354f6ff624a 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -122,19 +122,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { ThreadLocalHandle::del_thread_local_if_count_is_zero(); } -AddThreadMemTrackerConsumerByHook::AddThreadMemTrackerConsumerByHook( - const std::shared_ptr& mem_tracker) - : _mem_tracker(mem_tracker) { - ThreadLocalHandle::create_thread_local_if_not_exits(); - DCHECK(mem_tracker != nullptr); - use_mem_hook = true; - thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); -} - -AddThreadMemTrackerConsumerByHook::~AddThreadMemTrackerConsumerByHook() { - thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); - use_mem_hook = false; - ThreadLocalHandle::del_thread_local_if_count_is_zero(); -} - } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 94f50b48d17d6c..54e565cc424ff2 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -80,13 +80,6 @@ #define SCOPED_PEAK_MEM(peak_mem) \ auto VARNAME_LINENUM(scope_peak_mem) = doris::ScopedPeakMem(peak_mem) -// Count a code segment memory (memory malloc - memory free) to MemTracker. -// Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe. -// Usage example: std::unique_ptr tracker = std::make_unique("first_tracker"); -// { SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(_mem_tracker.get()); xxx; xxx; } -#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ - auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumerByHook(mem_tracker) - #define SCOPED_SKIP_MEMORY_CHECK() \ auto VARNAME_LINENUM(scope_skip_memory_check) = doris::ScopeSkipMemoryCheck() @@ -154,8 +147,6 @@ static std::string NO_THREAD_CONTEXT_MSG = // Is true after ThreadContext construction. inline thread_local bool pthread_context_ptr_init = false; inline thread_local constinit ThreadContext* thread_context_ptr = nullptr; -// use mem hook to consume thread mem tracker. -inline thread_local bool use_mem_hook = false; // The thread context saves some info about a working thread. // 2 required info: @@ -383,15 +374,6 @@ class AddThreadMemTrackerConsumer { bool _need_pop = false; }; -class AddThreadMemTrackerConsumerByHook { -public: - explicit AddThreadMemTrackerConsumerByHook(const std::shared_ptr& mem_tracker); - ~AddThreadMemTrackerConsumerByHook(); - -private: - std::shared_ptr _mem_tracker; -}; - class ScopeSkipMemoryCheck { public: explicit ScopeSkipMemoryCheck() { @@ -409,7 +391,7 @@ class ScopeSkipMemoryCheck { // must call create_thread_local_if_not_exits() before use thread_context(). #define CONSUME_THREAD_MEM_TRACKER(size) \ do { \ - if (size == 0 || doris::use_mem_hook) { \ + if (size == 0) { \ break; \ } \ if (doris::pthread_context_ptr_init) { \