From 2064296f48d69007086fc38850ee3aa2e7c6a610 Mon Sep 17 00:00:00 2001 From: greatsharp Date: Thu, 7 Aug 2025 21:55:08 +0800 Subject: [PATCH] hotkey analyze --- kvrocks.conf | 25 ++ src/commands/cmd_server.cc | 157 ++++++++++++- src/config/config.cc | 39 ++++ src/config/config.h | 10 + src/server/redis_connection.cc | 41 ++++ src/server/server.cc | 13 ++ src/server/server.h | 3 + src/stats/hot_key.cc | 402 +++++++++++++++++++++++++++++++++ src/stats/hot_key.h | 87 +++++++ 9 files changed, 776 insertions(+), 1 deletion(-) create mode 100644 src/stats/hot_key.cc create mode 100644 src/stats/hot_key.h diff --git a/kvrocks.conf b/kvrocks.conf index 65a820e9ffa..c2a5d96c599 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -1095,3 +1095,28 @@ rocksdb.sst_file_delete_rate_bytes_per_sec 0 ################################ NAMESPACE ##################################### # namespace.test change.me + +################################ HOTKEY ANALYZE ##################################### +# Start hotkey analysis automatically at server startup if set to yes. Default: no +hotkey-bootstrap no + +# Initial capacity of the LRU cache +hotkey-init-lru-capacity 200000 + +# LRU cache maximum capacity +hotkey-max-lru-capacity 500000 + +# Initial size of the hotkey deque +hotkey-init-deque-size 500000 + +# Hotkey deque maximum size +hotkey-max-deque-size 1000000 + +# Initial hotkey threshold +hotkey-init-threshold 5000 + +# Hotkey maximum threshold +hotkey-max-threshold 50000 + +# The maximum number of hotkey entries returned by the fetch commands +hotkey-max-fetch-entries 10000 \ No newline at end of file diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index 9bb3461aa2b..86b000ac406 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -1549,6 +1549,160 @@ class CommandFlushBlockCache : public Commander { } }; +// hotkey enable {capacity} {deque_size} {threshold} +// hotkey disable +// hotkey stats +// hotkey threshold {threshold} +// hotkey getbykey {key} {begin_timestamp_ms} 
{end_timestamp_ms} +// hotkey getbythreshold {threshold} {begin_timestamp_ms} {end_timestamp_ms} +// hotkey dumplogfile {off|info|warning} +// hotkey timerange {begin_timestamp_ms} {end_timestamp_ms} +class CommandHotkey : public Commander { + public: + Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, [[maybe_unused]] Connection *conn, + std::string *output) override { + std::string sub_command = util::ToLower(args_[1]); + if ((sub_command == "enable" && args_.size() != 5) || (sub_command == "disable" && args_.size() != 2) || + (sub_command == "stats" && args_.size() != 2) || (sub_command == "threshold" && args_.size() != 3) || + (sub_command == "getbykey" && args_.size() != 3 && args_.size() != 5) || + (sub_command == "getbythreshold" && args_.size() != 3 && args_.size() != 5) || + (sub_command == "dumplogfile" && args_.size() != 3) || + (sub_command == "timerange" && args_.size() != 2 && args_.size() != 4)) { + return {Status::RedisExecErr, errWrongNumOfArguments}; + } + + Config *config = srv->GetConfig(); + int min_capacity = 100000, min_deque_size = 100000, min_threshold = 1; + if (sub_command == "enable") { + auto capacity = ParseInt(args_[2], 10); + if (!capacity || *capacity < min_capacity || *capacity > config->hotkey_max_lru_capacity) { + return {Status::RedisParseErr, fmt::format("capacity must be an integer between {} and {}", min_capacity, + config->hotkey_max_lru_capacity)}; + } + auto deque_size = ParseInt(args_[3], 10); + if (!deque_size || *deque_size < min_deque_size || *deque_size > config->hotkey_max_deque_size) { + return {Status::RedisParseErr, fmt::format("deque size must be an integer between {} and {}", min_deque_size, + config->hotkey_max_deque_size)}; + } + auto threshold = ParseInt(args_[4], 10); + if (!threshold || *threshold < min_threshold || *threshold > config->hotkey_max_threshold) { + return {Status::RedisParseErr, fmt::format("threshold must be an integer between {} and {}", min_threshold, + 
config->hotkey_max_threshold)}; + } + auto s = srv->hotkey.Enable(*capacity, *deque_size, *threshold); + if (!s.IsOK()) { + return s; + } + *output = redis::RESP_OK; + } else if (sub_command == "disable") { + auto s = srv->hotkey.Disable(); + if (!s.IsOK()) { + return s; + } + *output = redis::RESP_OK; + } else if (sub_command == "stats") { + if (!srv->hotkey.enable_analyze) { + return {Status::RedisExecErr, "please enable hotkey analyze at first"}; + } + *output = srv->hotkey.GetStats(); + } else if (sub_command == "threshold") { + if (!srv->hotkey.enable_analyze) { + return {Status::RedisExecErr, "please enable hotkey analyze at first"}; + } + auto threshold = ParseInt(args_[2], 10); + if (!threshold || *threshold < min_threshold || *threshold > config->hotkey_max_threshold) { + return {Status::RedisParseErr, fmt::format("threshold must be an integer between {} and {}", min_threshold, + config->hotkey_max_threshold)}; + } + srv->hotkey.SetThreshold(*threshold); + *output = redis::RESP_OK; + } else if (sub_command == "timerange") { + if (!srv->hotkey.enable_analyze) { + return {Status::RedisExecErr, "please enable hotkey analyze at first"}; + } + if (args_.size() == 2) { + auto now = util::GetTimeStamp(); + uint64_t begin_timestamp_ms = (now - 1) * 1000; + uint64_t end_timestamp_ms = now * 1000; + *output = srv->hotkey.SearchByTimeRange(config->hotkey_max_fetch_entries, begin_timestamp_ms, end_timestamp_ms); + } else { + auto begin_timestamp_ms = ParseInt(args_[2], 10); + if (!begin_timestamp_ms || *begin_timestamp_ms < 1) { + return {Status::RedisExecErr, "begin timestamp invalid"}; + } + auto end_timestamp_ms = ParseInt(args_[3], 10); + if (!end_timestamp_ms || *end_timestamp_ms < 1) { + return {Status::RedisExecErr, "end timestamp invalid"}; + } + *output = + srv->hotkey.SearchByTimeRange(config->hotkey_max_fetch_entries, *begin_timestamp_ms, *end_timestamp_ms); + } + } else if (sub_command == "getbykey") { + if (!srv->hotkey.enable_analyze) { + return 
{Status::RedisExecErr, "please enable hotkey analyze at first"}; + } + if (args_.size() == 3) { + *output = srv->hotkey.GetByKeyOrThreshold(config->hotkey_max_fetch_entries, args_[2], 0, 0, 0); + } else { + auto begin_timestamp_ms = ParseInt(args_[3], 10); + if (!begin_timestamp_ms || *begin_timestamp_ms < 1) { + return {Status::RedisExecErr, "begin timestamp invalid"}; + } + auto end_timestamp_ms = ParseInt(args_[4], 10); + if (!end_timestamp_ms || *end_timestamp_ms < 1) { + return {Status::RedisExecErr, "end timestamp invalid"}; + } + *output = srv->hotkey.GetByKeyOrThreshold(config->hotkey_max_fetch_entries, args_[2], 0, *begin_timestamp_ms, + *end_timestamp_ms); + } + } else if (sub_command == "getbythreshold") { + if (!srv->hotkey.enable_analyze) { + return {Status::RedisExecErr, "please enable hotkey analyze at first"}; + } + auto threshold = ParseInt(args_[2], 10); + if (!threshold || *threshold < min_threshold || *threshold > config->hotkey_max_threshold) { + return {Status::RedisParseErr, fmt::format("threshold must be an integer between {} and {}", min_threshold, + config->hotkey_max_threshold)}; + } + if (args_.size() == 3) { + *output = srv->hotkey.GetByKeyOrThreshold(config->hotkey_max_fetch_entries, "", *threshold, 0, 0); + } else { + auto begin_timestamp_ms = ParseInt(args_[3], 10); + if (!begin_timestamp_ms || *begin_timestamp_ms < 1) { + return {Status::RedisExecErr, "begin timestamp invalid"}; + } + auto end_timestamp_ms = ParseInt(args_[4], 10); + if (!end_timestamp_ms || *end_timestamp_ms < 1) { + return {Status::RedisExecErr, "end timestamp invalid"}; + } + *output = srv->hotkey.GetByKeyOrThreshold(config->hotkey_max_fetch_entries, "", *threshold, *begin_timestamp_ms, + *end_timestamp_ms); + } + } else if (sub_command == "dumplogfile") { + if (!srv->hotkey.enable_analyze) { + return {Status::RedisExecErr, "please enable hotkey analyze at first"}; + } + spdlog::level::level_enum level = spdlog::level::off; + if (args_[2] == "info") { + level = 
spdlog::level::info; + } else if (args_[2] == "warning") { + level = spdlog::level::warn; + } else if (args_[2] == "off") { + level = spdlog::level::off; + } else { + return {Status::RedisExecErr, "dump logfile level should be one of off,info,warning"}; + } + srv->hotkey.SetDumpToLogfileLevel(level); + *output = redis::RESP_OK; + } else { + return {Status::RedisExecErr, + "HOTKEY subcommand must be one of ENABLE, DISABLE, TIMERANGE, THRESHOLD, GETBYKEY, GETBYTHRESHOLD, " + "DUMPLOGFILE, STATS"}; + } + return Status::OK(); + } +}; + REDIS_REGISTER_COMMANDS( Server, MakeCmdAttr("auth", 2, "read-only ok-loading auth", NO_KEY), MakeCmdAttr("ping", -1, "read-only", NO_KEY), @@ -1592,5 +1746,6 @@ REDIS_REGISTER_COMMANDS( MakeCmdAttr("pollupdates", -2, "read-only admin", NO_KEY), MakeCmdAttr("sst", -3, "write exclusive admin", 1, 1, 1), MakeCmdAttr("flushmemtable", -1, "exclusive write", NO_KEY), - MakeCmdAttr("flushblockcache", 1, "exclusive write", NO_KEY), ) + MakeCmdAttr("flushblockcache", 1, "exclusive write", NO_KEY), + MakeCmdAttr("hotkey", -2, "read-only", NO_KEY), ) } // namespace redis diff --git a/src/config/config.cc b/src/config/config.cc index 09770eede06..6ac156e1cc1 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -243,6 +243,14 @@ Config::Config() { {"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)}, {"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)}, {"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")}, + {"hotkey-bootstrap", false, new YesNoField(&hotkey_bootstrap, false)}, + {"hotkey-init-lru-capacity", false, new IntField(&hotkey_init_lru_capacity, 200000, 100000, INT_MAX)}, + {"hotkey-max-lru-capacity", false, new IntField(&hotkey_max_lru_capacity, 500000, 100000, INT_MAX)}, + {"hotkey-init-deque-size", false, new IntField(&hotkey_init_deque_size, 500000, 100000, INT_MAX)}, + {"hotkey-max-deque-size", false, 
new IntField(&hotkey_max_deque_size, 1000000, 100000, INT_MAX)}, + {"hotkey-init-threshold", false, new IntField(&hotkey_init_threshold, 5000, 1, INT_MAX)}, + {"hotkey-max-threshold", false, new IntField(&hotkey_max_threshold, 50000, 1, INT_MAX)}, + {"hotkey-max-fetch-entries", false, new IntField(&hotkey_max_fetch_entries, 10000, 1, INT_MAX)}, /* rocksdb options */ {"rocksdb.compression", false, @@ -470,6 +478,31 @@ void Config::initFieldCallback() { return Status::OK(); }; + auto set_hotkey_cb = [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, + [[maybe_unused]] const std::string &v) -> Status { + if ((k == "hotkey-init-lru-capacity" || k == "hotkey-max-lru-capacity") && + hotkey_init_lru_capacity > hotkey_max_lru_capacity) { + if (k == "hotkey-init-lru-capacity") { + return {Status::NotOK, "hotkey-init-lru-capacity should <= hotkey-max-lru-capacity"}; + } + return {Status::NotOK, "hotkey-max-lru-capacity should >= hotkey-init-lru-capacity"}; + } + if ((k == "hotkey-init-deque-size" || k == "hotkey-max-deque-size") && + hotkey_init_deque_size > hotkey_max_deque_size) { + if (k == "hotkey-init-deque-size") { + return {Status::NotOK, "hotkey-init-deque-size should <= hotkey-max-deque-size"}; + } + return {Status::NotOK, "hotkey-max-deque-size should >= hotkey-init-deque-size"}; + } + if ((k == "hotkey-init-threshold" || k == "hotkey-max-threshold") && hotkey_init_threshold > hotkey_max_threshold) { + if (k == "hotkey-init-threshold") { + return {Status::NotOK, "hotkey-init-threshold should <= hotkey-max-threshold"}; + } + return {Status::NotOK, "hotkey-max-threshold should >= hotkey-init-threshold"}; + } + return Status::OK(); + }; + std::map callbacks = { {"workers", @@ -803,6 +836,12 @@ void Config::initFieldCallback() { } return Status::OK(); }}, + {"hotkey-init-lru-capacity", set_hotkey_cb}, + {"hotkey-max-lru-capacity", set_hotkey_cb}, + {"hotkey-init-deque-size", set_hotkey_cb}, + {"hotkey-max-deque-size", set_hotkey_cb}, + 
{"hotkey-init-threshold", set_hotkey_cb}, + {"hotkey-max-threshold", set_hotkey_cb}, }; for (const auto &iter : callbacks) { auto field_iter = fields_.find(iter.first); diff --git a/src/config/config.h b/src/config/config.h index 2e774c97045..e4e67bbd513 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -195,6 +195,16 @@ struct Config { std::vector histogram_bucket_boundaries; + // hotkey analyze + bool hotkey_bootstrap = false; + int hotkey_init_lru_capacity = 200000; + int hotkey_max_lru_capacity = 500000; + int hotkey_init_deque_size = 500000; + int hotkey_max_deque_size = 1000000; + int hotkey_init_threshold = 5000; + int hotkey_max_threshold = 50000; + int hotkey_max_fetch_entries = 10000; + struct RocksDB { int block_size; bool cache_index_and_filter_blocks; diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index f113e004183..3934f850f7f 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -387,6 +387,39 @@ static bool IsCmdAllowedInStaleData(const std::string &cmd_name) { return cmd_name == "info" || cmd_name == "slaveof" || cmd_name == "config"; } +static std::string GetRedisTypeOfUserKey([[maybe_unused]] const std::string &cmd_name, CommandCategory cmd_cat) { + switch (cmd_cat) { + case CommandCategory::Bit: + return "BitMap"; + case CommandCategory::BloomFilter: + return "BloomFilter"; + case CommandCategory::Geo: + return "Geo"; + case CommandCategory::Hash: + return "Hash"; + case CommandCategory::HLL: + return "HyperLogLog"; + case CommandCategory::JSON: + return "JSON"; + case CommandCategory::List: + return "List"; + case CommandCategory::Set: + return "Set"; + case CommandCategory::SortedInt: + return "SortedInt"; + case CommandCategory::Stream: + return "Stream"; + case CommandCategory::String: + return "String"; + case CommandCategory::ZSet: + return "ZSet"; + case CommandCategory::TDigest: + return "TDigest"; + default: + return "none"; + } +} + void 
Connection::ExecuteCommands(std::deque *to_process_cmds) { const Config *config = srv_->GetConfig(); std::string reply; @@ -596,6 +629,14 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { if (!reply.empty()) Reply(reply); reply.clear(); + + // Hotkey analyze + if (srv_->hotkey.enable_analyze) { + std::string redis_type = GetRedisTypeOfUserKey(cmd_name, attributes->category); + if (redis_type != "none") { + srv_->hotkey.UpdateCounter(cmd_tokens[1], redis_type); + } + } } } diff --git a/src/server/server.cc b/src/server/server.cc index 90d793391cd..a7f64d002bd 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -118,6 +118,19 @@ Server::Server(engine::Storage *storage, Config *config) slow_log_.SetMaxEntries(config->slowlog_max_len); slow_log_.SetDumpToLogfileLevel(config->slowlog_dump_logfile_level); perf_log_.SetMaxEntries(config->profiling_sample_record_max_len); + + if (config->hotkey_bootstrap) { + uint32_t capacity = config->hotkey_init_lru_capacity, deque_size = config->hotkey_init_deque_size, + threshold = config->hotkey_init_threshold; + Status s = hotkey.Enable(capacity, deque_size, threshold); + if (!s.IsOK()) { + error("[server] Failed to enable hotkey analyze with capacity: {}, deque size: {}, threshold: {}", capacity, + deque_size, threshold); + exit(1); + } + info("[server] Enable hotkey analyze with capacity: {}, deque size: {}, threshold: {}", capacity, deque_size, + threshold); + } } Server::~Server() { diff --git a/src/server/server.h b/src/server/server.h index ecaa4c94b11..526931334f1 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -50,6 +50,7 @@ #include "search/index_manager.h" #include "search/indexer.h" #include "server/redis_connection.h" +#include "stats/hot_key.h" #include "stats/log_collector.h" #include "stats/stats.h" #include "storage/redis_metadata.h" @@ -327,6 +328,8 @@ class Server { std::shared_lock WorkConcurrencyGuard(); std::unique_lock WorkExclusivityGuard(); + Hotkey hotkey; + Stats 
stats; engine::Storage *storage; MemoryProfiler memory_profiler; diff --git a/src/stats/hot_key.cc b/src/stats/hot_key.cc new file mode 100644 index 00000000000..34c5e0844fb --- /dev/null +++ b/src/stats/hot_key.cc @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "hot_key.h" + +#include "common/time_util.h" +#include "server/redis_reply.h" +#include "thread_util.h" + +Hotkey::~Hotkey() { + if (enable_analyze) { + enable_analyze = false; + if (auto s = util::ThreadJoin(refresh_ts_thread_); !s) { + warn("[hotkey] refresh timestamp thread operation failed: {}", s.Msg()); + } + } +} + +void Hotkey::UpdateCounter(const std::string& key, const std::string& redis_type) { + std::lock_guard lg(mutex_); + if (!enable_analyze) { + return; + } + + auto now = cached_timestamp_ms_; + + if (auto it = map_.find(key); it != map_.end()) { + auto key_counter = it->second; + uint32_t current = key_counter->counter; + if (isInSameSecond(now, key_counter->last_access_timestamp_ms)) { + ++key_counter->counter; + } else { + if (current >= threshold_) { + deque_.emplace_back(HotkeyEntry{current, threshold_, key_counter->last_access_timestamp_ms, redis_type, key}); + if (deque_.size() > deque_size_) { + deque_.pop_front(); + } + key_counter->enqueued = true; + dumpToLogFile(deque_.back()); + } + // reset the counter + key_counter->counter = 1; + } + key_counter->last_access_timestamp_ms = now; + // move to list head + list_.splice(list_.begin(), list_, key_counter); + return; + } + + // insert new element at list head + list_.emplace_front(KeyCounter{false, 1, now, redis_type, key}); + map_[key] = list_.begin(); + + if (list_.size() > capacity_) { + auto key_counter = list_.back(); + if (key_counter.counter >= threshold_ && !key_counter.enqueued) { + deque_.emplace_back(HotkeyEntry{key_counter.counter, threshold_, key_counter.last_access_timestamp_ms, redis_type, + key_counter.key}); + if (deque_.size() > deque_size_) { + deque_.pop_front(); + } + dumpToLogFile(deque_.back()); + } + map_.erase(key_counter.key); + list_.pop_back(); + } +} + +void Hotkey::SetThreshold(uint32_t threshold) { + std::lock_guard lg(mutex_); + if (!enable_analyze) { + return; + } + + threshold_ = threshold; +} + +void 
Hotkey::SetDumpToLogfileLevel(spdlog::level::level_enum level) { + std::lock_guard lg(mutex_); + if (!enable_analyze) { + return; + } + + dump_to_logfile_level_ = level; +} + +void Hotkey::dumpToLogFile(const HotkeyEntry& entry) const { + if (dump_to_logfile_level_ == spdlog::level::off) { + return; + } + + log(dump_to_logfile_level_, "[hotkey] key: {}, redis_type: {}, count: {}, threshold: {}, timestamp: {}", entry.key, + entry.redis_type, entry.count, entry.threshold, entry.timestamp_ms); +} + +Status Hotkey::Enable(uint32_t capacity, uint32_t deque_size, uint32_t threshold) { + std::lock_guard lg(mutex_); + if (enable_analyze) { + return {Status::NotOK, "please disable hotkey analyze at first"}; + } + + capacity_ = capacity; + deque_size_ = deque_size; + threshold_ = threshold; + cached_timestamp_ms_ = util::GetTimeStampMS(); + clean(); + enable_analyze = true; + refresh_ts_thread_ = GET_OR_RET(util::CreateThread("hotkey-ts", [this] { + while (enable_analyze) { + cached_timestamp_ms_ = util::GetTimeStampMS(); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + })); + + return Status::OK(); +} + +Status Hotkey::Disable() { + std::lock_guard lg(mutex_); + if (enable_analyze) { + enable_analyze = false; + clean(); + if (auto s = util::ThreadJoin(refresh_ts_thread_); !s) { + warn("[hotkey] timestamp refresh thread operation failed: {}", s.Msg()); + return s; + } + } + + return Status::OK(); +} + +std::string Hotkey::SearchByTimeRange(uint32_t max_fetch_entries, uint64_t begin_timestamp_ms, + uint64_t end_timestamp_ms) { + std::string output; + + if (max_fetch_entries == 0 || begin_timestamp_ms >= end_timestamp_ms) { + return redis::MultiLen(0); + } + + auto begin = std::chrono::high_resolution_clock::now(); + std::unique_lock lock(mutex_); + if (!enable_analyze) { + return redis::MultiLen(0); + } + if (deque_.empty()) { + return redis::MultiLen(0); + } + + if (deque_.size() < max_fetch_entries) { + max_fetch_entries = deque_.size(); + } + std::vector 
hotkeys; + hotkeys.reserve(max_fetch_entries); + uint32_t numbers = 0; + for (auto& it : deque_) { + if (it.timestamp_ms >= begin_timestamp_ms && it.timestamp_ms < end_timestamp_ms) { + hotkeys.emplace_back(it); + ++numbers; + if (numbers == max_fetch_entries) { + break; + } + } + } + lock.unlock(); + auto elapsed1 = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - begin).count(); + info("[hotkey] deque size {} filter between {} and {} cost {} us", deque_.size(), begin_timestamp_ms, + end_timestamp_ms, elapsed1); + + if (hotkeys.empty()) { + return redis::MultiLen(0); + } + + std::sort(hotkeys.begin(), hotkeys.end(), + [](const HotkeyEntry& a, const HotkeyEntry& b) { return a.timestamp_ms < b.timestamp_ms; }); + + std::string entries; + for (auto& it : hotkeys) { + entries.append(redis::MultiLen(8)); + entries.append(redis::SimpleString("key")); + entries.append(redis::SimpleString(it.key)); + entries.append(redis::SimpleString("redis_type")); + entries.append(redis::SimpleString(it.redis_type)); + entries.append(redis::SimpleString("count")); + entries.append(redis::Integer(it.count)); + entries.append(redis::SimpleString("timestamp")); + entries.append(redis::Integer(it.timestamp_ms)); + } + output.append(redis::MultiLen(hotkeys.size())); + output.append(entries); + auto elapsed2 = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - begin).count(); + info("[hotkey] deque size {} filter and serialize between {} and {} cost {} us", deque_.size(), begin_timestamp_ms, + end_timestamp_ms, elapsed2); + + return output; +} + +std::string Hotkey::GetByKeyOrThreshold(uint32_t max_fetch_entries, const std::string& key, uint32_t threshold, + uint64_t begin_timestamp_ms, uint64_t end_timestamp_ms) { + std::string output; + + if (max_fetch_entries == 0) { + return redis::MultiLen(0); + } + if (begin_timestamp_ms != 0 && end_timestamp_ms != 0 && begin_timestamp_ms >= end_timestamp_ms) { + return redis::MultiLen(0); + } + + bool 
by_threshold = false, by_key = false; + if (key != "" && threshold == 0) { + by_key = true; + } else if (key == "" && threshold > 0) { + by_threshold = true; + } else { + return redis::MultiLen(0); + } + + auto begin = std::chrono::high_resolution_clock::now(); + std::unique_lock lock(mutex_); + if (!enable_analyze) { + return redis::MultiLen(0); + } + if (deque_.empty()) { + return redis::MultiLen(0); + } + + if (deque_.size() < max_fetch_entries) { + max_fetch_entries = deque_.size(); + } + std::vector hotkeys; + hotkeys.reserve(max_fetch_entries); + uint32_t numbers = 0; + if (by_threshold) { + if (begin_timestamp_ms != 0 && end_timestamp_ms != 0) { + for (auto& it : deque_) { + if (it.count >= threshold && it.timestamp_ms >= begin_timestamp_ms && it.timestamp_ms < end_timestamp_ms) { + hotkeys.emplace_back(it); + ++numbers; + if (numbers == max_fetch_entries) { + break; + } + } + } + } else { + for (auto& it : deque_) { + if (it.count >= threshold) { + hotkeys.emplace_back(it); + ++numbers; + if (numbers == max_fetch_entries) { + break; + } + } + } + } + } else { + if (begin_timestamp_ms != 0 && end_timestamp_ms != 0) { + for (auto& it : deque_) { + if (it.key == key && it.timestamp_ms >= begin_timestamp_ms && it.timestamp_ms < end_timestamp_ms) { + hotkeys.emplace_back(it); + ++numbers; + if (numbers == max_fetch_entries) { + break; + } + } + } + } else { + for (auto& it : deque_) { + if (it.key == key) { + hotkeys.emplace_back(it); + ++numbers; + if (numbers == max_fetch_entries) { + break; + } + } + } + } + } + lock.unlock(); + auto elapsed1 = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - begin).count(); + if (by_key) { + if (begin_timestamp_ms != 0 && end_timestamp_ms != 0) { + info("[hotkey] deque size {} filter by key {} between {} and {} cost {} us", deque_.size(), key, + begin_timestamp_ms, end_timestamp_ms, elapsed1); + } else { + info("[hotkey] deque size {} filter by key {} cost {} us", deque_.size(), key, elapsed1); + } + 
} else { + if (begin_timestamp_ms != 0 && end_timestamp_ms != 0) { + info("[hotkey] deque size {} filter by threshold {} between {} and {} cost {} us", deque_.size(), threshold, + begin_timestamp_ms, end_timestamp_ms, elapsed1); + } else { + info("[hotkey] deque size {} filter by threshold {} cost {} us", deque_.size(), threshold, elapsed1); + } + } + + if (hotkeys.empty()) { + return redis::MultiLen(0); + } + + std::sort(hotkeys.begin(), hotkeys.end(), + [](const HotkeyEntry& a, const HotkeyEntry& b) { return a.timestamp_ms < b.timestamp_ms; }); + + std::string entries; + if (by_threshold) { + for (auto& it : hotkeys) { + entries.append(redis::MultiLen(8)); + entries.append(redis::SimpleString("key")); + entries.append(redis::SimpleString(it.key)); + entries.append(redis::SimpleString("redis_type")); + entries.append(redis::SimpleString(it.redis_type)); + entries.append(redis::SimpleString("count")); + entries.append(redis::Integer(it.count)); + entries.append(redis::SimpleString("timestamp")); + entries.append(redis::Integer(it.timestamp_ms)); + } + } else { + for (auto& it : hotkeys) { + entries.append(redis::MultiLen(6)); + entries.append(redis::SimpleString("redis_type")); + entries.append(redis::SimpleString(it.redis_type)); + entries.append(redis::SimpleString("count")); + entries.append(redis::Integer(it.count)); + entries.append(redis::SimpleString("timestamp")); + entries.append(redis::Integer(it.timestamp_ms)); + } + } + output.append(redis::MultiLen(hotkeys.size())); + output.append(entries); + auto elapsed2 = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - begin).count(); + if (by_key) { + if (begin_timestamp_ms != 0 && end_timestamp_ms != 0) { + info("[hotkey] deque size {} filter by key {} between {} and {} and serialize cost {} us", deque_.size(), key, + begin_timestamp_ms, end_timestamp_ms, elapsed2); + } else { + info("[hotkey] deque size {} filter by key {} and serialize cost {} us", deque_.size(), key, elapsed2); + } + } 
else { + if (begin_timestamp_ms != 0 && end_timestamp_ms != 0) { + info("[hotkey] deque size {} filter by threshold {} between {} and {} and serialize cost {} us", deque_.size(), + threshold, begin_timestamp_ms, end_timestamp_ms, elapsed2); + } else { + info("[hotkey] deque size {} filter by threshold {} and serialize cost {} us", deque_.size(), threshold, + elapsed2); + } + } + + return output; +} + +std::string Hotkey::GetStats() { + std::lock_guard lg(mutex_); + if (!enable_analyze) { + return redis::MultiLen(0); + } + + std::string output; + output.append(redis::MultiLen(12)); + output.append(redis::SimpleString("lru_cache_capacity")); + output.append(redis::Integer(capacity_)); + output.append(redis::SimpleString("lru_cache_keys")); + output.append(redis::Integer(list_.size())); + output.append(redis::SimpleString("threshold")); + output.append(redis::Integer(threshold_)); + output.append(redis::SimpleString("hotkey_deque_size")); + output.append(redis::Integer(deque_size_)); + output.append(redis::SimpleString("hotkey_entries")); + output.append(redis::Integer(deque_.size())); + output.append(redis::SimpleString("hotkey_first_entry_timestamp")); + if (deque_.empty()) { + output.append(redis::Integer(0)); + } else { + output.append(redis::Integer(deque_.front().timestamp_ms)); + } + + return output; +} + +void Hotkey::clean() { + list_.clear(); + map_.clear(); + deque_.clear(); +} \ No newline at end of file diff --git a/src/stats/hot_key.h b/src/stats/hot_key.h new file mode 100644 index 00000000000..4ab195b7542 --- /dev/null +++ b/src/stats/hot_key.h @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "status.h" + +struct HotkeyEntry { + uint32_t count = 0; + uint32_t threshold = 0; + uint64_t timestamp_ms = 0; + std::string redis_type; + std::string key; +}; + +struct KeyCounter { + bool enqueued = false; + uint32_t counter = 0; + uint64_t last_access_timestamp_ms = 0; + std::string redis_type; + std::string key; +}; + +class Hotkey { + public: + Hotkey() = default; + ~Hotkey(); + Hotkey(const Hotkey &) = delete; + Hotkey &operator=(const Hotkey &) = delete; + Hotkey(Hotkey &&) = delete; + Hotkey &operator=(Hotkey &&) = delete; + + Status Enable(uint32_t capacity, uint32_t deque_size, uint32_t threshold); + Status Disable(); + std::string SearchByTimeRange(uint32_t max_fetch_entries, uint64_t begin_timestamp_ms, uint64_t end_timestamp_ms); + std::string GetByKeyOrThreshold(uint32_t max_fetch_entries, const std::string &key, uint32_t threshold, + uint64_t begin_timestamp_ms, uint64_t end_timestamp_ms); + std::string GetStats(); + void SetThreshold(uint32_t threshold); + void UpdateCounter(const std::string &key, const std::string &redis_type); + void SetDumpToLogfileLevel(spdlog::level::level_enum level); + + bool enable_analyze = false; // guard by mutext_ + + private: + void clean(); + static inline bool isInSameSecond(uint64_t timestamp_ms1, uint64_t timestamp_ms2) { + return timestamp_ms1 / 1000 == timestamp_ms2 / 1000; + } + void dumpToLogFile(const HotkeyEntry &entry) const; + + mutable std::mutex mutex_; + uint32_t capacity_ = 0; + uint32_t 
deque_size_ = 0; + uint32_t threshold_ = 0; + spdlog::level::level_enum dump_to_logfile_level_ = spdlog::level::off; + std::list list_; + std::unordered_map::iterator> map_; + std::deque deque_; + + uint64_t cached_timestamp_ms_ = 0; + std::thread refresh_ts_thread_; +}; \ No newline at end of file