From 018a689ce90beecad2bcf5d053f0cda554870067 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Sat, 22 Nov 2025 20:49:46 +0800 Subject: [PATCH] [bugfix](memory) should count memory when cancel query is called (#58252) *** Current BE git commitID: 4945aa0397 *** *** SIGABRT unknown detail explain (@0x3c54) received by PID 15444 (TID 16396 OR 0x7b2e9f39b700) from PID 15444; stack trace: *** 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /root/doris/be/src/common/signal_handler.h:420 1# 0x00007F3421D1D420 in /lib/x86_64-linux-gnu/libpthread.so.0 2# raise at ../sysdeps/unix/sysv/linux/raise.c:51 3# abort at /build/glibc-SzIz7B/glibc-2.31/stdlib/abort.c:81 4# 0x000055A425A57985 in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 5# 0x000055A425A4923A in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 6# google::LogMessage::SendToLog() in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 7# google::LogMessage::Flush() in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 8# google::LogMessageFatal::~LogMessageFatal() in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 9# doris::Status doris::Status::FatalError, std::allocator >&>(std::basic_string_view >, std::__cxx11::basic_string, std::allocator >&) at /root/doris/be/src/common/status.h:467 10# doris::io::LocalFileReader::read_at_impl(unsigned long, doris::Slice, unsigned long*, doris::io::IOContext const*) at /root/doris/be/src/io/fs/local_file_reader.cpp:151 11# doris::io::FileReader::read_at(unsigned long, doris::Slice, unsigned long*, doris::io::IOContext const*) at /root/doris/be/src/io/fs/file_reader.cpp:34 12# doris::io::S3FileSystem::upload_impl(std::filesystem::__cxx11::path const&, std::filesystem::__cxx11::path const&) at /root/doris/be/src/io/fs/s3_file_system.cpp:339 13# doris::io::RemoteFileSystem::upload(std::filesystem::__cxx11::path const&, std::filesystem::__cxx11::path const&) at /root/doris/be/src/io/fs/remote_file_system.cpp:34 14# doris::RuntimeState::get_error_log_file_path[abi:cxx11]() at /root/doris/be/src/runtime/runtime_state.cpp:418 15# doris::pipeline::PipelineFragmentContext::get_load_error_url[abi:cxx11]() at /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:1806 16# doris::pipeline::PipelineFragmentContext::cancel(doris::Status) at /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:202 17# doris::QueryContext::cancel_all_pipeline_context(doris::Status const&, int) in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 18# doris::QueryContext::cancel(doris::Status, int) in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 19# doris::FragmentMgr::cancel_query(doris::TUniqueId, doris::Status) at /root/doris/be/src/runtime/fragment_mgr.cpp:915 20# std::_Function_handler::_M_invoke(std::_Any_data const&) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292 21# doris::WorkThreadPool::work_thread(int) in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 22# execute_native_thread_routine in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 23# asan_thread_start(void*) in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be ### Release note None ### Check List (For Author) - Test - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- .../partitioned_aggregation_sink_operator.cpp | 2 +- ...artitioned_aggregation_source_operator.cpp | 2 +- .../partitioned_hash_join_probe_operator.cpp | 13 ++++++------ .../partitioned_hash_join_sink_operator.cpp | 4 ++-- .../exec/spill_sort_sink_operator.cpp | 4 ++-- be/src/runtime/fragment_mgr.cpp | 1 + be/src/runtime/thread_context.cpp | 15 -------------- be/src/runtime/thread_context.h | 20 +------------------ 8 files changed, 15 insertions(+), 46 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 4c6e108a871aef..2594221e15305d 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -441,7 +441,7 @@ Status PartitionedAggSinkLocalState::revoke_memory( status = Status::InternalError( "fault_inject partitioned_agg_sink " "revoke_memory canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); Defer defer {[&]() { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 4869a02dc56d7d..9adff22d52fa85 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -302,7 +302,7 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b auto st = Status::InternalError( "fault_inject partitioned_agg_source " "merge spill data canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st); + state->get_query_ctx()->cancel(st); return st; }); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 40b13e77b591b5..223a7f24013500 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -231,12 +231,12 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat return Status::OK(); }; - auto exception_catch_func = [query_id, spill_func]() { + auto exception_catch_func = [query_id, state, spill_func]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel", { auto status = Status::InternalError( "fault_inject partitioned_hash_join_probe " "spill_probe_blocks canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); @@ -347,12 +347,13 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim return status; }; - auto exception_catch_func = [read_func, query_id]() { + auto exception_catch_func = [read_func, state, query_id]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel", { auto status = Status::InternalError( "fault_inject partitioned_hash_join_probe " "recover_build_blocks canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + + state->get_query_ctx()->cancel(status); return status; }); @@ -451,12 +452,12 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim return st; }; - auto exception_catch_func = [read_func, query_id]() { + auto exception_catch_func = [read_func, state, query_id]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel", { auto status = Status::InternalError( "fault_inject partitioned_hash_join_probe " "recover_probe_blocks canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index a2c1b7cefc2408..ae3ef2c4d57978 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -368,12 +368,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( SpillSinkRunnable spill_runnable( state, nullptr, operator_profile(), - [this, query_id] { + [this, state, query_id] { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { auto status = Status::InternalError( "fault_inject partitioned_hash_join_sink " "revoke_memory canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); SCOPED_TIMER(_spill_build_timer); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index a2308ce415fff0..bac215e3f3c544 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -264,12 +264,12 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, return Status::OK(); }; - auto exception_catch_func = [query_id, spill_func]() { + auto exception_catch_func = [query_id, state, spill_func]() { DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", { auto status = Status::InternalError( "fault_inject spill_sort_sink " "revoke_memory canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + state->get_query_ctx()->cancel(status); return status; }); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e7f3ca3426c0f2..0e72f3f4f5da2c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -912,6 +912,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) { return; } } + SCOPED_ATTACH_TASK(query_ctx->resource_ctx()); query_ctx->cancel(reason); remove_query_context(query_id); LOG(INFO) << "Query " << print_id(query_id) diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 9577b87a4f1a5a..7e8354f6ff624a 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -122,19 +122,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { ThreadLocalHandle::del_thread_local_if_count_is_zero(); } -AddThreadMemTrackerConsumerByHook::AddThreadMemTrackerConsumerByHook( - const std::shared_ptr& mem_tracker) - : _mem_tracker(mem_tracker) { - ThreadLocalHandle::create_thread_local_if_not_exits(); - DCHECK(mem_tracker != nullptr); - use_mem_hook = true; - thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); -} - -AddThreadMemTrackerConsumerByHook::~AddThreadMemTrackerConsumerByHook() { - thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); - use_mem_hook = false; - ThreadLocalHandle::del_thread_local_if_count_is_zero(); -} - } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 94f50b48d17d6c..54e565cc424ff2 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -80,13 +80,6 @@ #define SCOPED_PEAK_MEM(peak_mem) \ auto VARNAME_LINENUM(scope_peak_mem) = doris::ScopedPeakMem(peak_mem) -// Count a code segment memory (memory malloc - memory free) to MemTracker. -// Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe. -// Usage example: std::unique_ptr tracker = std::make_unique("first_tracker"); -// { SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(_mem_tracker.get()); xxx; xxx; } -#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ - auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumerByHook(mem_tracker) - #define SCOPED_SKIP_MEMORY_CHECK() \ auto VARNAME_LINENUM(scope_skip_memory_check) = doris::ScopeSkipMemoryCheck() @@ -154,8 +147,6 @@ static std::string NO_THREAD_CONTEXT_MSG = // Is true after ThreadContext construction. inline thread_local bool pthread_context_ptr_init = false; inline thread_local constinit ThreadContext* thread_context_ptr = nullptr; -// use mem hook to consume thread mem tracker. -inline thread_local bool use_mem_hook = false; // The thread context saves some info about a working thread. // 2 required info: @@ -383,15 +374,6 @@ class AddThreadMemTrackerConsumer { bool _need_pop = false; }; -class AddThreadMemTrackerConsumerByHook { -public: - explicit AddThreadMemTrackerConsumerByHook(const std::shared_ptr& mem_tracker); - ~AddThreadMemTrackerConsumerByHook(); - -private: - std::shared_ptr _mem_tracker; -}; - class ScopeSkipMemoryCheck { public: explicit ScopeSkipMemoryCheck() { @@ -409,7 +391,7 @@ class ScopeSkipMemoryCheck { // must call create_thread_local_if_not_exits() before use thread_context(). #define CONSUME_THREAD_MEM_TRACKER(size) \ do { \ - if (size == 0 || doris::use_mem_hook) { \ + if (size == 0) { \ break; \ } \ if (doris::pthread_context_ptr_init) { \