From 1e38e9f61f90a2a151f3f2687c9ac088c16c5365 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Sat, 28 Dec 2024 20:41:15 +0800 Subject: [PATCH] f --- be/src/runtime/exec_env.h | 3 +++ be/src/runtime/exec_env_init.cpp | 11 +++++++++++ be/test/olap/wal/wal_manager_test.cpp | 3 +++ .../runtime/routine_load_task_executor_test.cpp | 16 ++++++++++------ be/test/vec/exec/vwal_scanner_test.cpp | 1 + 5 files changed, 28 insertions(+), 6 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index e29ab97593c5e5d..0c9a4158ebc9cba 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -274,7 +274,9 @@ class ExecEnv { } void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; } void set_new_load_stream_mgr(std::unique_ptr&& new_load_stream_mgr); + void clear_new_load_stream_mgr(); void set_stream_load_executor(std::unique_ptr&& stream_load_executor); + void clear_stream_load_executor(); void set_storage_engine(std::unique_ptr&& engine); void set_inverted_index_searcher_cache( @@ -291,6 +293,7 @@ class ExecEnv { this->_routine_load_task_executor = r; } void set_wal_mgr(std::unique_ptr&& wm); + void clear_wal_mgr(); void set_write_cooldown_meta_executors(); static void set_tracking_memory(bool tracking_memory) { diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index aae3ecba048cf1e..df66315ff059af8 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -690,13 +690,24 @@ void ExecEnv::set_new_load_stream_mgr(std::unique_ptr&& new_lo this->_new_load_stream_mgr = std::move(new_load_stream_mgr); } +void ExecEnv::clear_new_load_stream_mgr() { + this->_new_load_stream_mgr.reset(); +} + void ExecEnv::set_stream_load_executor(std::unique_ptr&& stream_load_executor) { this->_stream_load_executor = std::move(stream_load_executor); } +void ExecEnv::clear_stream_load_executor() { + this->_stream_load_executor.reset(); +} + void ExecEnv::set_wal_mgr(std::unique_ptr&& wm) { this->_wal_manager = std::move(wm); } +void ExecEnv::clear_wal_mgr() { + this->_wal_manager.reset(); +} #endif // TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method. // We need to stop all threads before releasing resource. diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp index aaef13d6528796e..1bf1cfb02acecdf 100644 --- a/be/test/olap/wal/wal_manager_test.cpp +++ b/be/test/olap/wal/wal_manager_test.cpp @@ -78,6 +78,9 @@ class WalManagerTest : public testing::Test { SAFE_DELETE(_env->_function_client_cache); SAFE_DELETE(_env->_internal_client_cache); SAFE_DELETE(_env->_cluster_info); + _env->clear_new_load_stream_mgr(); + _env->clear_stream_load_executor(); + _env->clear_wal_mgr(); } void prepare() { diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index 5c2b39bce1f1bd4..5cd61e78c8e3466 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -49,23 +49,27 @@ class RoutineLoadTaskExecutorTest : public testing::Test { RoutineLoadTaskExecutorTest() = default; ~RoutineLoadTaskExecutorTest() override = default; + ExecEnv* _env = nullptr; + void SetUp() override { + _env = ExecEnv::GetInstance(); k_stream_load_begin_result = TLoadTxnBeginResult(); k_stream_load_commit_result = TLoadTxnCommitResult(); k_stream_load_rollback_result = TLoadTxnRollbackResult(); k_stream_load_put_result = TStreamLoadPutResult(); - _env.set_cluster_info(new ClusterInfo()); - _env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); - _env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env)); + _env->set_cluster_info(new ClusterInfo()); + _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); + _env->set_stream_load_executor(StreamLoadExecutor::create_unique(&_env)); config::max_routine_load_thread_pool_size = 1024; config::max_consumer_num_per_group = 3; } - void TearDown() override { delete _env.cluster_info(); } - - ExecEnv _env; + void TearDown() override { + _env->clear_new_load_stream_mgr(); + _env->clear_stream_load_executor(); + } }; TEST_F(RoutineLoadTaskExecutorTest, exec_task) { diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index ce8c3bcbbb9188a..2e6d4bf5cdea768 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -79,6 +79,7 @@ class VWalScannerTest : public testing::Test { WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir), fmt::format("fail to delete dir={}", _wal_dir)); SAFE_STOP(_env->_wal_manager); + _env->clear_wal_mgr(); } protected: