Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei committed Dec 28, 2024
1 parent 9ceced5 commit 7c7c43b
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 3 deletions.
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ class ExecEnv {
this->_cluster_info = std::move(cluster_info);
}
void set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr);
void clear_new_load_stream_mgr();
void set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor);
void clear_stream_load_executor();

void set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine);
void set_inverted_index_searcher_cache(
Expand All @@ -295,6 +297,7 @@ class ExecEnv {
this->_routine_load_task_executor = r;
}
void set_wal_mgr(std::unique_ptr<WalManager>&& wm);
void clear_wal_mgr();

void set_write_cooldown_meta_executors();
static void set_tracking_memory(bool tracking_memory) {
Expand Down
11 changes: 11 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,13 +695,24 @@ void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_lo
this->_new_load_stream_mgr = std::move(new_load_stream_mgr);
}

void ExecEnv::clear_new_load_stream_mgr() {
this->_new_load_stream_mgr.reset();
}

void ExecEnv::set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor) {
this->_stream_load_executor = std::move(stream_load_executor);
}

void ExecEnv::clear_stream_load_executor() {
this->_stream_load_executor.reset();
}

void ExecEnv::set_wal_mgr(std::unique_ptr<WalManager>&& wm) {
this->_wal_manager = std::move(wm);
}
void ExecEnv::clear_wal_mgr() {
this->_wal_manager.reset();
}
#endif
// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method.
// We need to stop all threads before releasing resource.
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/wal/wal_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class WalManagerTest : public testing::Test {
_env->_stream_load_executor = StreamLoadExecutor::create_unique(_env);
_env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
_env->set_wal_mgr(WalManager::create_unique(_env, wal_dir.string()));
_env->set_ready();
k_stream_load_begin_result = TLoadTxnBeginResult();
}
void TearDown() override {
Expand All @@ -78,6 +77,7 @@ class WalManagerTest : public testing::Test {
SAFE_STOP(_env->_wal_manager);
SAFE_DELETE(_env->_function_client_cache);
SAFE_DELETE(_env->_internal_client_cache);
_env->clear_wal_mgr();
}

void prepare() {
Expand Down
7 changes: 5 additions & 2 deletions be/test/runtime/routine_load_task_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ class RoutineLoadTaskExecutorTest : public testing::Test {
_env->set_cluster_info(std::make_unique<ClusterInfo>());
_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env->set_stream_load_executor(StreamLoadExecutor::create_unique(_env));
_env->set_ready();

config::max_routine_load_thread_pool_size = 1024;
config::max_consumer_num_per_group = 3;
}

//void TearDown() override { delete _env.cluster_info(); }
void TearDown() override {
_env->clear_new_load_stream_mgr();
_env->clear_stream_load_executor();
}
};

TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
Expand Down
1 change: 1 addition & 0 deletions be/test/vec/exec/vwal_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class VWalScannerTest : public testing::Test {
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
fmt::format("fail to delete dir={}", _wal_dir));
SAFE_STOP(_env->_wal_manager);
_env->clear_wal_mgr();
}

protected:
Expand Down

0 comments on commit 7c7c43b

Please sign in to comment.