diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index b97ce2976eb8c8..f1017703570c77 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -334,19 +334,17 @@ Status HttpStreamAction::process_put(HttpRequest* http_req, ctx->label = ctx->put_result.params.import_label; ctx->put_result.params.__set_wal_id(ctx->wal_id); if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { - if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { - size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); - if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || - ctx->format == TFileFormatType::FORMAT_CSV_LZO || - ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || - ctx->format == TFileFormatType::FORMAT_CSV_LZOP || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || - ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { - content_length *= 3; - } - ctx->put_result.params.__set_content_length(content_length); + size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || + ctx->format == TFileFormatType::FORMAT_CSV_LZO || + ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || + ctx->format == TFileFormatType::FORMAT_CSV_LZOP || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || + ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + content_length *= 3; } + ctx->put_result.params.__set_content_length(content_length); } return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 88e12e19dca449..db8e56ec77b58c 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -633,19 +633,17 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, return plan_status; } if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { - if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { - size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); - if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || - ctx->format == TFileFormatType::FORMAT_CSV_LZO || - ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || - ctx->format == TFileFormatType::FORMAT_CSV_LZOP || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || - ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { - content_length *= 3; - } - ctx->put_result.params.__set_content_length(content_length); - } + size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || + ctx->format == TFileFormatType::FORMAT_CSV_LZO || + ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || + ctx->format == TFileFormatType::FORMAT_CSV_LZOP || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || + ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + content_length *= 3; + } + ctx->put_result.params.__set_content_length(content_length); } VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params); diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 78ba222d1ffd8c..1828c7c3d2c681 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -186,7 +186,7 @@ Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size) { try { *dir_size += std::filesystem::file_size(entry); } catch (const std::exception& e) { - LOG(INFO) << "{}", e.what(); + LOG(INFO) << "failed to get file size, err: {}", e.what(); } } } diff --git a/be/src/olap/wal/wal_dirs_info.cpp b/be/src/olap/wal/wal_dirs_info.cpp index 19ad2562778047..79bd024e7d08d9 100644 --- a/be/src/olap/wal/wal_dirs_info.cpp +++ b/be/src/olap/wal/wal_dirs_info.cpp @@ -43,13 +43,10 @@ void WalDirInfo::set_used(size_t used) { _used = used; } -void WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { +void WalDirInfo::set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated) { std::unique_lock wlock(_lock); - if (is_add_pre_allocated) { - _pre_allocated += pre_allocated; - } else { - _pre_allocated -= pre_allocated; - } + _pre_allocated += increase_pre_allocated; + _pre_allocated -= decrease_pre_allocated; } size_t WalDirInfo::available() { @@ -72,9 +69,6 @@ Status WalDirInfo::update_wal_dir_limit(size_t limit) { if (wal_disk_limit <= 0) { return Status::InternalError("Disk full! Please check your disk usage!"); } - size_t wal_dir_size = 0; - RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); - // TODO should be wal_disk_limit + wal_dir_size set_limit(wal_disk_limit); } return Status::OK(); @@ -91,9 +85,9 @@ Status WalDirInfo::update_wal_dir_used(size_t used) { return Status::OK(); } -Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { - set_pre_allocated(pre_allocated, is_add_pre_allocated); - return Status::OK(); +void WalDirInfo::update_wal_dir_pre_allocated(size_t increase_pre_allocated, + size_t decrease_pre_allocated) { + set_pre_allocated(increase_pre_allocated, decrease_pre_allocated); } Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, @@ -178,11 +172,13 @@ Status WalDirsInfo::update_all_wal_dir_used() { return Status::OK(); } -Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, - bool is_add_pre_allocated) { +Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t increase_pre_allocated, + size_t decrease_pre_allocated) { for (const auto& wal_dir_info : _wal_dirs_info_vec) { if (wal_dir_info->get_wal_dir() == wal_dir) { - return wal_dir_info->update_wal_dir_pre_allocated(pre_allocated, is_add_pre_allocated); + wal_dir_info->update_wal_dir_pre_allocated(increase_pre_allocated, + decrease_pre_allocated); + return Status::OK(); } } return Status::InternalError("Can not find wal dir in wal disks info."); diff --git a/be/src/olap/wal/wal_dirs_info.h b/be/src/olap/wal/wal_dirs_info.h index 91a26af2b5d02a..eda9cc72d30d6f 100644 --- a/be/src/olap/wal/wal_dirs_info.h +++ b/be/src/olap/wal/wal_dirs_info.h @@ -42,12 +42,11 @@ class WalDirInfo { size_t get_limit(); void set_limit(size_t limit); void set_used(size_t used); - // TODO increase_pre_allocated and decrease_pre_allocated - void set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + void set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated); size_t available(); Status update_wal_dir_limit(size_t limit = -1); Status update_wal_dir_used(size_t used = -1); - Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); + void update_wal_dir_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated); private: std::string _wal_dir; @@ -70,8 +69,8 @@ class WalDirsInfo { Status update_all_wal_dir_limit(); Status update_wal_dir_used(std::string wal_dir, size_t used = -1); Status update_all_wal_dir_used(); - Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, - bool is_add_pre_allocated); + Status update_wal_dir_pre_allocated(std::string wal_dir, size_t increase_pre_allocated, + size_t decrease_pre_allocated); Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes); private: diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index 9af91cf7b55348..9d92f5c9e6ad5f 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -232,8 +232,8 @@ void WalManager::print_wal_status_queue() { LOG(INFO) << ss.str(); } -Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, - const std::string& label, std::string& base_path) { +Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label, std::string& base_path) { base_path = _wal_dirs_info->get_available_random_wal_dir(); std::stringstream ss; ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" @@ -286,11 +286,6 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& LOG(INFO) << "create wal " << wal_path; wal_writer = std::make_shared(wal_path); RETURN_IF_ERROR(wal_writer->init()); - { - // TODO no use, should remove it - std::lock_guard wrlock(_wal_lock); - _wal_id_to_writer_map.emplace(wal_id, wal_writer); - } return Status::OK(); } @@ -431,8 +426,8 @@ Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) _wal_path_map.erase(wal_id); } } - RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), - block_queue_pre_allocated, false)); + RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0, + block_queue_pre_allocated)); return Status::OK(); } @@ -481,10 +476,11 @@ Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used) return _wal_dirs_info->update_wal_dir_used(wal_dir, used); } -Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated, - bool is_add_pre_allocated) { - return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, pre_allocated, - is_add_pre_allocated); +Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir, + size_t increase_pre_allocated, + size_t decrease_pre_allocated) { + return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, increase_pre_allocated, + decrease_pre_allocated); } Status WalManager::_update_wal_dir_info_thread() { diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h index f4e28445f8f786..f6a3bfef79891b 100644 --- a/be/src/olap/wal/wal_manager.h +++ b/be/src/olap/wal/wal_manager.h @@ -63,14 +63,14 @@ class WalManager { // wal back pressure Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1); Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1); - Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated, - bool is_add_pre_allocated); + Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t increase_pre_allocated, + size_t decrease_pre_allocated); Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes); size_t get_max_available_size(); // replay wal - Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label, - std::string& base_path); + Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label, std::string& base_path); Status get_wal_path(int64_t wal_id, std::string& wal_path); Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0); Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal); @@ -130,8 +130,6 @@ class WalManager { std::shared_mutex _wal_lock; std::unordered_map _wal_path_map; - // TODO no use? need remove it. And the map dose not clear - std::unordered_map> _wal_id_to_writer_map; // TODO Now only used for debug wal status, consider remove it std::shared_mutex _wal_status_lock; diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 2971138d5b6d53..8a81388ecd2948 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -491,8 +491,8 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, int be_exe_version) { - RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id, tb_id, wal_id, - import_label, _wal_base_path)); + RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path( + db_id, tb_id, wal_id, import_label, _wal_base_path)); _v_wal_writer = std::make_shared( tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version); return _v_wal_writer->init(); @@ -515,7 +515,7 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) { } } if (pre_allocated < available_bytes) { - Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, true); + Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, 0); if (!st.ok()) { LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string(); } diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 2d786679d4e6cd..cf9e1733643f12 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -218,7 +218,7 @@ void VWalScannerTest::init() { _env->_wal_manager = WalManager::create_shared(_env, wal_dir); std::string base_path; auto st = _env->_wal_manager->_init_wal_dirs_info(); - st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label, base_path); + st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, base_path); } TEST_F(VWalScannerTest, normal) {