Skip to content

Commit f444546

Browse files
committed
[fix](cloud) shorten cache lock held time and add metrics
When updating the bvar metrics, we acquired the block lock inside the critical section of the cache lock, which caused the cache lock to be held for too long and affected other cache logic. We now use the unsafe method (`state_unsafe()`) to read the block state when updating the bvar, which boosts performance. Key lock metrics and other meaningful metrics are also added to better monitor cache time costs. Signed-off-by: zhengyu <[email protected]>
1 parent 780024a commit f444546

8 files changed

+123
-90
lines changed

be/src/common/config.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,8 @@ DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
10601060
DEFINE_mBool(enbale_dump_error_file, "false");
10611061
// limit the max size of error log on disk
10621062
DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB
1063-
DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
1063+
DEFINE_mInt64(cache_lock_wait_long_tail_threshold_us, "30000000");
1064+
DEFINE_mInt64(cache_lock_held_long_tail_threshold_us, "30000000");
10641065
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
10651066
DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
10661067

be/src/common/config.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -1100,7 +1100,8 @@ DECLARE_Bool(enable_ttl_cache_evict_using_lru);
11001100
DECLARE_mBool(enbale_dump_error_file);
11011101
// limit the max size of error log on disk
11021102
DECLARE_mInt64(file_cache_error_log_limit_bytes);
1103-
DECLARE_mInt64(cache_lock_long_tail_threshold);
1103+
DECLARE_mInt64(cache_lock_wait_long_tail_threshold_us);
1104+
DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
11041105
// Base compaction may retrieve and produce some less frequently accessed data,
11051106
// potentially affecting the file cache hit rate.
11061107
// This configuration determines whether to retain the output within the file cache.

be/src/io/cache/block_file_cache.cpp

+76-57
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,16 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
202202
_disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
203203
_cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
204204

205-
_storage_sync_remove_latency = std::make_shared<bvar::LatencyRecorder>(
206-
_cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_ns");
207-
_storage_async_remove_latency = std::make_shared<bvar::LatencyRecorder>(
208-
_cache_base_path.c_str(), "file_cache_storage_async_remove_latency_ns");
205+
_cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
206+
_cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
207+
_get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
208+
_cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
209+
_storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
210+
_cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
211+
_storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
212+
_cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
213+
_storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
214+
_cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
209215

210216
_disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
211217
cache_settings.disposable_queue_elements, 60 * 60);
@@ -259,7 +265,7 @@ FileCacheType BlockFileCache::string_to_cache_type(const std::string& str) {
259265

260266
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
261267
const TUniqueId& query_id) {
262-
SCOPED_CACHE_LOCK(_mutex);
268+
SCOPED_CACHE_LOCK(_mutex, this);
263269
if (!config::enable_file_cache_query_limit) {
264270
return {};
265271
}
@@ -277,7 +283,7 @@ BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
277283
}
278284

279285
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
280-
SCOPED_CACHE_LOCK(_mutex);
286+
SCOPED_CACHE_LOCK(_mutex, this);
281287
const auto& query_iter = _query_map.find(query_id);
282288

283289
if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
@@ -322,7 +328,7 @@ void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash,
322328
}
323329

324330
Status BlockFileCache::initialize() {
325-
SCOPED_CACHE_LOCK(_mutex);
331+
SCOPED_CACHE_LOCK(_mutex, this);
326332
return initialize_unlocked(cache_lock);
327333
}
328334

@@ -541,7 +547,7 @@ std::string BlockFileCache::clear_file_cache_async() {
541547
int64_t num_files_all = 0;
542548
TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
543549
{
544-
SCOPED_CACHE_LOCK(_mutex);
550+
SCOPED_CACHE_LOCK(_mutex, this);
545551

546552
std::vector<FileBlockCell*> deleting_cells;
547553
for (auto& [_, offset_to_cell] : _files) {
@@ -699,35 +705,39 @@ FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t o
699705
sw.start();
700706
std::lock_guard cache_lock(_mutex);
701707
stats->lock_wait_timer += sw.elapsed_time();
702-
703-
if (auto iter = _key_to_time.find(hash);
704-
context.cache_type == FileCacheType::INDEX && iter != _key_to_time.end()) {
705-
context.cache_type = FileCacheType::TTL;
706-
context.expiration_time = iter->second;
707-
}
708-
709-
/// Get all blocks which intersect with the given range.
710708
FileBlocks file_blocks;
709+
int64_t duration;
711710
{
712-
SCOPED_RAW_TIMER(&stats->get_timer);
713-
file_blocks = get_impl(hash, context, range, cache_lock);
714-
}
711+
SCOPED_RAW_TIMER(&duration);
712+
if (auto iter = _key_to_time.find(hash);
713+
context.cache_type == FileCacheType::INDEX && iter != _key_to_time.end()) {
714+
context.cache_type = FileCacheType::TTL;
715+
context.expiration_time = iter->second;
716+
}
715717

716-
if (file_blocks.empty()) {
717-
SCOPED_RAW_TIMER(&stats->set_timer);
718-
file_blocks = split_range_into_cells(hash, context, offset, size, FileBlock::State::EMPTY,
719-
cache_lock);
720-
} else {
721-
SCOPED_RAW_TIMER(&stats->set_timer);
722-
fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
723-
}
724-
DCHECK(!file_blocks.empty());
725-
*_num_read_blocks << file_blocks.size();
726-
for (auto& block : file_blocks) {
727-
if (block->state() == FileBlock::State::DOWNLOADED) {
728-
*_num_hit_blocks << 1;
718+
/// Get all blocks which intersect with the given range.
719+
{
720+
SCOPED_RAW_TIMER(&stats->get_timer);
721+
file_blocks = get_impl(hash, context, range, cache_lock);
722+
}
723+
724+
if (file_blocks.empty()) {
725+
SCOPED_RAW_TIMER(&stats->set_timer);
726+
file_blocks = split_range_into_cells(hash, context, offset, size,
727+
FileBlock::State::EMPTY, cache_lock);
728+
} else {
729+
SCOPED_RAW_TIMER(&stats->set_timer);
730+
fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
731+
}
732+
DCHECK(!file_blocks.empty());
733+
*_num_read_blocks << file_blocks.size();
734+
for (auto& block : file_blocks) {
735+
if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
736+
*_num_hit_blocks << 1;
737+
}
729738
}
730739
}
740+
*_get_or_set_latency_us << (duration / 1000);
731741
return FileBlocksHolder(std::move(file_blocks));
732742
}
733743

@@ -781,7 +791,7 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha
781791
}
782792

783793
size_t BlockFileCache::try_release() {
784-
SCOPED_CACHE_LOCK(_mutex);
794+
SCOPED_CACHE_LOCK(_mutex, this);
785795
std::vector<FileBlockCell*> trash;
786796
for (auto& [hash, blocks] : _files) {
787797
for (auto& [offset, cell] : blocks) {
@@ -1075,7 +1085,7 @@ bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, b
10751085
// remove specific cache synchronously, for critical operations
10761086
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
10771087
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1078-
SCOPED_CACHE_LOCK(_mutex);
1088+
SCOPED_CACHE_LOCK(_mutex, this);
10791089
bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, true);
10801090
if (!is_ttl_file) {
10811091
auto iter = _files.find(file_key);
@@ -1097,7 +1107,7 @@ void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
10971107
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
10981108
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
10991109
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1100-
SCOPED_CACHE_LOCK(_mutex);
1110+
SCOPED_CACHE_LOCK(_mutex, this);
11011111
bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, /*sync*/ false);
11021112
if (!is_ttl_file) {
11031113
auto iter = _files.find(file_key);
@@ -1321,9 +1331,12 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
13211331
key.meta.expiration_time = expiration_time;
13221332
if (sync) {
13231333
int64_t duration_ns = 0;
1324-
SCOPED_RAW_TIMER(&duration_ns);
1325-
Status st = _storage->remove(key);
1326-
*_storage_sync_remove_latency << duration_ns;
1334+
Status st;
1335+
{
1336+
SCOPED_RAW_TIMER(&duration_ns);
1337+
st = _storage->remove(key);
1338+
}
1339+
*_storage_sync_remove_latency_us << (duration_ns / 1000);
13271340
if (!st.ok()) {
13281341
LOG_WARNING("").error(st);
13291342
}
@@ -1335,9 +1348,12 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
13351348
if (!ret) {
13361349
LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
13371350
int64_t duration_ns = 0;
1338-
SCOPED_RAW_TIMER(&duration_ns);
1339-
Status st = _storage->remove(key);
1340-
*_storage_sync_remove_latency << duration_ns;
1351+
Status st;
1352+
{
1353+
SCOPED_RAW_TIMER(&duration_ns);
1354+
st = _storage->remove(key);
1355+
}
1356+
*_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
13411357
if (!st.ok()) {
13421358
LOG_WARNING("").error(st);
13431359
}
@@ -1360,7 +1376,7 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
13601376
}
13611377

13621378
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
1363-
SCOPED_CACHE_LOCK(_mutex);
1379+
SCOPED_CACHE_LOCK(_mutex, this);
13641380
return get_used_cache_size_unlocked(cache_type, cache_lock);
13651381
}
13661382

@@ -1370,7 +1386,7 @@ size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
13701386
}
13711387

13721388
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
1373-
SCOPED_CACHE_LOCK(_mutex);
1389+
SCOPED_CACHE_LOCK(_mutex, this);
13741390
return get_available_cache_size_unlocked(cache_type, cache_lock);
13751391
}
13761392

@@ -1381,7 +1397,7 @@ size_t BlockFileCache::get_available_cache_size_unlocked(
13811397
}
13821398

13831399
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
1384-
SCOPED_CACHE_LOCK(_mutex);
1400+
SCOPED_CACHE_LOCK(_mutex, this);
13851401
return get_file_blocks_num_unlocked(cache_type, cache_lock);
13861402
}
13871403

@@ -1465,7 +1481,7 @@ std::string BlockFileCache::LRUQueue::to_string(
14651481
}
14661482

14671483
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1468-
SCOPED_CACHE_LOCK(_mutex);
1484+
SCOPED_CACHE_LOCK(_mutex, this);
14691485
return dump_structure_unlocked(hash, cache_lock);
14701486
}
14711487

@@ -1483,7 +1499,7 @@ std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
14831499
}
14841500

14851501
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1486-
SCOPED_CACHE_LOCK(_mutex);
1502+
SCOPED_CACHE_LOCK(_mutex, this);
14871503
return dump_single_cache_type_unlocked(hash, offset, cache_lock);
14881504
}
14891505

@@ -1546,7 +1562,7 @@ std::string BlockFileCache::reset_capacity(size_t new_capacity) {
15461562
ss << "finish reset_capacity, path=" << _cache_base_path;
15471563
auto start_time = steady_clock::time_point();
15481564
{
1549-
SCOPED_CACHE_LOCK(_mutex);
1565+
SCOPED_CACHE_LOCK(_mutex, this);
15501566
if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
15511567
int64_t need_remove_size = _cur_cache_size - new_capacity;
15521568
auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
@@ -1662,7 +1678,7 @@ void BlockFileCache::run_background_monitor() {
16621678
}
16631679
// report
16641680
{
1665-
SCOPED_CACHE_LOCK(_mutex);
1681+
SCOPED_CACHE_LOCK(_mutex, this);
16661682
_cur_cache_size_metrics->set_value(_cur_cache_size);
16671683
_cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
16681684
_index_queue.get_capacity(cache_lock) -
@@ -1712,7 +1728,7 @@ void BlockFileCache::run_background_ttl_gc() { // TODO(zhengyu): fix!
17121728
}
17131729
{
17141730
int64_t cur_time = UnixSeconds();
1715-
SCOPED_CACHE_LOCK(_mutex);
1731+
SCOPED_CACHE_LOCK(_mutex, this);
17161732
while (!_time_to_key.empty()) {
17171733
auto begin = _time_to_key.begin();
17181734
if (cur_time < begin->first) {
@@ -1743,9 +1759,12 @@ void BlockFileCache::run_background_gc() {
17431759
}
17441760

17451761
int64_t duration_ns = 0;
1746-
SCOPED_RAW_TIMER(&duration_ns);
1747-
Status st = _storage->remove(key);
1748-
*_storage_async_remove_latency << duration_ns;
1762+
Status st;
1763+
{
1764+
SCOPED_RAW_TIMER(&duration_ns);
1765+
st = _storage->remove(key);
1766+
}
1767+
*_storage_async_remove_latency_us << (duration_ns / 1000);
17491768

17501769
if (!st.ok()) {
17511770
LOG_WARNING("").error(st);
@@ -1758,7 +1777,7 @@ void BlockFileCache::run_background_gc() {
17581777

17591778
void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
17601779
uint64_t new_expiration_time) {
1761-
SCOPED_CACHE_LOCK(_mutex);
1780+
SCOPED_CACHE_LOCK(_mutex, this);
17621781
// 1. If new_expiration_time is equal to zero
17631782
if (new_expiration_time == 0) {
17641783
remove_if_ttl_file_blocks(hash, false, cache_lock, false);
@@ -1818,7 +1837,7 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
18181837
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
18191838
std::chrono::steady_clock::now().time_since_epoch())
18201839
.count();
1821-
SCOPED_CACHE_LOCK(_mutex);
1840+
SCOPED_CACHE_LOCK(_mutex, this);
18221841
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
18231842
if (auto iter = _files.find(hash); iter != _files.end()) {
18241843
for (auto& pair : _files.find(hash)->second) {
@@ -1887,7 +1906,7 @@ std::string BlockFileCache::clear_file_cache_directly() {
18871906
using namespace std::chrono;
18881907
std::stringstream ss;
18891908
auto start = steady_clock::now();
1890-
SCOPED_CACHE_LOCK(_mutex);
1909+
SCOPED_CACHE_LOCK(_mutex, this);
18911910
LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
18921911

18931912
std::string clear_msg;
@@ -1925,7 +1944,7 @@ std::string BlockFileCache::clear_file_cache_directly() {
19251944

19261945
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
19271946
std::map<size_t, FileBlockSPtr> offset_to_block;
1928-
SCOPED_CACHE_LOCK(_mutex);
1947+
SCOPED_CACHE_LOCK(_mutex, this);
19291948
if (_files.contains(hash)) {
19301949
for (auto& [offset, cell] : _files[hash]) {
19311950
if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
@@ -1940,7 +1959,7 @@ std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128W
19401959
}
19411960

19421961
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
1943-
SCOPED_CACHE_LOCK(_mutex);
1962+
SCOPED_CACHE_LOCK(_mutex, this);
19441963
if (auto iter = _files.find(hash); iter != _files.end()) {
19451964
for (auto& [_, cell] : iter->second) {
19461965
cell.update_atime();

0 commit comments

Comments
 (0)