From ff37b66d8aef032bde766067410b6a8dbfe655f7 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 26 Dec 2024 14:09:35 +0800 Subject: [PATCH] [refactor](execenv) remove shared ptr from exec env --- be/src/cloud/cloud_stream_load_executor.h | 2 + be/src/runtime/exec_env.cpp | 6 --- be/src/runtime/exec_env.h | 30 ++++++-------- be/src/runtime/exec_env_init.cpp | 39 ++++++++++++++++--- be/src/runtime/memory/lru_cache_policy.h | 12 +++--- be/test/http/stream_load_test.cpp | 4 +- be/test/olap/wal/wal_manager_test.cpp | 13 ++++--- .../routine_load_task_executor_test.cpp | 18 +++++---- be/test/testutil/run_all_tests.cpp | 1 - be/test/vec/exec/vwal_scanner_test.cpp | 3 +- 10 files changed, 75 insertions(+), 53 deletions(-) diff --git a/be/src/cloud/cloud_stream_load_executor.h b/be/src/cloud/cloud_stream_load_executor.h index b0cb91d06ac42a..d04e55feba552e 100644 --- a/be/src/cloud/cloud_stream_load_executor.h +++ b/be/src/cloud/cloud_stream_load_executor.h @@ -21,6 +21,8 @@ namespace doris { class CloudStreamLoadExecutor final : public StreamLoadExecutor { + ENABLE_FACTORY_CREATOR(CloudStreamLoadExecutor); + public: CloudStreamLoadExecutor(ExecEnv* exec_env); diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index ab24d7ca192689..e3a71261b677eb 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -38,12 +38,6 @@ namespace doris { -ExecEnv::ExecEnv() = default; - -ExecEnv::~ExecEnv() { - destroy(); -} - #ifdef BE_TEST void ExecEnv::set_inverted_index_searcher_cache( segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache) { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 636ce2bf288b58..0c9a4158ebc9cb 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -244,14 +244,14 @@ class ExecEnv { } LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; } LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); } - std::shared_ptr new_load_stream_mgr() { return _new_load_stream_mgr; } + NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr.get(); } SmallFileMgr* small_file_mgr() { return _small_file_mgr; } doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; } GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; } const std::vector& store_paths() const { return _store_paths; } - std::shared_ptr stream_load_executor() { return _stream_load_executor; } + StreamLoadExecutor* stream_load_executor() { return _stream_load_executor.get(); } RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; } HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } @@ -273,12 +273,10 @@ class ExecEnv { _memtable_memory_limiter.reset(limiter); } void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; } - void set_new_load_stream_mgr(std::shared_ptr new_load_stream_mgr) { - this->_new_load_stream_mgr = new_load_stream_mgr; - } - void set_stream_load_executor(std::shared_ptr stream_load_executor) { - this->_stream_load_executor = stream_load_executor; - } + void set_new_load_stream_mgr(std::unique_ptr&& new_load_stream_mgr); + void clear_new_load_stream_mgr(); + void set_stream_load_executor(std::unique_ptr&& stream_load_executor); + void clear_stream_load_executor(); void set_storage_engine(std::unique_ptr&& engine); void set_inverted_index_searcher_cache( @@ -294,10 +292,9 @@ class ExecEnv { void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) { this->_routine_load_task_executor = r; } - void set_wal_mgr(std::shared_ptr wm) { this->_wal_manager = wm; } - void set_dummy_lru_cache(std::shared_ptr dummy_lru_cache) { - this->_dummy_lru_cache = dummy_lru_cache; - } + void set_wal_mgr(std::unique_ptr&& wm); + void clear_wal_mgr(); + void set_write_cooldown_meta_executors(); static void set_tracking_memory(bool tracking_memory) { _s_tracking_memory.store(tracking_memory, std::memory_order_release); @@ -331,7 +328,6 @@ class ExecEnv { return _inverted_index_query_cache; } QueryCache* get_query_cache() { return _query_cache; } - std::shared_ptr get_dummy_lru_cache() { return _dummy_lru_cache; } pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() { return _runtime_filter_timer_queue; @@ -429,13 +425,12 @@ class ExecEnv { BrokerMgr* _broker_mgr = nullptr; LoadChannelMgr* _load_channel_mgr = nullptr; std::unique_ptr _load_stream_mgr; - // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle. - std::shared_ptr _new_load_stream_mgr; + std::unique_ptr _new_load_stream_mgr; BrpcClientCache* _internal_client_cache = nullptr; BrpcClientCache* _streaming_client_cache = nullptr; BrpcClientCache* _function_client_cache = nullptr; - std::shared_ptr _stream_load_executor; + std::unique_ptr _stream_load_executor; RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr; SmallFileMgr* _small_file_mgr = nullptr; HeartbeatFlags* _heartbeat_flags = nullptr; @@ -446,7 +441,7 @@ class ExecEnv { std::unique_ptr _memtable_memory_limiter; std::unique_ptr _load_stream_map_pool; std::unique_ptr _delta_writer_v2_pool; - std::shared_ptr _wal_manager; + std::unique_ptr _wal_manager; DNSCache* _dns_cache = nullptr; std::unique_ptr _write_cooldown_meta_executors; @@ -473,7 +468,6 @@ class ExecEnv { segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr; segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr; QueryCache* _query_cache = nullptr; - std::shared_ptr _dummy_lru_cache = nullptr; std::unique_ptr _file_cache_open_fd_cache; pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 2d7554e702969f..df66315ff059af 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -185,6 +185,12 @@ ThreadPool* ExecEnv::non_block_close_thread_pool() { #endif } +ExecEnv::ExecEnv() = default; + +ExecEnv::~ExecEnv() { + destroy(); +} + Status ExecEnv::init(ExecEnv* env, const std::vector& store_paths, const std::vector& spill_store_paths, const std::set& broken_paths) { @@ -290,16 +296,16 @@ Status ExecEnv::_init(const std::vector& store_paths, _store_paths.size() * config::flush_thread_num_per_store, static_cast(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu); _load_stream_mgr = std::make_unique(num_flush_threads); - _new_load_stream_mgr = NewLoadStreamMgr::create_shared(); + _new_load_stream_mgr = NewLoadStreamMgr::create_unique(); _internal_client_cache = new BrpcClientCache(); _streaming_client_cache = new BrpcClientCache("baidu_std", "single", "streaming"); _function_client_cache = new BrpcClientCache(config::function_service_protocol); if (config::is_cloud_mode()) { - _stream_load_executor = std::make_shared(this); + _stream_load_executor = CloudStreamLoadExecutor::create_unique(this); } else { - _stream_load_executor = StreamLoadExecutor::create_shared(this); + _stream_load_executor = StreamLoadExecutor::create_unique(this); } _routine_load_task_executor = new RoutineLoadTaskExecutor(this); RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit())); @@ -309,7 +315,7 @@ Status ExecEnv::_init(const std::vector& store_paths, _load_stream_map_pool = std::make_unique(); _delta_writer_v2_pool = std::make_unique(); _file_cache_open_fd_cache = std::make_unique(); - _wal_manager = WalManager::create_shared(this, config::group_commit_wal_path); + _wal_manager = WalManager::create_unique(this, config::group_commit_wal_path); _dns_cache = new DNSCache(); _write_cooldown_meta_executors = std::make_unique(); _spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map)); @@ -464,8 +470,6 @@ Status ExecEnv::_init_mem_env() { return Status::InternalError(ss.str()); } - _dummy_lru_cache = std::make_shared(); - _cache_manager = CacheManager::create_global_instance(); int64_t storage_cache_limit = @@ -681,7 +685,30 @@ void ExecEnv::_deregister_metrics() { DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num); DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size); } +#ifdef BE_TEST +void ExecEnv::set_new_load_stream_mgr(std::unique_ptr&& new_load_stream_mgr) { + this->_new_load_stream_mgr = std::move(new_load_stream_mgr); +} + +void ExecEnv::clear_new_load_stream_mgr() { + this->_new_load_stream_mgr.reset(); +} +void ExecEnv::set_stream_load_executor(std::unique_ptr&& stream_load_executor) { + this->_stream_load_executor = std::move(stream_load_executor); +} + +void ExecEnv::clear_stream_load_executor() { + this->_stream_load_executor.reset(); +} + +void ExecEnv::set_wal_mgr(std::unique_ptr&& wm) { + this->_wal_manager = std::move(wm); +} +void ExecEnv::clear_wal_mgr() { + this->_wal_manager.reset(); +} +#endif // TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method. // We need to stop all threads before releasing resource. void ExecEnv::destroy() { diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index 7e73f2dd76b566..7e02247efb89bc 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -44,8 +44,7 @@ class LRUCachePolicy : public CachePolicy { new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, element_count_capacity, is_lru_k)); } else { - CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); - _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + _cache = std::make_shared(); } _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } @@ -64,8 +63,7 @@ class LRUCachePolicy : public CachePolicy { cache_value_time_extractor, cache_value_check_timestamp, element_count_capacity, is_lru_k)); } else { - CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); - _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + _cache = std::make_shared(); } _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } @@ -157,7 +155,7 @@ class LRUCachePolicy : public CachePolicy { std::lock_guard l(_lock); COUNTER_SET(_freed_entrys_counter, (int64_t)0); COUNTER_SET(_freed_memory_counter, (int64_t)0); - if (_stale_sweep_time_s <= 0 || _cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + if (_stale_sweep_time_s <= 0 || std::dynamic_pointer_cast(_cache)) { return; } if (exceed_prune_limit()) { @@ -204,7 +202,7 @@ class LRUCachePolicy : public CachePolicy { std::lock_guard l(_lock); COUNTER_SET(_freed_entrys_counter, (int64_t)0); COUNTER_SET(_freed_memory_counter, (int64_t)0); - if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + if (std::dynamic_pointer_cast(_cache)) { return; } if ((force && mem_consumption() != 0) || exceed_prune_limit()) { @@ -246,7 +244,7 @@ class LRUCachePolicy : public CachePolicy { COUNTER_SET(_freed_entrys_counter, (int64_t)0); COUNTER_SET(_freed_memory_counter, (int64_t)0); COUNTER_SET(_cost_timer, (int64_t)0); - if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + if (std::dynamic_pointer_cast(_cache)) { return 0; } diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index d797c081f41995..faa582704d11cc 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -54,11 +54,11 @@ void http_request_done_cb(struct evhttp_request* req, void* arg) { TEST_F(StreamLoadTest, TestHeader) { // 1G - auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), config::group_commit_wal_path); + auto wal_mgr = WalManager::create_unique(ExecEnv::GetInstance(), config::group_commit_wal_path); static_cast(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0)); static_cast(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 0)); static_cast(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 0)); - ExecEnv::GetInstance()->set_wal_mgr(wal_mgr); + ExecEnv::GetInstance()->set_wal_mgr(std::move(wal_mgr)); // 1. empty info { auto* evhttp_req = evhttp_request_new(nullptr, nullptr); diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp index 32162593fc05c4..5a6ce49067bf46 100644 --- a/be/test/olap/wal/wal_manager_test.cpp +++ b/be/test/olap/wal/wal_manager_test.cpp @@ -59,12 +59,12 @@ class WalManagerTest : public testing::Test { _env->_cluster_info->master_fe_addr.hostname = "host name"; _env->_cluster_info->master_fe_addr.port = 1234; _env->_cluster_info->backend_id = 1001; - _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); _env->_internal_client_cache = new BrpcClientCache(); _env->_function_client_cache = new BrpcClientCache(); - _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_stream_load_executor = StreamLoadExecutor::create_unique(_env); _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)}; - _env->_wal_manager = WalManager::create_shared(_env, wal_dir.string()); + _env->set_wal_mgr(WalManager::create_unique(_env, wal_dir.string())); k_stream_load_begin_result = TLoadTxnBeginResult(); } void TearDown() override { @@ -78,6 +78,9 @@ class WalManagerTest : public testing::Test { SAFE_DELETE(_env->_function_client_cache); SAFE_DELETE(_env->_internal_client_cache); SAFE_DELETE(_env->_cluster_info); + _env->clear_new_load_stream_mgr(); + _env->clear_stream_load_executor(); + //_env->clear_wal_mgr(); } void prepare() { @@ -155,9 +158,9 @@ TEST_F(WalManagerTest, recovery_normal) { } TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) { - auto wal_mgr = WalManager::create_shared(_env, config::group_commit_wal_path); + auto wal_mgr = WalManager::create_unique(_env, config::group_commit_wal_path); static_cast(wal_mgr->init()); - _env->set_wal_mgr(wal_mgr); + _env->set_wal_mgr(std::move(wal_mgr)); // 1T size_t available_bytes = 1099511627776; diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index 5c2b39bce1f1bd..1b758aa7aa0427 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -49,23 +49,27 @@ class RoutineLoadTaskExecutorTest : public testing::Test { RoutineLoadTaskExecutorTest() = default; ~RoutineLoadTaskExecutorTest() override = default; + ExecEnv* _env = nullptr; + void SetUp() override { + _env = ExecEnv::GetInstance(); k_stream_load_begin_result = TLoadTxnBeginResult(); k_stream_load_commit_result = TLoadTxnCommitResult(); k_stream_load_rollback_result = TLoadTxnRollbackResult(); k_stream_load_put_result = TStreamLoadPutResult(); - _env.set_cluster_info(new ClusterInfo()); - _env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); - _env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env)); + _env->set_cluster_info(new ClusterInfo()); + _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); + _env->set_stream_load_executor(StreamLoadExecutor::create_unique(_env)); config::max_routine_load_thread_pool_size = 1024; config::max_consumer_num_per_group = 3; } - void TearDown() override { delete _env.cluster_info(); } - - ExecEnv _env; + void TearDown() override { + _env->clear_new_load_stream_mgr(); + _env->clear_stream_load_executor(); + } }; TEST_F(RoutineLoadTaskExecutorTest, exec_task) { @@ -92,7 +96,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { task.__set_kafka_load_info(k_info); - RoutineLoadTaskExecutor executor(&_env); + RoutineLoadTaskExecutor executor(_env); Status st; st = executor.init(1024 * 1024); EXPECT_TRUE(st.ok()); diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 59933db80e5bb9..1208141a8fa857 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -58,7 +58,6 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance()); doris::ExecEnv::GetInstance()->set_process_profile( doris::ProcessProfile::create_global_instance()); - doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared()); doris::ExecEnv::GetInstance()->set_storage_page_cache( doris::StoragePageCache::create_global_cache(1 << 30, 10, 0)); doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000, 1000)); diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 5c4056a8c24104..2e6d4bf5cdea76 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -79,6 +79,7 @@ class VWalScannerTest : public testing::Test { WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir), fmt::format("fail to delete dir={}", _wal_dir)); SAFE_STOP(_env->_wal_manager); + _env->clear_wal_mgr(); } protected: @@ -286,7 +287,7 @@ void VWalScannerTest::init() { _env->_cluster_info->master_fe_addr.hostname = "host name"; _env->_cluster_info->master_fe_addr.port = _backend_id; _env->_cluster_info->backend_id = 1001; - _env->_wal_manager = WalManager::create_shared(_env, _wal_dir); + _env->set_wal_mgr(WalManager::create_unique(_env, _wal_dir)); std::string base_path; auto st = _env->_wal_manager->_init_wal_dirs_info(); st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1, _label_1, base_path,