Skip to content

Commit

Permalink
[refactor](wal) refactor some wal code (apache#29434)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian authored Jan 3, 2024
1 parent 329d57f commit e3c9f53
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 69 deletions.
22 changes: 10 additions & 12 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,19 +334,17 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
ctx->put_result.params.__set_content_length(content_length);
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
ctx->put_result.params.__set_content_length(content_length);
}

return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
Expand Down
24 changes: 11 additions & 13 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,19 +633,17 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return plan_status;
}
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
ctx->put_result.params.__set_content_length(content_length);
}
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
ctx->put_result.params.__set_content_length(content_length);
}

VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size) {
try {
*dir_size += std::filesystem::file_size(entry);
} catch (const std::exception& e) {
LOG(INFO) << "{}", e.what();
LOG(INFO) << "failed to get file size, err: {}", e.what();
}
}
}
Expand Down
26 changes: 11 additions & 15 deletions be/src/olap/wal/wal_dirs_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,10 @@ void WalDirInfo::set_used(size_t used) {
_used = used;
}

void WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
void WalDirInfo::set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated) {
std::unique_lock wlock(_lock);
if (is_add_pre_allocated) {
_pre_allocated += pre_allocated;
} else {
_pre_allocated -= pre_allocated;
}
_pre_allocated += increase_pre_allocated;
_pre_allocated -= decrease_pre_allocated;
}

size_t WalDirInfo::available() {
Expand All @@ -72,9 +69,6 @@ Status WalDirInfo::update_wal_dir_limit(size_t limit) {
if (wal_disk_limit <= 0) {
return Status::InternalError("Disk full! Please check your disk usage!");
}
size_t wal_dir_size = 0;
RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size));
// TODO should be wal_disk_limit + wal_dir_size
set_limit(wal_disk_limit);
}
return Status::OK();
Expand All @@ -91,9 +85,9 @@ Status WalDirInfo::update_wal_dir_used(size_t used) {
return Status::OK();
}

Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
set_pre_allocated(pre_allocated, is_add_pre_allocated);
return Status::OK();
void WalDirInfo::update_wal_dir_pre_allocated(size_t increase_pre_allocated,
size_t decrease_pre_allocated) {
set_pre_allocated(increase_pre_allocated, decrease_pre_allocated);
}

Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used,
Expand Down Expand Up @@ -178,11 +172,13 @@ Status WalDirsInfo::update_all_wal_dir_used() {
return Status::OK();
}

Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
bool is_add_pre_allocated) {
Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t increase_pre_allocated,
size_t decrease_pre_allocated) {
for (const auto& wal_dir_info : _wal_dirs_info_vec) {
if (wal_dir_info->get_wal_dir() == wal_dir) {
return wal_dir_info->update_wal_dir_pre_allocated(pre_allocated, is_add_pre_allocated);
wal_dir_info->update_wal_dir_pre_allocated(increase_pre_allocated,
decrease_pre_allocated);
return Status::OK();
}
}
return Status::InternalError("Can not find wal dir in wal disks info.");
Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/wal/wal_dirs_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ class WalDirInfo {
size_t get_limit();
void set_limit(size_t limit);
void set_used(size_t used);
// TODO increase_pre_allocated and decrease_pre_allocated
void set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated);
void set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated);
size_t available();
Status update_wal_dir_limit(size_t limit = -1);
Status update_wal_dir_used(size_t used = -1);
Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true);
void update_wal_dir_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated);

private:
std::string _wal_dir;
Expand All @@ -70,8 +69,8 @@ class WalDirsInfo {
Status update_all_wal_dir_limit();
Status update_wal_dir_used(std::string wal_dir, size_t used = -1);
Status update_all_wal_dir_used();
Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
bool is_add_pre_allocated);
Status update_wal_dir_pre_allocated(std::string wal_dir, size_t increase_pre_allocated,
size_t decrease_pre_allocated);
Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes);

private:
Expand Down
22 changes: 9 additions & 13 deletions be/src/olap/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ void WalManager::print_wal_status_queue() {
LOG(INFO) << ss.str();
}

Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
const std::string& label, std::string& base_path) {
Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
const std::string& label, std::string& base_path) {
base_path = _wal_dirs_info->get_available_random_wal_dir();
std::stringstream ss;
ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/"
Expand Down Expand Up @@ -286,11 +286,6 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
LOG(INFO) << "create wal " << wal_path;
wal_writer = std::make_shared<WalWriter>(wal_path);
RETURN_IF_ERROR(wal_writer->init());
{
// TODO no use, should remove it
std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
_wal_id_to_writer_map.emplace(wal_id, wal_writer);
}
return Status::OK();
}

Expand Down Expand Up @@ -431,8 +426,8 @@ Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated)
_wal_path_map.erase(wal_id);
}
}
RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path),
block_queue_pre_allocated, false));
RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0,
block_queue_pre_allocated));
return Status::OK();
}

Expand Down Expand Up @@ -481,10 +476,11 @@ Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used)
return _wal_dirs_info->update_wal_dir_used(wal_dir, used);
}

Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
bool is_add_pre_allocated) {
return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, pre_allocated,
is_add_pre_allocated);
Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir,
size_t increase_pre_allocated,
size_t decrease_pre_allocated) {
return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, increase_pre_allocated,
decrease_pre_allocated);
}

Status WalManager::_update_wal_dir_info_thread() {
Expand Down
10 changes: 4 additions & 6 deletions be/src/olap/wal/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ class WalManager {
// wal back pressure
Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1);
Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1);
Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
bool is_add_pre_allocated);
Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t increase_pre_allocated,
size_t decrease_pre_allocated);
Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes);
size_t get_max_available_size();

// replay wal
Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label,
std::string& base_path);
Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
const std::string& label, std::string& base_path);
Status get_wal_path(int64_t wal_id, std::string& wal_path);
Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal);
Expand Down Expand Up @@ -130,8 +130,6 @@ class WalManager {

std::shared_mutex _wal_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
// TODO no use? need remove it. And the map dose not clear
std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map;

// TODO Now only used for debug wal status, consider remove it
std::shared_mutex _wal_status_lock;
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,8 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id, tb_id, wal_id,
import_label, _wal_base_path));
RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(
db_id, tb_id, wal_id, import_label, _wal_base_path));
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version);
return _v_wal_writer->init();
Expand All @@ -515,7 +515,7 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
}
}
if (pre_allocated < available_bytes) {
Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, true);
Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, 0);
if (!st.ok()) {
LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string();
}
Expand Down
2 changes: 1 addition & 1 deletion be/test/vec/exec/vwal_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ void VWalScannerTest::init() {
_env->_wal_manager = WalManager::create_shared(_env, wal_dir);
std::string base_path;
auto st = _env->_wal_manager->_init_wal_dirs_info();
st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label, base_path);
st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, base_path);
}

TEST_F(VWalScannerTest, normal) {
Expand Down

0 comments on commit e3c9f53

Please sign in to comment.