Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei committed Dec 28, 2024
1 parent bfca486 commit 1e38e9f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 6 deletions.
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ class ExecEnv {
}
void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; }
void set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr);
void clear_new_load_stream_mgr();
void set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor);
void clear_stream_load_executor();

void set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine);
void set_inverted_index_searcher_cache(
Expand All @@ -291,6 +293,7 @@ class ExecEnv {
this->_routine_load_task_executor = r;
}
void set_wal_mgr(std::unique_ptr<WalManager>&& wm);
void clear_wal_mgr();

void set_write_cooldown_meta_executors();
static void set_tracking_memory(bool tracking_memory) {
Expand Down
11 changes: 11 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,13 +690,24 @@ void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_lo
this->_new_load_stream_mgr = std::move(new_load_stream_mgr);
}

void ExecEnv::clear_new_load_stream_mgr() {
this->_new_load_stream_mgr.reset();
}

void ExecEnv::set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor) {
this->_stream_load_executor = std::move(stream_load_executor);
}

void ExecEnv::clear_stream_load_executor() {
this->_stream_load_executor.reset();
}

void ExecEnv::set_wal_mgr(std::unique_ptr<WalManager>&& wm) {
this->_wal_manager = std::move(wm);
}
void ExecEnv::clear_wal_mgr() {
this->_wal_manager.reset();
}
#endif
// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method.
// We need to stop all threads before releasing resource.
Expand Down
3 changes: 3 additions & 0 deletions be/test/olap/wal/wal_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class WalManagerTest : public testing::Test {
SAFE_DELETE(_env->_function_client_cache);
SAFE_DELETE(_env->_internal_client_cache);
SAFE_DELETE(_env->_cluster_info);
_env->clear_new_load_stream_mgr();
_env->clear_stream_load_executor();
_env->clear_wal_mgr();
}

void prepare() {
Expand Down
16 changes: 10 additions & 6 deletions be/test/runtime/routine_load_task_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,27 @@ class RoutineLoadTaskExecutorTest : public testing::Test {
RoutineLoadTaskExecutorTest() = default;
~RoutineLoadTaskExecutorTest() override = default;

ExecEnv* _env = nullptr;

void SetUp() override {
_env = ExecEnv::GetInstance();
k_stream_load_begin_result = TLoadTxnBeginResult();
k_stream_load_commit_result = TLoadTxnCommitResult();
k_stream_load_rollback_result = TLoadTxnRollbackResult();
k_stream_load_put_result = TStreamLoadPutResult();

_env.set_cluster_info(new ClusterInfo());
_env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));
_env->set_cluster_info(new ClusterInfo());
_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env->set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));

config::max_routine_load_thread_pool_size = 1024;
config::max_consumer_num_per_group = 3;
}

void TearDown() override { delete _env.cluster_info(); }

ExecEnv _env;
void TearDown() override {
_env->clear_new_load_stream_mgr();
_env->clear_stream_load_executor();
}
};

TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
Expand Down
1 change: 1 addition & 0 deletions be/test/vec/exec/vwal_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class VWalScannerTest : public testing::Test {
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
fmt::format("fail to delete dir={}", _wal_dir));
SAFE_STOP(_env->_wal_manager);
_env->clear_wal_mgr();
}

protected:
Expand Down

0 comments on commit 1e38e9f

Please sign in to comment.