diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 587862beffd9ca..342638786eee90 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -211,6 +211,7 @@ DEFINE_Int32(check_consistency_worker_count, "1");
 DEFINE_Int32(upload_worker_count, "1");
 // the count of thread to download
 DEFINE_Int32(download_worker_count, "1");
+DEFINE_Int32(num_query_ctx_map_partitions, "128");
 // the count of thread to make snapshot
 DEFINE_Int32(make_snapshot_worker_count, "5");
 // the count of thread to release snapshot
diff --git a/be/src/common/config.h b/be/src/common/config.h
index bcad3ee29a2b39..5706a38b4b0636 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1350,6 +1350,8 @@ DECLARE_Int32(spill_io_thread_pool_queue_size);
 
 DECLARE_mBool(check_segment_when_build_rowset_meta);
 
+DECLARE_Int32(num_query_ctx_map_partitions);
+
 DECLARE_mBool(enable_s3_rate_limiter);
 DECLARE_mInt64(s3_get_bucket_tokens);
 DECLARE_mInt64(s3_get_token_per_second);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 87c165222fc69c..bfe24459c415d3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -225,6 +225,89 @@ static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries
     return result;
 }
 
+inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
+    uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
+    value = HashUtil::hash(&query_id.hi, 8, value);
+    return value % capacity;
+}
+
+inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
+    uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
+    value = HashUtil::hash(&key.first.hi, 8, value);
+    return value % capacity;
+}
+
+template <typename Key, typename Value, typename ValueType>
+ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
+    _internal_map.resize(config::num_query_ctx_map_partitions);
+    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
+        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
+                            phmap::flat_hash_map<Key, Value>()};
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::shared_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        auto search = map.find(query_id);
+        if (search != map.end()) {
+            return search->second;
+        }
+        return std::shared_ptr<ValueType>(nullptr);
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
+        const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::unique_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        auto search = map.find(query_id);
+        if (search != map.end()) {
+            query_ctx = search->second.lock();
+        }
+        if (!query_ctx) {
+            return function(map);
+        }
+        return Status::OK();
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::unique_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        map.erase(query_id);
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
+                                                          std::shared_ptr<ValueType> query_ctx) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::unique_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        map.insert({query_id, query_ctx});
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::clear() {
+    for (auto& pair : _internal_map) {
+        std::unique_lock lock(*pair.first);
+        auto& map = pair.second;
+        map.clear();
+    }
+}
+
 FragmentMgr::FragmentMgr(ExecEnv* exec_env)
         : _exec_env(exec_env), _stop_background_threads_latch(1) {
     _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
@@ -253,14 +336,8 @@ void FragmentMgr::stop() {
     }
 
     // Only me can delete
-    {
-        std::unique_lock lock(_query_ctx_map_mutex);
-        _query_ctx_map.clear();
-    }
-    {
-        std::unique_lock lock(_pipeline_map_mutex);
-        _pipeline_map.clear();
-    }
+    _query_ctx_map.clear();
+    _pipeline_map.clear();
     _thread_pool->shutdown();
 }
 
@@ -640,17 +717,13 @@ void FragmentMgr::remove_pipeline_context(
     g_fragment_executing_count << -1;
     g_fragment_last_active_time.set_value(now);
 
-    std::unique_lock lock(_pipeline_map_mutex);
     _pipeline_map.erase({query_id, f_context->get_fragment_id()});
 }
 
 std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
-    std::shared_lock lock(_query_ctx_map_mutex);
-    auto search = _query_ctx_map.find(query_id);
-    if (search != _query_ctx_map.end()) {
-        if (auto q_ctx = search->second.lock()) {
-            return q_ctx;
-        }
+    auto val = _query_ctx_map.find(query_id);
+    if (auto q_ctx = val.lock()) {
+        return q_ctx;
     }
     return nullptr;
 }
@@ -677,67 +750,66 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
         }
     } else {
         if (!query_ctx) {
-            std::unique_lock lock(_query_ctx_map_mutex);
-            // Only one thread need create query ctx. other thread just get query_ctx in _query_ctx_map.
-            auto search = _query_ctx_map.find(query_id);
-            if (search != _query_ctx_map.end()) {
-                query_ctx = search->second.lock();
-            }
-
-            if (!query_ctx) {
-                WorkloadGroupPtr workload_group_ptr = nullptr;
-                std::string wg_info_str = "Workload Group not set";
-                if (params.__isset.workload_groups && !params.workload_groups.empty()) {
-                    uint64_t wg_id = params.workload_groups[0].id;
-                    workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id);
-                    if (workload_group_ptr != nullptr) {
-                        wg_info_str = workload_group_ptr->debug_string();
-                    } else {
-                        wg_info_str = "set wg but not find it in be";
-                    }
-                }
+            RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
+                    query_id, query_ctx,
+                    [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
+                            -> Status {
+                        WorkloadGroupPtr workload_group_ptr = nullptr;
+                        std::string wg_info_str = "Workload Group not set";
+                        if (params.__isset.workload_groups && !params.workload_groups.empty()) {
+                            uint64_t wg_id = params.workload_groups[0].id;
+                            workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id);
+                            if (workload_group_ptr != nullptr) {
+                                wg_info_str = workload_group_ptr->debug_string();
+                            } else {
+                                wg_info_str = "set wg but not find it in be";
+                            }
+                        }
-
-                // First time a fragment of a query arrived. print logs.
-                LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
-                          << ", total fragment num on current host: " << params.fragment_num_on_host
-                          << ", fe process uuid: " << params.query_options.fe_process_uuid
-                          << ", query type: " << params.query_options.query_type
-                          << ", report audit fe:" << params.current_connect_fe
-                          << ", use wg:" << wg_info_str;
-
-                // This may be a first fragment request of the query.
-                // Create the query fragments context.
-                query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
-                                                        params.coord, pipeline, params.is_nereids,
-                                                        params.current_connect_fe, query_source);
-                SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
-                RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
-                                                      &(query_ctx->desc_tbl)));
-                // set file scan range params
-                if (params.__isset.file_scan_params) {
-                    query_ctx->file_scan_range_params_map = params.file_scan_params;
-                }
+
+                        // First time a fragment of a query arrived. print logs.
+                        LOG(INFO) << "query_id: " << print_id(query_id)
+                                  << ", coord_addr: " << params.coord
+                                  << ", total fragment num on current host: "
+                                  << params.fragment_num_on_host
+                                  << ", fe process uuid: " << params.query_options.fe_process_uuid
+                                  << ", query type: " << params.query_options.query_type
+                                  << ", report audit fe:" << params.current_connect_fe
+                                  << ", use wg:" << wg_info_str;
+
+                        // This may be a first fragment request of the query.
+                        // Create the query fragments context.
+                        query_ctx = QueryContext::create_shared(
+                                query_id, _exec_env, params.query_options, params.coord, pipeline,
+                                params.is_nereids, params.current_connect_fe, query_source);
+                        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
+                        RETURN_IF_ERROR(DescriptorTbl::create(
+                                &(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
+                        // set file scan range params
+                        if (params.__isset.file_scan_params) {
+                            query_ctx->file_scan_range_params_map = params.file_scan_params;
+                        }
-
-                query_ctx->query_globals = params.query_globals;
+
+                        query_ctx->query_globals = params.query_globals;
-
-                if (params.__isset.resource_info) {
-                    query_ctx->user = params.resource_info.user;
-                    query_ctx->group = params.resource_info.group;
-                    query_ctx->set_rsc_info = true;
-                }
+
+                        if (params.__isset.resource_info) {
+                            query_ctx->user = params.resource_info.user;
+                            query_ctx->group = params.resource_info.group;
+                            query_ctx->set_rsc_info = true;
+                        }
-
-                _set_scan_concurrency(params, query_ctx.get());
+
+                        _set_scan_concurrency(params, query_ctx.get());
-
-                if (workload_group_ptr != nullptr) {
-                    RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
-                    query_ctx->set_workload_group(workload_group_ptr);
-                    _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
-                            print_id(query_id), workload_group_ptr->id());
-                }
-                // There is some logic in query ctx's dctor, we could not check if exists and delete the
-                // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
-                _query_ctx_map.insert({query_id, query_ctx});
-            }
+
+                        if (workload_group_ptr != nullptr) {
+                            RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
+                            query_ctx->set_workload_group(workload_group_ptr);
+                            _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+                                    print_id(query_id), workload_group_ptr->id());
+                        }
+                        // There is some logic in query ctx's dctor, we could not check if exists and delete the
+                        // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
+                        map.insert({query_id, query_ctx});
+                        return Status::OK();
+                    }));
         }
     }
     return Status::OK();
@@ -754,24 +826,30 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
     {
         fmt::format_to(debug_string_buffer,
                        "{} pipeline fragment contexts are still running! duration_limit={}\n",
-                       _pipeline_map.size(), duration);
+                       _pipeline_map.num_items(), duration);
 
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
-        std::shared_lock lock(_pipeline_map_mutex);
-        for (auto& it : _pipeline_map) {
-            auto elapsed = it.second->elapsed_time() / 1000000000.0;
-            if (elapsed < duration) {
-                // Only display tasks which has been running for more than {duration} seconds.
-                continue;
+        _pipeline_map.apply([&](phmap::flat_hash_map<
+                                    std::pair<TUniqueId, int>,
+                                    std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+                                    -> Status {
+            for (auto& it : map) {
+                auto elapsed = it.second->elapsed_time() / 1000000000.0;
+                if (elapsed < duration) {
+                    // Only display tasks which has been running for more than {duration} seconds.
+                    continue;
+                }
+                auto timeout_second = it.second->timeout_second();
+                fmt::format_to(
+                        debug_string_buffer,
+                        "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}) : {}\n",
+                        i, elapsed, timeout_second, it.second->is_timeout(now),
+                        it.second->debug_string());
+                i++;
             }
-            auto timeout_second = it.second->timeout_second();
-            fmt::format_to(
-                    debug_string_buffer,
-                    "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}) : {}\n", i,
-                    elapsed, timeout_second, it.second->is_timeout(now), it.second->debug_string());
-            i++;
-        }
+            return Status::OK();
+        });
     }
 
     return fmt::to_string(debug_string_buffer);
 }
@@ -842,14 +920,13 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         g_fragment_last_active_time.set_value(now);
 
         // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
-        std::unique_lock lock(_pipeline_map_mutex);
-        auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
-        if (iter != _pipeline_map.end()) {
+        auto res = _pipeline_map.find({params.query_id, params.fragment_id});
+        if (res != nullptr) {
             return Status::InternalError(
                     "exec_plan_fragment query_id({}) input duplicated fragment_id({})",
                     print_id(params.query_id), params.fragment_id);
         }
-        _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
+        _pipeline_map.insert({params.query_id, params.fragment_id}, context);
     }
 
     if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
@@ -890,10 +967,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
         }
     }
     query_ctx->cancel(reason);
-    {
-        std::unique_lock l(_query_ctx_map_mutex);
-        _query_ctx_map.erase(query_id);
-    }
+    _query_ctx_map.erase(query_id);
     LOG(INFO) << "Query " << print_id(query_id)
               << " is cancelled and removed. Reason: " << reason.to_string();
 }
@@ -926,119 +1000,130 @@ void FragmentMgr::cancel_worker() {
         }
 
         std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
-        {
-            std::shared_lock lock(_pipeline_map_mutex);
-            ctx.reserve(_pipeline_map.size());
-            for (auto& pipeline_itr : _pipeline_map) {
-                ctx.push_back(pipeline_itr.second);
-            }
-        }
+        _pipeline_map.apply(
+                [&](phmap::flat_hash_map<std::pair<TUniqueId, int>,
+                                         std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+                        -> Status {
+                    ctx.reserve(ctx.size() + map.size());
+                    for (auto& pipeline_itr : map) {
+                        ctx.push_back(pipeline_itr.second);
+                    }
+                    return Status::OK();
+                });
         for (auto& c : ctx) {
             c->clear_finished_tasks();
         }
 
        {
-            {
-                // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
-                // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
-                std::unique_lock lock(_query_ctx_map_mutex);
-                for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
-                    if (auto q_ctx = it->second.lock()) {
-                        if (q_ctx->is_timeout(now)) {
-                            LOG_WARNING("Query {} is timeout", print_id(it->first));
-                            queries_timeout.push_back(it->first);
-                        }
-                        ++it;
-                    } else {
-                        it = _query_ctx_map.erase(it);
-                    }
-                }
-            }
+            _query_ctx_map.apply(
+                    [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
+                            -> Status {
+                        for (auto it = map.begin(); it != map.end();) {
+                            if (auto q_ctx = it->second.lock()) {
+                                if (q_ctx->is_timeout(now)) {
+                                    LOG_WARNING("Query {} is timeout", print_id(it->first));
+                                    queries_timeout.push_back(it->first);
+                                }
+                                ++it;
+                            } else {
+                                it = map.erase(it);
+                            }
+                        }
+                        return Status::OK();
+                    });
 
-            std::shared_lock lock(_query_ctx_map_mutex);
             // We use a very conservative cancel strategy.
             // 0. If there are no running frontends, do not cancel any queries.
             // 1. If query's process uuid is zero, do not cancel
             // 2. If same process uuid, do not cancel
             // 3. If fe has zero process uuid, do not cancel
-            if (running_fes.empty() && !_query_ctx_map.empty()) {
+            if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
                 LOG_EVERY_N(WARNING, 10)
                         << "Could not find any running frontends, maybe we are upgrading or "
                            "starting? "
                        << "We will not cancel any outdated queries in this situation.";
            } else {
-                for (const auto& it : _query_ctx_map) {
-                    if (auto q_ctx = it.second.lock()) {
-                        const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
-
-                        if (fe_process_uuid == 0) {
-                            // zero means this query is from a older version fe or
-                            // this fe is starting
-                            continue;
-                        }
-
-                        // If the query is not running on the any frontends, cancel it.
-                        if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
-                            itr != running_queries_on_all_fes.end()) {
-                            // Query not found on this frontend, and the query arrives before the last check
-                            if (itr->second.find(it.first) == itr->second.end() &&
-                                // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
-                                // tv_sec is enough, we do not need to check tv_nsec.
-                                q_ctx->get_query_arrival_timestamp().tv_sec <
-                                        check_invalid_query_last_timestamp.tv_sec &&
-                                q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
-                                queries_pipeline_task_leak.push_back(q_ctx->query_id());
-                                LOG_INFO(
-                                        "Query {}, type {} is not found on any frontends, maybe it "
-                                        "is leaked.",
-                                        print_id(q_ctx->query_id()),
-                                        toString(q_ctx->get_query_source()));
-                                continue;
-                            }
-                        }
+                _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId,
+                                                              std::weak_ptr<QueryContext>>& map)
+                                             -> Status {
+                    for (const auto& it : map) {
+                        if (auto q_ctx = it.second.lock()) {
+                            const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
+
+                            if (fe_process_uuid == 0) {
+                                // zero means this query is from a older version fe or
+                                // this fe is starting
+                                continue;
+                            }
+
+                            // If the query is not running on the any frontends, cancel it.
+                            if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
+                                itr != running_queries_on_all_fes.end()) {
+                                // Query not found on this frontend, and the query arrives before the last check
+                                if (itr->second.find(it.first) == itr->second.end() &&
+                                    // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
+                                    // tv_sec is enough, we do not need to check tv_nsec.
+                                    q_ctx->get_query_arrival_timestamp().tv_sec <
+                                            check_invalid_query_last_timestamp.tv_sec &&
+                                    q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
+                                    queries_pipeline_task_leak.push_back(q_ctx->query_id());
+                                    LOG_INFO(
+                                            "Query {}, type {} is not found on any frontends, "
+                                            "maybe it "
+                                            "is leaked.",
+                                            print_id(q_ctx->query_id()),
+                                            toString(q_ctx->get_query_source()));
+                                    continue;
+                                }
+                            }
-
-                        auto itr = running_fes.find(q_ctx->coord_addr);
-                        if (itr != running_fes.end()) {
-                            if (fe_process_uuid == itr->second.info.process_uuid ||
-                                itr->second.info.process_uuid == 0) {
-                                continue;
-                            } else {
-                                LOG_WARNING(
-                                        "Coordinator of query {} restarted, going to cancel it.",
-                                        print_id(q_ctx->query_id()));
-                            }
-                        } else {
-                            // In some rear cases, the rpc port of follower is not updated in time,
-                            // then the port of this follower will be zero, but acutally it is still running,
-                            // and be has already received the query from follower.
-                            // So we need to check if host is in running_fes.
-                            bool fe_host_is_standing = std::any_of(
-                                    running_fes.begin(), running_fes.end(),
-                                    [&q_ctx](const auto& fe) {
-                                        return fe.first.hostname == q_ctx->coord_addr.hostname &&
-                                               fe.first.port == 0;
-                                    });
-                            if (fe_host_is_standing) {
-                                LOG_WARNING(
-                                        "Coordinator {}:{} is not found, but its host is still "
-                                        "running with an unstable brpc port, not going to cancel "
-                                        "it.",
-                                        q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
-                                        print_id(q_ctx->query_id()));
-                                continue;
-                            } else {
-                                LOG_WARNING(
-                                        "Could not find target coordinator {}:{} of query {}, "
-                                        "going to "
-                                        "cancel it.",
-                                        q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
-                                        print_id(q_ctx->query_id()));
-                            }
-                        }
-                    }
-                    // Coordinator of this query has already dead or query context has been released.
-                    queries_lost_coordinator.push_back(it.first);
-                }
+
+                            auto itr = running_fes.find(q_ctx->coord_addr);
+                            if (itr != running_fes.end()) {
+                                if (fe_process_uuid == itr->second.info.process_uuid ||
+                                    itr->second.info.process_uuid == 0) {
+                                    continue;
+                                } else {
+                                    LOG_WARNING(
+                                            "Coordinator of query {} restarted, going to cancel "
+                                            "it.",
+                                            print_id(q_ctx->query_id()));
+                                }
+                            } else {
+                                // In some rear cases, the rpc port of follower is not updated in time,
+                                // then the port of this follower will be zero, but acutally it is still running,
+                                // and be has already received the query from follower.
+                                // So we need to check if host is in running_fes.
+                                bool fe_host_is_standing =
+                                        std::any_of(running_fes.begin(), running_fes.end(),
+                                                    [&q_ctx](const auto& fe) {
+                                                        return fe.first.hostname ==
+                                                                       q_ctx->coord_addr.hostname &&
+                                                               fe.first.port == 0;
+                                                    });
+                                if (fe_host_is_standing) {
+                                    LOG_WARNING(
+                                            "Coordinator {}:{} is not found, but its host is still "
+                                            "running with an unstable brpc port, not going to "
+                                            "cancel "
+                                            "it.",
+                                            q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
+                                            print_id(q_ctx->query_id()));
+                                    continue;
+                                } else {
+                                    LOG_WARNING(
+                                            "Could not find target coordinator {}:{} of query {}, "
+                                            "going to "
+                                            "cancel it.",
+                                            q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
+                                            print_id(q_ctx->query_id()));
+                                }
+                            }
+                        }
+                        // Coordinator of this query has already dead or query context has been released.
+                        queries_lost_coordinator.push_back(it.first);
+                    }
+                    return Status::OK();
+                });
             }
         }
@@ -1179,24 +1264,18 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
 
     const auto& fragment_ids = request->fragment_ids();
     {
-        std::shared_lock lock(_pipeline_map_mutex);
         for (auto fragment_id : fragment_ids) {
-            if (is_pipeline) {
-                auto iter = _pipeline_map.find(
-                        {UniqueId(request->query_id()).to_thrift(), fragment_id});
-                if (iter == _pipeline_map.end()) {
-                    continue;
-                }
-                pip_context = iter->second;
-
-                DCHECK(pip_context != nullptr);
-                runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
-                query_thread_context = {pip_context->get_query_ctx()->query_id(),
-                                        pip_context->get_query_ctx()->query_mem_tracker,
-                                        pip_context->get_query_ctx()->workload_group()};
-            } else {
-                return Status::InternalError("Non-pipeline is disabled!");
+            pip_context =
+                    _pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id});
+            if (pip_context == nullptr) {
+                continue;
             }
+
+            DCHECK(pip_context != nullptr);
+            runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
+            query_thread_context = {pip_context->get_query_ctx()->query_id(),
+                                    pip_context->get_query_ctx()->query_mem_tracker,
+                                    pip_context->get_query_ctx()->workload_group()};
             break;
         }
     }
@@ -1294,22 +1373,24 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
 }
 
 void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
-    {
-        std::unique_lock lock(_query_ctx_map_mutex);
-        for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) {
-            if (auto q_ctx = iter->second.lock()) {
-                WorkloadQueryInfo workload_query_info;
-                workload_query_info.query_id = print_id(iter->first);
-                workload_query_info.tquery_id = iter->first;
-                workload_query_info.wg_id =
-                        q_ctx->workload_group() == nullptr ? -1 : q_ctx->workload_group()->id();
-                query_info_list->push_back(workload_query_info);
-                iter++;
-            } else {
-                iter = _query_ctx_map.erase(iter);
-            }
-        }
-    }
+    _query_ctx_map.apply(
+            [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status {
+                for (auto iter = map.begin(); iter != map.end();) {
+                    if (auto q_ctx = iter->second.lock()) {
+                        WorkloadQueryInfo workload_query_info;
+                        workload_query_info.query_id = print_id(iter->first);
+                        workload_query_info.tquery_id = iter->first;
+                        workload_query_info.wg_id = q_ctx->workload_group() == nullptr
+                                                            ? -1
+                                                            : q_ctx->workload_group()->id();
+                        query_info_list->push_back(workload_query_info);
+                        iter++;
+                    } else {
+                        iter = map.erase(iter);
+                    }
+                }
+                return Status::OK();
+            });
 }
 
 Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 0e7691647dd47d..fb01c899104a98 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -69,6 +69,46 @@ class WorkloadQueryInfo;
 
 std::string to_load_error_http_path(const std::string& file_name);
 
+template <typename Key, typename Value, typename ValueType>
+class ConcurrentContextMap {
+public:
+    using ApplyFunction = std::function<Status(phmap::flat_hash_map<Key, Value>&)>;
+    ConcurrentContextMap();
+    Value find(const Key& query_id);
+    void insert(const Key& query_id, std::shared_ptr<ValueType>);
+    void clear();
+    void erase(const Key& query_id);
+    size_t num_items() const {
+        size_t n = 0;
+        for (auto& pair : _internal_map) {
+            std::shared_lock lock(*pair.first);
+            auto& map = pair.second;
+            n += map.size();
+        }
+        return n;
+    }
+    void apply(ApplyFunction&& function) {
+        for (auto& pair : _internal_map) {
+            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
+            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
+            std::unique_lock lock(*pair.first);
+            static_cast<void>(function(pair.second));
+        }
+    }
+
+    Status apply_if_not_exists(const Key& query_id, std::shared_ptr<ValueType>& query_ctx,
+                               ApplyFunction&& function);
+
+private:
+    // The lock should only be used to protect the structures in fragment manager. Has to be
+    // used in a very small scope because it may dead lock. For example, if the _lock is used
+    // in prepare stage, the call path is prepare --> expr prepare --> may call allocator
+    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
+    // call _lock, so that there is dead lock.
+    std::vector<std::pair<std::unique_ptr<std::shared_mutex>, phmap::flat_hash_map<Key, Value>>>
+            _internal_map;
+};
+
 // This class used to manage all the fragment execute in this instance
 class FragmentMgr : public RestMonitorIface {
 public:
@@ -131,10 +171,7 @@ class FragmentMgr : public RestMonitorIface {
 
     ThreadPool* get_thread_pool() { return _thread_pool.get(); }
 
-    int32_t running_query_num() {
-        std::shared_lock lock(_query_ctx_map_mutex);
-        return _query_ctx_map.size();
-    }
+    int32_t running_query_num() { return _query_ctx_map.num_items(); }
 
     std::string dump_pipeline_tasks(int64_t duration = 0);
     std::string dump_pipeline_tasks(TUniqueId& query_id);
@@ -164,21 +201,14 @@ class FragmentMgr : public RestMonitorIface {
     // This is input params
     ExecEnv* _exec_env = nullptr;
 
-    // The lock protect the `_pipeline_map`
-    std::shared_mutex _pipeline_map_mutex;
     // (QueryID, FragmentID) -> PipelineFragmentContext
-    phmap::flat_hash_map<std::pair<TUniqueId, int>,
-                         std::shared_ptr<pipeline::PipelineFragmentContext>>
+    ConcurrentContextMap<std::pair<TUniqueId, int>,
+                         std::shared_ptr<pipeline::PipelineFragmentContext>,
+                         pipeline::PipelineFragmentContext>
             _pipeline_map;
-    // The lock should only be used to protect the structures in fragment manager. Has to be
-    // used in a very small scope because it may dead lock. For example, if the _lock is used
-    // in prepare stage, the call path is prepare --> expr prepare --> may call allocator
-    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
-    // call _lock, so that there is dead lock.
-    std::shared_mutex _query_ctx_map_mutex;
     // query id -> QueryContext
-    phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
+    ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> _query_ctx_map;
 
     std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
 
     CountDownLatch _stop_background_threads_latch;
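For context only, and not part of the patch above: a minimal, self-contained sketch of the same lock-striping idea the ConcurrentContextMap uses (hash the key to pick one of N partitions, each guarded by its own shared_mutex, so readers and writers of different keys rarely contend on the same lock). Everything in this sketch is an illustrative assumption — the ShardedMap name, std::hash keying, and std::string/int value types are mine; the patch itself keys on TUniqueId via HashUtil::hash and stores weak_ptr/shared_ptr context objects.

```cpp
// Illustrative sketch of lock striping; not Doris code.
#include <iostream>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>

template <typename Key, typename Value>
class ShardedMap {
public:
    explicit ShardedMap(size_t num_shards = 16) : _shards(num_shards) {
        for (auto& shard : _shards) {
            shard.mutex = std::make_unique<std::shared_mutex>();
        }
    }

    void insert(const Key& key, Value value) {
        auto& shard = _shard_for(key);
        std::unique_lock lock(*shard.mutex); // writers only block their own shard
        shard.map[key] = std::move(value);
    }

    // Returns a default-constructed Value when the key is absent.
    Value find(const Key& key) {
        auto& shard = _shard_for(key);
        std::shared_lock lock(*shard.mutex); // readers of different shards never contend
        auto it = shard.map.find(key);
        return it == shard.map.end() ? Value {} : it->second;
    }

    void erase(const Key& key) {
        auto& shard = _shard_for(key);
        std::unique_lock lock(*shard.mutex);
        shard.map.erase(key);
    }

private:
    struct Shard {
        std::unique_ptr<std::shared_mutex> mutex;
        std::unordered_map<Key, Value> map;
    };

    // Hash the key to a partition index, analogous to get_map_id() in the patch.
    Shard& _shard_for(const Key& key) { return _shards[std::hash<Key> {}(key) % _shards.size()]; }

    std::vector<Shard> _shards;
};

int main() {
    ShardedMap<std::string, int> map(8);
    map.insert("query-1", 42);
    std::cout << map.find("query-1") << "\n"; // prints 42
    map.erase("query-1");
    std::cout << map.find("query-1") << "\n"; // prints 0 (default int)
    return 0;
}
```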