Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](execenv) remove shared ptr from exec env #46034

Merged
merged 2 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_stream_load_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
namespace doris {

class CloudStreamLoadExecutor final : public StreamLoadExecutor {
ENABLE_FACTORY_CREATOR(CloudStreamLoadExecutor);

public:
CloudStreamLoadExecutor(ExecEnv* exec_env);

Expand Down
6 changes: 0 additions & 6 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@

namespace doris {

ExecEnv::ExecEnv() = default;

ExecEnv::~ExecEnv() {
destroy();
}

#ifdef BE_TEST
void ExecEnv::set_inverted_index_searcher_cache(
segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache) {
Expand Down
30 changes: 12 additions & 18 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,14 @@ class ExecEnv {
}
LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); }
std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; }
NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr.get(); }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; }
GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; }

const std::vector<StorePath>& store_paths() const { return _store_paths; }

std::shared_ptr<StreamLoadExecutor> stream_load_executor() { return _stream_load_executor; }
StreamLoadExecutor* stream_load_executor() { return _stream_load_executor.get(); }
RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; }
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; }
Expand All @@ -273,12 +273,10 @@ class ExecEnv {
_memtable_memory_limiter.reset(limiter);
}
void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; }
void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr) {
this->_new_load_stream_mgr = new_load_stream_mgr;
}
void set_stream_load_executor(std::shared_ptr<StreamLoadExecutor> stream_load_executor) {
this->_stream_load_executor = stream_load_executor;
}
void set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr);
void clear_new_load_stream_mgr();
void set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor);
void clear_stream_load_executor();

void set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine);
void set_inverted_index_searcher_cache(
Expand All @@ -294,10 +292,9 @@ class ExecEnv {
void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) {
this->_routine_load_task_executor = r;
}
void set_wal_mgr(std::shared_ptr<WalManager> wm) { this->_wal_manager = wm; }
void set_dummy_lru_cache(std::shared_ptr<DummyLRUCache> dummy_lru_cache) {
this->_dummy_lru_cache = dummy_lru_cache;
}
void set_wal_mgr(std::unique_ptr<WalManager>&& wm);
void clear_wal_mgr();

void set_write_cooldown_meta_executors();
static void set_tracking_memory(bool tracking_memory) {
_s_tracking_memory.store(tracking_memory, std::memory_order_release);
Expand Down Expand Up @@ -331,7 +328,6 @@ class ExecEnv {
return _inverted_index_query_cache;
}
QueryCache* get_query_cache() { return _query_cache; }
std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return _dummy_lru_cache; }

pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
return _runtime_filter_timer_queue;
Expand Down Expand Up @@ -429,13 +425,12 @@ class ExecEnv {
BrokerMgr* _broker_mgr = nullptr;
LoadChannelMgr* _load_channel_mgr = nullptr;
std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
// TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle.
std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
std::unique_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
BrpcClientCache<PBackendService_Stub>* _streaming_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;

std::shared_ptr<StreamLoadExecutor> _stream_load_executor;
std::unique_ptr<StreamLoadExecutor> _stream_load_executor;
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
Expand All @@ -446,7 +441,7 @@ class ExecEnv {
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;
std::unique_ptr<WalManager> _wal_manager;
DNSCache* _dns_cache = nullptr;
std::unique_ptr<WriteCooldownMetaExecutors> _write_cooldown_meta_executors;

Expand All @@ -473,7 +468,6 @@ class ExecEnv {
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
QueryCache* _query_cache = nullptr;
std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
std::unique_ptr<io::FDCache> _file_cache_open_fd_cache;

pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;
Expand Down
39 changes: 33 additions & 6 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ ThreadPool* ExecEnv::non_block_close_thread_pool() {
#endif
}

ExecEnv::ExecEnv() = default;

ExecEnv::~ExecEnv() {
destroy();
}

Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
const std::vector<StorePath>& spill_store_paths,
const std::set<std::string>& broken_paths) {
Expand Down Expand Up @@ -290,16 +296,16 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_store_paths.size() * config::flush_thread_num_per_store,
static_cast<size_t>(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu);
_load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
_new_load_stream_mgr = NewLoadStreamMgr::create_shared();
_new_load_stream_mgr = NewLoadStreamMgr::create_unique();
_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_streaming_client_cache =
new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", "streaming");
_function_client_cache =
new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
if (config::is_cloud_mode()) {
_stream_load_executor = std::make_shared<CloudStreamLoadExecutor>(this);
_stream_load_executor = CloudStreamLoadExecutor::create_unique(this);
} else {
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_stream_load_executor = StreamLoadExecutor::create_unique(this);
}
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit()));
Expand All @@ -309,7 +315,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_file_cache_open_fd_cache = std::make_unique<io::FDCache>();
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
_wal_manager = WalManager::create_unique(this, config::group_commit_wal_path);
_dns_cache = new DNSCache();
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
_spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map));
Expand Down Expand Up @@ -464,8 +470,6 @@ Status ExecEnv::_init_mem_env() {
return Status::InternalError(ss.str());
}

_dummy_lru_cache = std::make_shared<DummyLRUCache>();

_cache_manager = CacheManager::create_global_instance();

int64_t storage_cache_limit =
Expand Down Expand Up @@ -681,7 +685,30 @@ void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
}
#ifdef BE_TEST
void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr) {
this->_new_load_stream_mgr = std::move(new_load_stream_mgr);
}

void ExecEnv::clear_new_load_stream_mgr() {
this->_new_load_stream_mgr.reset();
}

void ExecEnv::set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor) {
this->_stream_load_executor = std::move(stream_load_executor);
}

void ExecEnv::clear_stream_load_executor() {
this->_stream_load_executor.reset();
}

void ExecEnv::set_wal_mgr(std::unique_ptr<WalManager>&& wm) {
this->_wal_manager = std::move(wm);
}
void ExecEnv::clear_wal_mgr() {
this->_wal_manager.reset();
}
#endif
// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method.
// We need to stop all threads before releasing resource.
void ExecEnv::destroy() {
Expand Down
12 changes: 5 additions & 7 deletions be/src/runtime/memory/lru_cache_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ class LRUCachePolicy : public CachePolicy {
new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards,
element_count_capacity, is_lru_k));
} else {
CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
_cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
_cache = std::make_shared<doris::DummyLRUCache>();
}
_init_mem_tracker(lru_cache_type_string(lru_cache_type));
}
Expand All @@ -64,8 +63,7 @@ class LRUCachePolicy : public CachePolicy {
cache_value_time_extractor, cache_value_check_timestamp,
element_count_capacity, is_lru_k));
} else {
CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
_cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
_cache = std::make_shared<doris::DummyLRUCache>();
}
_init_mem_tracker(lru_cache_type_string(lru_cache_type));
}
Expand Down Expand Up @@ -157,7 +155,7 @@ class LRUCachePolicy : public CachePolicy {
std::lock_guard<std::mutex> l(_lock);
COUNTER_SET(_freed_entrys_counter, (int64_t)0);
COUNTER_SET(_freed_memory_counter, (int64_t)0);
if (_stale_sweep_time_s <= 0 || _cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) {
if (_stale_sweep_time_s <= 0 || std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
return;
}
if (exceed_prune_limit()) {
Expand Down Expand Up @@ -204,7 +202,7 @@ class LRUCachePolicy : public CachePolicy {
std::lock_guard<std::mutex> l(_lock);
COUNTER_SET(_freed_entrys_counter, (int64_t)0);
COUNTER_SET(_freed_memory_counter, (int64_t)0);
if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) {
if (std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
return;
}
if ((force && mem_consumption() != 0) || exceed_prune_limit()) {
Expand Down Expand Up @@ -246,7 +244,7 @@ class LRUCachePolicy : public CachePolicy {
COUNTER_SET(_freed_entrys_counter, (int64_t)0);
COUNTER_SET(_freed_memory_counter, (int64_t)0);
COUNTER_SET(_cost_timer, (int64_t)0);
if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) {
if (std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
return 0;
}

Expand Down
4 changes: 2 additions & 2 deletions be/test/http/stream_load_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ void http_request_done_cb(struct evhttp_request* req, void* arg) {

TEST_F(StreamLoadTest, TestHeader) {
// 1G
auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), config::group_commit_wal_path);
auto wal_mgr = WalManager::create_unique(ExecEnv::GetInstance(), config::group_commit_wal_path);
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0));
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 0));
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 0));
ExecEnv::GetInstance()->set_wal_mgr(wal_mgr);
ExecEnv::GetInstance()->set_wal_mgr(std::move(wal_mgr));
// 1. empty info
{
auto* evhttp_req = evhttp_request_new(nullptr, nullptr);
Expand Down
13 changes: 8 additions & 5 deletions be/test/olap/wal/wal_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ class WalManagerTest : public testing::Test {
_env->_cluster_info->master_fe_addr.hostname = "host name";
_env->_cluster_info->master_fe_addr.port = 1234;
_env->_cluster_info->backend_id = 1001;
_env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared();
_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_env->_stream_load_executor = StreamLoadExecutor::create_shared(_env);
_env->_stream_load_executor = StreamLoadExecutor::create_unique(_env);
_env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
_env->_wal_manager = WalManager::create_shared(_env, wal_dir.string());
_env->set_wal_mgr(WalManager::create_unique(_env, wal_dir.string()));
k_stream_load_begin_result = TLoadTxnBeginResult();
}
void TearDown() override {
Expand All @@ -78,6 +78,9 @@ class WalManagerTest : public testing::Test {
SAFE_DELETE(_env->_function_client_cache);
SAFE_DELETE(_env->_internal_client_cache);
SAFE_DELETE(_env->_cluster_info);
_env->clear_new_load_stream_mgr();
_env->clear_stream_load_executor();
//_env->clear_wal_mgr();
}

void prepare() {
Expand Down Expand Up @@ -155,9 +158,9 @@ TEST_F(WalManagerTest, recovery_normal) {
}

TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
auto wal_mgr = WalManager::create_shared(_env, config::group_commit_wal_path);
auto wal_mgr = WalManager::create_unique(_env, config::group_commit_wal_path);
static_cast<void>(wal_mgr->init());
_env->set_wal_mgr(wal_mgr);
_env->set_wal_mgr(std::move(wal_mgr));

// 1T
size_t available_bytes = 1099511627776;
Expand Down
19 changes: 12 additions & 7 deletions be/test/runtime/routine_load_task_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,28 @@ class RoutineLoadTaskExecutorTest : public testing::Test {
RoutineLoadTaskExecutorTest() = default;
~RoutineLoadTaskExecutorTest() override = default;

ExecEnv* _env = nullptr;

void SetUp() override {
_env = ExecEnv::GetInstance();
k_stream_load_begin_result = TLoadTxnBeginResult();
k_stream_load_commit_result = TLoadTxnCommitResult();
k_stream_load_rollback_result = TLoadTxnRollbackResult();
k_stream_load_put_result = TStreamLoadPutResult();

_env.set_cluster_info(new ClusterInfo());
_env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));
_env->set_cluster_info(new ClusterInfo());
_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env->set_stream_load_executor(StreamLoadExecutor::create_unique(_env));

config::max_routine_load_thread_pool_size = 1024;
config::max_consumer_num_per_group = 3;
}

void TearDown() override { delete _env.cluster_info(); }

ExecEnv _env;
void TearDown() override {
delete _env->cluster_info();
_env->clear_new_load_stream_mgr();
_env->clear_stream_load_executor();
}
};

TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
Expand All @@ -92,7 +97,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {

task.__set_kafka_load_info(k_info);

RoutineLoadTaskExecutor executor(&_env);
RoutineLoadTaskExecutor executor(_env);
Status st;
st = executor.init(1024 * 1024);
EXPECT_TRUE(st.ok());
Expand Down
1 change: 0 additions & 1 deletion be/test/testutil/run_all_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());
doris::ExecEnv::GetInstance()->set_process_profile(
doris::ProcessProfile::create_global_instance());
doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared<doris::DummyLRUCache>());
doris::ExecEnv::GetInstance()->set_storage_page_cache(
doris::StoragePageCache::create_global_cache(1 << 30, 10, 0));
doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000, 1000));
Expand Down
3 changes: 2 additions & 1 deletion be/test/vec/exec/vwal_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class VWalScannerTest : public testing::Test {
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
fmt::format("fail to delete dir={}", _wal_dir));
SAFE_STOP(_env->_wal_manager);
_env->clear_wal_mgr();
}

protected:
Expand Down Expand Up @@ -286,7 +287,7 @@ void VWalScannerTest::init() {
_env->_cluster_info->master_fe_addr.hostname = "host name";
_env->_cluster_info->master_fe_addr.port = _backend_id;
_env->_cluster_info->backend_id = 1001;
_env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
_env->set_wal_mgr(WalManager::create_unique(_env, _wal_dir));
std::string base_path;
auto st = _env->_wal_manager->_init_wal_dirs_info();
st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1, _label_1, base_path,
Expand Down
Loading