From de4272b782db451d03d4fff04de82f84437690ed Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Sun, 23 Mar 2025 19:48:13 +0800 Subject: [PATCH 01/14] feat(cluster): disconnect replication connections to orphan slaves --- src/cluster/cluster.cc | 156 +++++++++------- src/cluster/cluster.h | 13 +- src/cluster/cluster_defs.h | 1 + src/cluster/replication.cc | 67 +++++-- src/cluster/replication.h | 3 + src/commands/cmd_cluster.cc | 2 +- src/commands/cmd_replication.cc | 174 +++++++++++------- src/common/parse_util.cc | 5 +- src/common/parse_util.h | 128 +++++-------- src/common/type_util.h | 11 ++ src/server/redis_connection.h | 68 ++++++- src/server/server.cc | 38 +++- src/server/server.h | 1 + src/server/worker.cc | 2 +- src/storage/storage.h | 1 - .../integration/cluster/cluster_test.go | 5 + 16 files changed, 422 insertions(+), 253 deletions(-) diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 10dbf0e8380..46cfc9c00c8 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -23,14 +23,18 @@ #include #include +#include #include #include #include +#include +#include #include #include "cluster/cluster_defs.h" #include "commands/commander.h" #include "common/io_util.h" +#include "fmt/base.h" #include "fmt/format.h" #include "parse_util.h" #include "replication.h" @@ -38,9 +42,14 @@ #include "string_util.h" #include "time_util.h" -ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id, - const std::bitset &slots) - : id(std::move(id)), host(std::move(host)), port(port), role(role), master_id(std::move(master_id)), slots(slots) {} +ClusterNode::ClusterNode(std::string &&id, std::string &&host, int port, int role, std::string &&master_id, + std::bitset &&slots) + : id(std::move(id)), + host(std::move(host)), + port(port), + role(role), + master_id(std::move(master_id)), + slots(std::move(slots)) {} Cluster::Cluster(Server *srv, std::vector binds, int port) : srv_(srv), binds_(std::move(binds)), 
port_(port) { @@ -68,8 +77,8 @@ Status Cluster::SetNodeId(const std::string &node_id) { myid_ = node_id; // Already has cluster topology - if (version_ >= 0 && nodes_.find(node_id) != nodes_.end()) { - myself_ = nodes_[myid_]; + if (version_ >= 0 && nodes_->count(node_id) > 0) { + myself_ = nodes_->at(node_id); } else { myself_ = nullptr; } @@ -96,11 +105,13 @@ Status Cluster::SetSlotRanges(const std::vector &slot_ranges, const s } // Get the node which we want to assign slots into it - std::shared_ptr to_assign_node = nodes_[node_id]; - if (to_assign_node == nullptr) { + auto it = nodes_->find(node_id); + if (it == nodes_->end()) { return {Status::NotOK, "No this node in the cluster"}; } + auto to_assign_node = it->second; + if (to_assign_node->role != kClusterMaster) { return {Status::NotOK, errNoMasterNode}; } @@ -160,26 +171,32 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b if (version_ == version) return Status::OK(); } - ClusterNodes nodes; + // ClusterNodes nodes; + auto nodes = std::make_unique(); std::unordered_map slots_nodes; - Status s = parseClusterNodes(nodes_str, &nodes, &slots_nodes); + Status s = parseClusterNodes(nodes_str, nodes.get(), &slots_nodes); if (!s.IsOK()) return s; // Update version and cluster topology version_ = version; - nodes_ = nodes; + nodes_ = std::move(nodes); size_ = 0; // Update slots to nodes for (const auto &[slot, node_id] : slots_nodes) { - slots_nodes_[slot] = nodes_[node_id]; + auto it = nodes_->find(node_id); + if (it == nodes_->end()) { + return {Status::NotOK, "No this node in the cluster"}; + } + slots_nodes_[slot] = it->second; } // Update replicas info and size - for (const auto &[node_id, node] : nodes_) { + for (const auto &[node_id, node] : *nodes_) { if (node->role == kClusterSlave) { - if (nodes_.find(node->master_id) != nodes_.end()) { - nodes_[node->master_id]->replicas.push_back(node_id); + auto it = nodes_->find(node->master_id); + if (it != nodes_->end()) { + 
it->second->replicas.emplace_back(node_id); } } if (node->role == kClusterMaster && node->slots.count() > 0) { @@ -188,7 +205,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b } if (myid_.empty() || force) { - for (const auto &[node_id, node] : nodes_) { + for (const auto &[node_id, node] : *nodes_) { if (node->port == port_ && util::MatchListeningIP(binds_, node->host)) { myid_ = node_id; break; @@ -197,8 +214,11 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b } myself_ = nullptr; - if (!myid_.empty() && nodes_.find(myid_) != nodes_.end()) { - myself_ = nodes_[myid_]; + if (!myid_.empty()) { + auto it = nodes_->find(myid_); + if (it != nodes_->end()) { + myself_ = it->second; + } } // Set replication relationship @@ -252,11 +272,14 @@ Status Cluster::SetMasterSlaveRepl() { srv_->slot_migrator->SetStopMigrationFlag(false); LOG(INFO) << "Change server role to master, restart migration task"; } + if (!is_slave) { + srv_->CleanupOrphanSlaves(version_, *nodes_); + } return Status::OK(); } - auto it = nodes_.find(myself_->master_id); - if (it != nodes_.end()) { + auto it = nodes_->find(myself_->master_id); + if (it != nodes_->end()) { // Replica mode and master node is existing std::shared_ptr master = it->second; auto s = srv_->AddMaster(master->host, master->port, false); @@ -308,10 +331,13 @@ Status Cluster::SetSlotRangeImported(const SlotRange &slot_range) { Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx) { - if (nodes_.find(dst_node_id) == nodes_.end()) { + auto dst_node_it = nodes_->find(dst_node_id); + if (dst_node_it == nodes_->end()) { return {Status::NotOK, "Can't find the destination node id"}; } + const auto &dst_node = dst_node_it->second; + if (!slot_range.IsValid()) { return {Status::NotOK, errSlotRangeInvalid}; } @@ -331,17 +357,16 @@ Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const 
std::string return {Status::NotOK, "Slave can't migrate slot"}; } - if (nodes_[dst_node_id]->role != kClusterMaster) { + if (dst_node->role != kClusterMaster) { return {Status::NotOK, "Can't migrate slot to a slave"}; } - if (nodes_[dst_node_id] == myself_) { + if (dst_node == myself_) { return {Status::NotOK, "Can't migrate slot to myself"}; } - const auto &dst = nodes_[dst_node_id]; - Status s = - srv_->slot_migrator->PerformSlotRangeMigration(dst_node_id, dst->host, dst->port, slot_range, blocking_ctx); + Status s = srv_->slot_migrator->PerformSlotRangeMigration(dst_node_id, dst_node->host, dst_node->port, slot_range, + blocking_ctx); return s; } @@ -415,27 +440,18 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) { if (slots_node != nullptr) ok_slot++; } - *cluster_infos = + *cluster_infos = fmt::format( "cluster_state:ok\r\n" - "cluster_slots_assigned:" + - std::to_string(ok_slot) + - "\r\n" - "cluster_slots_ok:" + - std::to_string(ok_slot) + - "\r\n" + "cluster_slots_assigned:{ok_slot}\r\n" + "cluster_slots_ok:{ok_slot}\r\n" "cluster_slots_pfail:0\r\n" "cluster_slots_fail:0\r\n" - "cluster_known_nodes:" + - std::to_string(nodes_.size()) + - "\r\n" - "cluster_size:" + - std::to_string(size_) + - "\r\n" - "cluster_current_epoch:" + - std::to_string(version_) + - "\r\n" - "cluster_my_epoch:" + - std::to_string(version_) + "\r\n"; + "cluster_known_nodes:{nodes_size}\r\n" + "cluster_size:{size}\r\n" + "cluster_current_epoch:{version}\r\n" + "cluster_my_epoch:{version}\r\n", + fmt::arg("ok_slot", ok_slot), fmt::arg("nodes_size", nodes_->size()), fmt::arg("size", size_), + fmt::arg("version", version_)); if (myself_ != nullptr && myself_->role == kClusterMaster && !srv_->IsSlave()) { // Get migrating status @@ -495,8 +511,11 @@ SlotInfo Cluster::genSlotNodeInfo(int start, int end, const std::shared_ptrhost, n->port, n->id}); // itself for (const auto &id : n->replicas) { // replicas - if (nodes_.find(id) == nodes_.end()) continue; - 
vn.push_back({nodes_[id]->host, nodes_[id]->port, nodes_[id]->id}); + auto it = nodes_->find(id); + if (it == nodes_->end()) { + continue; + } + vn.push_back({it->second->host, it->second->port, it->second->id}); } return {start, end, vn}; @@ -518,8 +537,8 @@ StatusOr Cluster::GetReplicas(const std::string &node_id) { return {Status::RedisClusterDown, errClusterNoInitialized}; } - auto item = nodes_.find(node_id); - if (item == nodes_.end()) { + auto item = nodes_->find(node_id); + if (item == nodes_->end()) { return {Status::InvalidArgument, errInvalidNodeID}; } @@ -531,8 +550,8 @@ StatusOr Cluster::GetReplicas(const std::string &node_id) { auto now = util::GetTimeStampMS(); std::string replicas_desc; for (const auto &replica_id : node->replicas) { - auto n = nodes_.find(replica_id); - if (n == nodes_.end()) { + auto n = nodes_->find(replica_id); + if (n == nodes_->end()) { continue; } @@ -565,7 +584,7 @@ std::string Cluster::genNodesDescription() { auto now = util::GetTimeStampMS(); std::string nodes_desc; - for (const auto &[_, node] : nodes_) { + for (const auto &[_, node] : *nodes_) { std::string node_str; // ID, host, port node_str.append(node->id + " "); @@ -647,7 +666,7 @@ std::string Cluster::genNodesInfo() const { auto slots_infos = getClusterNodeSlots(); std::string nodes_info; - for (const auto &[_, node] : nodes_) { + for (const auto &[_, node] : *nodes_) { std::string node_str; node_str.append("node "); // ID @@ -740,10 +759,8 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo}; } - nodes->clear(); - // Parse all nodes - for (const auto &node_str : nodes_info) { + for (auto &node_str : nodes_info) { std::vector fields = util::Split(node_str, " "); if (fields.size() < 5) { return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo}; @@ -754,10 +771,10 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no return 
{Status::ClusterInvalidInfo, errInvalidNodeID}; } - std::string id = fields[0]; + std::string &id = fields[0]; // 2) host, TODO(@shooterit): check host is valid - std::string host = fields[1]; + std::string &host = fields[1]; // 3) port auto parse_result = ParseInt(fields[2], 10); @@ -790,7 +807,9 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo}; } else { // Create slave node - (*nodes)[id] = std::make_shared(id, host, port, role, master_id, slots); + auto node = std::make_shared(std::move(id), std::move(host), port, role, std::move(master_id), + std::move(slots)); + nodes->insert({node->id, node}); continue; } } @@ -816,6 +835,8 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no if (slots_nodes->find(start) != slots_nodes->end()) { return {Status::ClusterInvalidInfo, errSlotOverlapped}; } else { + // It's ok cause the value of the `slots_nodes` is `std::string`, + // a deep copy of the `id` is created. 
(*slots_nodes)[start] = id; } } @@ -844,7 +865,9 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no } // Create master node - (*nodes)[id] = std::make_shared(id, host, port, role, master_id, slots); + auto node = std::make_shared(std::move(id), std::move(host), port, role, std::move(master_id), + std::move(slots)); + nodes->insert({node->id, node}); } return Status::OK(); @@ -931,9 +954,8 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons return Status::OK(); // I'm serving the imported slot } - if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite) && - nodes_.find(myself_->master_id) != nodes_.end() && nodes_[myself_->master_id] == slots_nodes_[slot] && - conn->IsFlagEnabled(redis::Connection::kReadOnly)) { + if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite) && nodes_->count(myself_->master_id) && + nodes_->at(myself_->master_id) == slots_nodes_[slot] && conn->IsFlagEnabled(redis::Connection::kReadOnly)) { return Status::OK(); // My master is serving this slot } @@ -966,7 +988,7 @@ Status Cluster::Reset() { myid_.clear(); myself_.reset(); - nodes_.clear(); + nodes_.reset(); for (auto &n : slots_nodes_) { n = nullptr; } @@ -977,3 +999,13 @@ Status Cluster::Reset() { unlink(srv_->GetConfig()->NodesFilePath().data()); return Status::OK(); } + +// Note that if current version is lower than the given version, +// it can't be determined whether the node is in the cluster, so just regard it as in the cluster. 
+bool Cluster::IsInCluster(const std::string &node_id, int64_t version) const { + if (version < 0 || this->version_ < version) { + return true; + } + + return nodes_->count(node_id) > 0; +} \ No newline at end of file diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index 468c154d4d8..a1ffca26a20 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -22,10 +22,13 @@ #include #include +#include +#include #include #include #include #include +#include #include #include @@ -36,11 +39,12 @@ #include "server/redis_connection.h" #include "status.h" #include "storage/scripting.h" +#include "type_util.h" class ClusterNode { public: - explicit ClusterNode(std::string id, std::string host, int port, int role, std::string master_id, - const std::bitset &slots); + explicit ClusterNode(std::string &&id, std::string &&host, int port, int role, std::string &&master_id, + std::bitset &&slots); std::string id; std::string host; int port; @@ -62,7 +66,7 @@ struct SlotInfo { std::vector nodes; }; -using ClusterNodes = std::unordered_map>; +using ClusterNodes = std::unordered_map, StringHash, StringEqual>; class Server; class SyncMigrateContext; @@ -93,6 +97,7 @@ class Cluster { Status DumpClusterNodes(const std::string &file); Status LoadClusterNodes(const std::string &file_path); Status Reset(); + bool IsInCluster(const std::string &node_id, int64_t version) const; static bool SubCommandIsExecExclusive(const std::string &subcommand); @@ -111,7 +116,7 @@ class Cluster { int64_t version_ = -1; std::string myid_; std::shared_ptr myself_; - ClusterNodes nodes_; + std::unique_ptr nodes_; std::shared_ptr slots_nodes_[kClusterSlots]; std::map migrated_slots_; diff --git a/src/cluster/cluster_defs.h b/src/cluster/cluster_defs.h index 12fa568d69e..0ac2c9d2985 100644 --- a/src/cluster/cluster_defs.h +++ b/src/cluster/cluster_defs.h @@ -41,6 +41,7 @@ inline constexpr const char *errNoMasterNode = "The node isn't a master"; inline constexpr const char 
*errClusterNoInitialized = "The cluster is not initialized"; inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info"; inline constexpr const char *errInvalidImportState = "Invalid import state"; +inline constexpr const char *errYouAreFired = "You are fired"; /// SlotRange is a range of cluster slots covering [start, end], /// where the valid values for start and end are [0, kClusterSlots). diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index 78a0a02c23c..d7e59170f71 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -41,11 +41,9 @@ #include "io_util.h" #include "rocksdb/write_batch.h" #include "rocksdb_crc32c.h" -#include "scope_exit.h" #include "server/redis_reply.h" #include "server/server.h" #include "status.h" -#include "storage/batch_debugger.h" #include "thread_util.h" #include "time_util.h" #include "unique_fd.h" @@ -58,7 +56,7 @@ Status FeedSlaveThread::Start() { auto s = util::CreateThread("feed-replica", [this] { - sigset_t mask, omask; + sigset_t mask{}, omask{}; sigemptyset(&mask); sigemptyset(&omask); sigaddset(&mask, SIGCHLD); @@ -459,6 +457,12 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev) data_to_send.emplace_back("ip-address"); data_to_send.emplace_back(config->replica_announce_ip); } + if (!next_try_without_peer_id_) { + data_to_send.emplace_back("peer-id"); + data_to_send.emplace_back(srv_->cluster->GetMyId()); + data_to_send.emplace_back("version"); + data_to_send.emplace_back(std::to_string(srv_->cluster->GetVersion())); + } SendString(bev, redis::ArrayOfBulkStrings(data_to_send)); repl_state_.store(kReplReplConf, std::memory_order_relaxed); LOG(INFO) << "[replication] replconf request was sent, waiting for response"; @@ -470,26 +474,40 @@ ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev) { UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT); if (!line) return CBState::AGAIN; + auto resp = 
line.View(); + // on unknown option: first try without announce ip, if it fails again - do nothing (to prevent infinite loop) - if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) { - next_try_without_announce_ip_address_ = true; - LOG(WARNING) << "The old version master, can't handle ip-address, " - << "try without it again"; - // Retry previous state, i.e. send replconf again - return CBState::PREV; + if (isUnknownOption(resp)) { + if (!next_try_without_peer_id_) { + next_try_without_peer_id_ = true; + LOG(WARNING) << "The old version master, can't handle peer-id, try without it again"; + return CBState::PREV; + } + if (!next_try_without_announce_ip_address_) { + next_try_without_announce_ip_address_ = true; + LOG(WARNING) << "The old version master, can't handle ip-address, try without it again"; + return CBState::PREV; + } } - if (line[0] == '-' && isRestoringError(line.View())) { + + if (ResponseLineIsOK(resp)) { + LOG(INFO) << "[replication] replconf is ok, start psync"; + return CBState::NEXT; + } + + if (isRestoringError(resp)) { LOG(WARNING) << "The master was restoring the db, retry later"; return CBState::RESTART; } - if (!ResponseLineIsOK(line.View())) { - LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1; - // backward compatible with old version that doesn't support replconf cmd - return CBState::NEXT; - } else { - LOG(INFO) << "[replication] replconf is ok, start psync"; - return CBState::NEXT; + + if (isNodeFired(resp)) { + LOG(ERROR) << "The master has fired the node, stop the replication"; + return CBState::QUIT; } + + LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1; + // backward compatible with old version that doesn't support replconf cmd + return CBState::NEXT; } ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev) { @@ -772,6 +790,7 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir, std::atomic fetch_cnt = {0}; std::atomic 
skip_cnt = {0}; std::vector> results; + results.reserve(concurrency); for (size_t tid = 0; tid < concurrency; ++tid) { results.push_back( std::async(std::launch::async, [this, dir, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status { @@ -1034,12 +1053,22 @@ Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch &write_batch bool ReplicationThread::isRestoringError(std::string_view err) { // err doesn't contain the CRLF, so cannot use redis::Error here. - return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::RedisLoading, redis::errRestoringBackup}); + static const auto msg = fmt::format(RESP_PREFIX_ERROR "{}", + redis::StatusToRedisErrorMsg({Status::RedisLoading, redis::errRestoringBackup})); + return err == msg; } bool ReplicationThread::isWrongPsyncNum(std::string_view err) { // err doesn't contain the CRLF, so cannot use redis::Error here. - return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::NotOK, redis::errWrongNumOfArguments}); + static const auto msg = + fmt::format(RESP_PREFIX_ERROR "{}", redis::StatusToRedisErrorMsg({Status::NotOK, redis::errWrongNumOfArguments})); + return err == msg; +} + +bool ReplicationThread::isNodeFired(std::string_view err) { + static const auto msg = + fmt::format(RESP_PREFIX_ERROR "{}", redis::StatusToRedisErrorMsg({Status::NotOK, errYouAreFired})); + return err == msg; } bool ReplicationThread::isUnknownOption(std::string_view err) { diff --git a/src/cluster/replication.h b/src/cluster/replication.h index 75c545e08c3..25034761434 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -159,6 +160,7 @@ class ReplicationThread : private EventCallbackBase { std::atomic repl_state_; std::atomic last_io_time_secs_ = 0; bool next_try_old_psync_ = false; + bool next_try_without_peer_id_ = false; bool next_try_without_announce_ip_address_ = false; std::function 
pre_fullsync_cb_; @@ -209,6 +211,7 @@ class ReplicationThread : private EventCallbackBase { static bool isRestoringError(std::string_view err); static bool isWrongPsyncNum(std::string_view err); static bool isUnknownOption(std::string_view err); + static bool isNodeFired(std::string_view err); Status parseWriteBatch(const rocksdb::WriteBatch &write_batch); }; diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index 7a16ddd9ab1..519bd827c32 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -292,7 +292,7 @@ class CommandClusterX : public Commander { } else { return {Status::RedisExecErr, "Invalid cluster command options"}; } - if (need_persist_nodes_info && srv->GetConfig()->persist_cluster_nodes_enabled) { + if (need_persist_nodes_info && srv->GetConfig()->persist_cluster_nodes_enabled && srv->cluster->GetVersion() >= 0) { return srv->cluster->DumpClusterNodes(srv->GetConfig()->NodesFilePath()); } return Status::OK(); diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index ab288ca8d75..69ef4efd312 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -18,12 +18,22 @@ * */ +#include +#include +#include +#include + +#include "cluster/cluster_defs.h" #include "commander.h" #include "error_constants.h" +#include "fmt/format.h" #include "io_util.h" #include "scope_exit.h" +#include "server/redis_connection.h" #include "server/redis_reply.h" #include "server/server.h" +#include "stats/log_collector.h" +#include "status.h" #include "thread_util.h" #include "time_util.h" #include "unique_fd.h" @@ -57,11 +67,12 @@ class CommandPSync : public Commander { } Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { - LOG(INFO) << "Slave " << conn->GetAddr() << ", listening port: " << conn->GetListeningPort() - << ", announce ip: " << conn->GetAnnounceIP() << " asks for synchronization" - << " with next sequence: 
" << next_repl_seq_ - << " replication id: " << (replica_replid_.length() ? replica_replid_ : "not supported") - << ", and local sequence: " << srv->storage->LatestSeqNumber(); + auto peer_info = conn->GetPeerInfo()->ToString(); + + LOG(INFO) << fmt::format( + "Slave {} asks for synchronization with next sequence: {} " + "replication id: {} and local sequence: {}", + peer_info, next_repl_seq_, (new_psync_ ? replica_replid_ : "not supported"), srv->storage->LatestSeqNumber()); bool need_full_sync = false; @@ -166,41 +177,66 @@ class CommandReplConf : public Commander { return Commander::Parse(args); } - Status ParseParam(const std::string &option, const std::string &value) { + Status ParseParam(std::string_view option, std::string_view value) { if (option == "listening-port") { auto parse_result = ParseInt(value, NumericRange{1, PORT_LIMIT - 1}, 10); if (!parse_result) { return {Status::RedisParseErr, "listening-port should be number or out of range"}; } - port_ = *parse_result; - } else if (option == "ip-address") { - if (value == "") { + return Status::OK(); + } + + if (option == "ip-address") { + if (value.empty()) { return {Status::RedisParseErr, "ip-address should not be empty"}; } - ip_address_ = value; - } else { - return {Status::RedisParseErr, errUnknownOption}; + ip_ = value; + return Status::OK(); } - return Status::OK(); + if (option == "peer-id") { + if (value.empty()) { + return {Status::RedisParseErr, "peer-id should not be empty"}; + } + peer_id_ = value; + return Status::OK(); + } + + if (option == "version") { + auto parse_result = ParseInt(value, 10); + if (!parse_result) { + return {Status::RedisParseErr, "version should be number"}; + } + peer_version_ = *parse_result; + return Status::OK(); + } + + return {Status::RedisParseErr, errUnknownOption}; } Status Execute([[maybe_unused]] engine::Context &ctx, [[maybe_unused]] Server *srv, Connection *conn, std::string *output) override { - if (port_ != 0) { - conn->SetListeningPort(port_); + if 
(peer_version_ >= 0 && !srv->cluster->IsInCluster(peer_id_, peer_version_)) { + return {Status::NotOK, errYouAreFired}; } - if (!ip_address_.empty()) { - conn->SetAnnounceIP(ip_address_); + + if (ip_.empty()) { + ip_ = conn->GetIP(); } + + auto peer_info = std::make_unique(ip_, port_, peer_id_, peer_version_); + conn->SetPeerInfo(std::move(peer_info)); + *output = redis::RESP_OK; return Status::OK(); } private: - int port_ = 0; - std::string ip_address_; + std::string ip_; + uint32_t port_; + std::string peer_id_; + int64_t peer_version_ = -1; }; class CommandFetchMeta : public Commander { @@ -210,7 +246,7 @@ class CommandFetchMeta : public Commander { Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, [[maybe_unused]] std::string *output) override { int repl_fd = conn->GetFD(); - std::string ip = conn->GetAnnounceIP(); + std::string_view peer_info = conn->GetPeerInfo()->ToString(); auto s = util::SockSetBlocking(repl_fd, 1); if (!s.IsOK()) { @@ -222,7 +258,7 @@ class CommandFetchMeta : public Commander { srv->stats.IncrFullSyncCount(); // Feed-replica-meta thread - auto t = GET_OR_RET(util::CreateThread("feed-repl-info", [srv, repl_fd, ip, bev = conn->GetBufferEvent()] { + auto t = GET_OR_RET(util::CreateThread("feed-repl-info", [srv, repl_fd, peer_info, bev = conn->GetBufferEvent()] { srv->IncrFetchFileThread(); auto exit = MakeScopeExit([srv, bev] { bufferevent_free(bev); @@ -241,9 +277,9 @@ class CommandFetchMeta : public Commander { } // Send full data file info if (auto s = util::SockSend(repl_fd, files + CRLF, bev)) { - LOG(INFO) << "[replication] Succeed sending full data file info to " << ip; + LOG(INFO) << fmt::format("[replication] Succeed sending full data file info to {}: {}", peer_info, files); } else { - LOG(WARNING) << "[replication] Fail to send full data file info " << ip << ", error: " << s.Msg(); + LOG(WARNING) << fmt::format("[replication] Failed to send full data file info to {}: {}", peer_info, s.Msg()); } 
auto now_secs = static_cast(util::GetTimeStamp()); srv->storage->SetCheckpointAccessTimeSecs(now_secs); @@ -269,7 +305,7 @@ class CommandFetchFile : public Commander { std::vector files = util::Split(files_str_, ","); int repl_fd = conn->GetFD(); - std::string ip = conn->GetAnnounceIP(); + std::string_view peer_info = conn->GetPeerInfo()->ToString(); auto s = util::SockSetBlocking(repl_fd, 1); if (!s.IsOK()) { @@ -279,51 +315,53 @@ class CommandFetchFile : public Commander { conn->NeedNotFreeBufferEvent(); // Feed-replica-file thread will close the replica bufferevent conn->EnableFlag(redis::Connection::kCloseAsync); - auto t = GET_OR_RET(util::CreateThread("feed-repl-file", [srv, repl_fd, ip, files, bev = conn->GetBufferEvent()]() { - auto exit = MakeScopeExit([bev] { bufferevent_free(bev); }); - srv->IncrFetchFileThread(); - - for (const auto &file : files) { - if (srv->IsStopped()) break; - - uint64_t file_size = 0, max_replication_bytes = 0; - if (srv->GetConfig()->max_replication_mb > 0 && srv->GetFetchFileThreadNum() != 0) { - max_replication_bytes = (srv->GetConfig()->max_replication_mb * MiB) / srv->GetFetchFileThreadNum(); - } - auto start = std::chrono::high_resolution_clock::now(); - auto fd = UniqueFD(engine::Storage::ReplDataManager::OpenDataFile(srv->storage, file, &file_size)); - if (!fd) break; - - // Send file size and content - auto s = util::SockSend(repl_fd, std::to_string(file_size) + CRLF, bev); - if (s) { - s = util::SockSendFile(repl_fd, *fd, file_size, bev); - } - if (s) { - LOG(INFO) << "[replication] Succeed sending file " << file << " to " << ip; - } else { - LOG(WARNING) << "[replication] Fail to send file " << file << " to " << ip << ", error: " << s.Msg(); - break; - } - fd.Close(); - - // Sleep if the speed of sending file is more than replication speed limit - auto end = std::chrono::high_resolution_clock::now(); - uint64_t duration = std::chrono::duration_cast(end - start).count(); - if (max_replication_bytes > 0) { - auto shortest 
= static_cast(static_cast(file_size) / - static_cast(max_replication_bytes) * (1000 * 1000)); - if (duration < shortest) { - LOG(INFO) << "[replication] Need to sleep " << (shortest - duration) / 1000 - << " ms since of sending files too quickly"; - usleep(shortest - duration); + auto t = GET_OR_RET( + util::CreateThread("feed-repl-file", [srv, repl_fd, peer_info, files, bev = conn->GetBufferEvent()]() { + auto exit = MakeScopeExit([bev] { bufferevent_free(bev); }); + srv->IncrFetchFileThread(); + + for (const auto &file : files) { + if (srv->IsStopped()) break; + + uint64_t file_size = 0, max_replication_bytes = 0; + if (srv->GetConfig()->max_replication_mb > 0 && srv->GetFetchFileThreadNum() != 0) { + max_replication_bytes = (srv->GetConfig()->max_replication_mb * MiB) / srv->GetFetchFileThreadNum(); + } + auto start = std::chrono::high_resolution_clock::now(); + auto fd = UniqueFD(engine::Storage::ReplDataManager::OpenDataFile(srv->storage, file, &file_size)); + if (!fd) break; + + // Send file size and content + auto s = util::SockSend(repl_fd, std::to_string(file_size) + CRLF, bev); + if (s) { + s = util::SockSendFile(repl_fd, *fd, file_size, bev); + } + if (s) { + LOG(INFO) << fmt::format("[replication] Succeed sending file {} to {} with size: {}", file, peer_info, + file_size); + } else { + LOG(WARNING) << fmt::format("[replication] Fail to send file {} to {}: {}", file, peer_info, s.Msg()); + break; + } + fd.Close(); + + // Sleep if the speed of sending file is more than replication speed limit + auto end = std::chrono::high_resolution_clock::now(); + uint64_t duration = std::chrono::duration_cast(end - start).count(); + if (max_replication_bytes > 0) { + auto shortest = static_cast(static_cast(file_size) / + static_cast(max_replication_bytes) * (1000 * 1000)); + if (duration < shortest) { + LOG(INFO) << "[replication] Need to sleep " << (shortest - duration) / 1000 + << " ms since of sending files too quickly"; + usleep(shortest - duration); + } + } } - } 
- } - auto now_secs = util::GetTimeStamp(); - srv->storage->SetCheckpointAccessTimeSecs(now_secs); - srv->DecrFetchFileThread(); - })); + auto now_secs = util::GetTimeStamp(); + srv->storage->SetCheckpointAccessTimeSecs(now_secs); + srv->DecrFetchFileThread(); + })); if (auto s = util::ThreadDetach(t); !s) { return s; diff --git a/src/common/parse_util.cc b/src/common/parse_util.cc index a7359c78adb..4bb432443d1 100644 --- a/src/common/parse_util.cc +++ b/src/common/parse_util.cc @@ -20,12 +20,11 @@ #include "parse_util.h" -#include - #include "bit_util.h" +#include "string_util.h" StatusOr ParseSizeAndUnit(const std::string &v) { - auto [num, rest] = GET_OR_RET(TryParseInt(v.c_str(), 10)); + auto [num, rest] = GET_OR_RET(TryParseInt(v, 10)); if (*rest == 0) { return num; diff --git a/src/common/parse_util.h b/src/common/parse_util.h index 3e3cf116c3f..bb441afd665 100644 --- a/src/common/parse_util.h +++ b/src/common/parse_util.h @@ -20,70 +20,14 @@ #pragma once +#include +#include #include -#include #include +#include #include #include "status.h" -#include "string_util.h" - -namespace details { - -template -struct ParseIntFunc; - -template <> -struct ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtol; -}; - -template <> -struct ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtol; -}; - -template <> -struct ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtol; -}; - -template <> -struct ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtol; -}; - -template <> -struct ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtoll; -}; - -template <> -struct ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtoul; -}; - -template <> -struct ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtoul; -}; - -template <> -struct ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtoul; -}; - -template <> -struct 
ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtoul; -}; - -template <> -struct ParseIntFunc { // NOLINT - constexpr static const auto value = std::strtoull; -}; - -} // namespace details template using ParseResultAndPos = std::tuple; @@ -93,41 +37,69 @@ using ParseResultAndPos = std::tuple; // return the result integer and the current string position. // e.g. TryParseInt("100MB") -> {100, "MB"} // if no integer can be parsed or out of type range, an error will be returned -// base can be in {0, 2, ..., 36}, refer to strto* in standard c for more details -template // NOLINT -StatusOr> TryParseInt(const char *v, int base = 0) { - char *end = nullptr; - - errno = 0; - auto res = details::ParseIntFunc::value(v, &end, base); +// base can be in {0, 2, ..., 36} +template +StatusOr> TryParseInt(std::string_view v, int base = 0) { + static const std::string ErrNotInteger = "not started as an integer"; + + T res; + + // Skip leading spaces + const char *p = v.data(); + const char *end = v.data() + v.size(); + while (p < end && std::isspace(static_cast(*p))) { + ++p; + } - if (v == end) { - return {Status::NotOK, "not started as an integer"}; + if (p == end) { + return {Status::NotOK, ErrNotInteger}; } - if (errno) { - return Status::FromErrno(); + if (base == 0) { + if (*p == '0') { + if (p + 1 < end) { + if (std::tolower(*(p + 1)) == 'x') { + base = 16; + p += 2; + } else if (std::tolower(*(p + 1)) == 'b') { + base = 2; + p += 2; + } else { + base = 8; + p += 1; + } + } + } else { + base = 10; + } + } else if (base < 2 || base > 36) { + return {Status::NotOK, "invalid base (must be 2~36 or 0)"}; } - if (!std::is_same::value && - (res < std::numeric_limits::min() || res > std::numeric_limits::max())) { + auto [ptr, ec] = std::from_chars(p, end, res, base); + if (ec == std::errc::invalid_argument) { + return {Status::NotOK, ErrNotInteger}; + } else if (ec == std::errc::result_out_of_range) { return {Status::NotOK, "out of range of integer type"}; } - 
return ParseResultAndPos{res, end}; + if (ptr == p) { + return {Status::NotOK, ErrNotInteger}; + } + + return ParseResultAndPos{res, ptr}; } // ParseInt parses a string to a integer, // not like TryParseInt, the whole string need to be parsed as an integer, // e.g. ParseInt("100MB") -> error status template // NOLINT -StatusOr ParseInt(const std::string &v, int base = 0) { - const char *begin = v.c_str(); - auto res = TryParseInt(begin, base); +StatusOr ParseInt(std::string_view v, int base = 0) { + auto res = TryParseInt(v, base); if (!res) return res; - if (std::get<1>(*res) != begin + v.size()) { + if (std::get<1>(*res) != v.data() + v.size()) { return {Status::NotOK, "encounter non-integer characters"}; } @@ -140,7 +112,7 @@ using NumericRange = std::tuple; // this overload accepts a range {min, max}, // integer out of the range will trigger an error status template // NOLINT -StatusOr ParseInt(const std::string &v, NumericRange range, int base = 0) { +StatusOr ParseInt(std::string_view v, NumericRange range, int base = 0) { auto res = ParseInt(v, base); if (!res) return res; diff --git a/src/common/type_util.h b/src/common/type_util.h index 98439243622..3cf0c2a442a 100644 --- a/src/common/type_util.h +++ b/src/common/type_util.h @@ -20,6 +20,7 @@ #pragma once +#include #include template @@ -47,3 +48,13 @@ template struct GetClassFromMember { using type = C; // NOLINT }; + +struct StringHash { + using is_transparent = void; // NOLINT + std::size_t operator()(std::string_view str) const noexcept { return std::hash{}(str); } +}; + +struct StringEqual { + using is_transparent = void; // NOLINT + bool operator()(std::string_view lhs, std::string_view rhs) const noexcept { return lhs == rhs; } +}; \ No newline at end of file diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h index e8b44d94144..c30652b37e8 100644 --- a/src/server/redis_connection.h +++ b/src/server/redis_connection.h @@ -22,10 +22,13 @@ #include +#include #include #include 
+#include #include #include +#include #include #include @@ -38,6 +41,8 @@ class Worker; namespace redis { +class PeerInfo; + class Connection : public EvbufCallbackBase { public: enum Flag { @@ -134,12 +139,18 @@ class Connection : public EvbufCallbackBase { void SetLastCmd(std::string cmd) { last_cmd_ = std::move(cmd); } std::string GetIP() const { return ip_; } uint32_t GetPort() const { return port_; } - void SetListeningPort(int port) { listening_port_ = port; } - int GetListeningPort() const { return listening_port_; } - void SetAnnounceIP(std::string ip) { announce_ip_ = std::move(ip); } - std::string GetAnnounceIP() const { return !announce_ip_.empty() ? announce_ip_ : ip_; } - uint32_t GetAnnouncePort() const { return listening_port_ != 0 ? listening_port_ : port_; } - std::string GetAnnounceAddr() const { return GetAnnounceIP() + ":" + std::to_string(GetAnnouncePort()); } + + void SetPeerInfo(std::unique_ptr &&peer_info) { peer_info_ = std::move(peer_info); } + + const PeerInfo *GetPeerInfo() { + if (peer_info_) { + return peer_info_.get(); + } + + SetPeerInfo(std::make_unique(ip_, port_, "", -1)); + return peer_info_.get(); + } + uint64_t GetClientType() const; Server *GetServer() { return srv_; } @@ -187,10 +198,11 @@ class Connection : public EvbufCallbackBase { std::string ns_; std::string name_; std::string ip_; - std::string announce_ip_; + + std::unique_ptr peer_info_ = nullptr; + uint32_t port_ = 0; std::string addr_; - int listening_port_ = 0; bool is_admin_ = false; bool need_free_bev_ = true; std::string last_cmd_; @@ -217,4 +229,44 @@ class Connection : public EvbufCallbackBase { RESP protocol_version_ = RESP::v2; }; +class PeerInfo { + public: + PeerInfo() = default; + ~PeerInfo() = default; + + PeerInfo(std::string_view ip, uint32_t port, std::string_view peer_id, int64_t peer_version) + : port_(port), peer_version_(peer_version) { + if (peer_id.empty()) { + str_ = fmt::format("{}:{}", ip, port); + } else { + str_ = fmt::format("{}:{} 
({}@{})", ip, port, peer_id, peer_version); + } + + addr_ = std::string_view(str_).substr(0, str_.find(' ')); + ip_ = std::string_view(str_).substr(0, ip.length()); + + if (!peer_id.empty()) { + peer_id_ = std::string_view(str_).substr(addr_.length() + 1, peer_id.length()); + } + } + + std::string_view GetIP() const { return ip_; } + uint32_t GetPort() const { return port_; } + + std::string_view ToString() const { return str_; } + std::string_view GetPeerID() const { return peer_id_; } + std::string_view GetAddr() const { return addr_; } + int64_t GetPeerVersion() const { return peer_version_; } + + private: + std::string_view ip_; + std::string_view addr_; + uint32_t port_ = 0; + + std::string_view peer_id_; + int64_t peer_version_ = 0; + + std::string str_; +}; + } // namespace redis diff --git a/src/server/server.cc b/src/server/server.cc index 253a48029e0..4e551dc196f 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -37,6 +37,7 @@ #include #include #include +#include #include #include "commands/commander.h" @@ -354,6 +355,27 @@ void Server::DisconnectSlaves() { } } +void Server::CleanupOrphanSlaves(int64_t version, const ClusterNodes &nodes) { + std::lock_guard lg(slave_threads_mu_); + + for (auto &slave_thread : slave_threads_) { + const auto peer_info = slave_thread->GetConn()->GetPeerInfo(); + auto peer_version = peer_info->GetPeerVersion(); + if (peer_version < 0 || peer_version > version) { + // The peer version is greater than the current version, + // so we can't determine whether it is an orphan node, just skip it. + continue; + } + auto peer_id = peer_info->GetPeerID(); + auto it = nodes.find(peer_id); + if (it != nodes.end()) { + // The peer id is in the cluster, so it is not an orphan node. 
+ continue; + } + slave_thread->Stop(); + } +} + void Server::CleanupExitedSlaves() { std::lock_guard lg(slave_threads_mu_); @@ -1061,8 +1083,8 @@ Server::InfoEntries Server::GetReplicationInfo() { if (slave->IsStopped()) continue; entries.emplace_back("slave" + std::to_string(idx), - fmt::format("ip={},port={},offset={},lag={}", slave->GetConn()->GetAnnounceIP(), - slave->GetConn()->GetAnnouncePort(), slave->GetCurrentReplSeq(), + fmt::format("ip={},port={},offset={},lag={}", slave->GetConn()->GetPeerInfo()->GetIP(), + slave->GetConn()->GetPeerInfo()->GetPort(), slave->GetCurrentReplSeq(), latest_seq - slave->GetCurrentReplSeq())); ++idx; } @@ -1096,10 +1118,11 @@ std::string Server::GetRoleInfo() { slave_threads_mu_.lock(); for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; + const auto peer_info = slave->GetConn()->GetPeerInfo(); list.emplace_back(redis::ArrayOfBulkStrings({ - slave->GetConn()->GetAnnounceIP(), - std::to_string(slave->GetConn()->GetListeningPort()), + std::string(peer_info->GetAddr()), + std::to_string(peer_info->GetPort()), std::to_string(slave->GetCurrentReplSeq()), })); } @@ -1658,7 +1681,7 @@ void Server::KillClient(int64_t *killed, const std::string &addr, uint64_t id, u slave_threads_mu_.lock(); for (const auto &st : slave_threads_) { if ((type & kTypeSlave) || - (!addr.empty() && (st->GetConn()->GetAddr() == addr || st->GetConn()->GetAnnounceAddr() == addr)) || + (!addr.empty() && (st->GetConn()->GetAddr() == addr || st->GetConn()->GetPeerInfo()->GetAddr() == addr)) || (id != 0 && st->GetConn()->GetID() == id)) { st->Stop(); (*killed)++; @@ -2040,9 +2063,8 @@ std::list> Server::GetSlaveHostAndPort() { slave_threads_mu_.lock(); for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; - std::pair host_port_pair = {slave->GetConn()->GetAnnounceIP(), - slave->GetConn()->GetListeningPort()}; - result.emplace_back(host_port_pair); + const auto peer_info = slave->GetConn()->GetPeerInfo(); + 
result.emplace_back(peer_info->GetIP(), static_cast(slave->GetConn()->GetPort())); } slave_threads_mu_.unlock(); return result; diff --git a/src/server/server.h b/src/server/server.h index b8c31429123..6849792f665 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -198,6 +198,7 @@ class Server { Status RemoveMaster(); Status AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq); void DisconnectSlaves(); + void CleanupOrphanSlaves(int64_t version, const ClusterNodes &nodes); void CleanupExitedSlaves(); bool IsSlave() const { return !master_host_.empty(); } void FeedMonitorConns(redis::Connection *conn, const std::vector &tokens); diff --git a/src/server/worker.cc b/src/server/worker.cc index 3a5642c3bbb..71be4d9da30 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -533,7 +533,7 @@ void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string } if ((type & conn->GetClientType()) || - (!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) || + (!addr.empty() && (conn->GetAddr() == addr || conn->GetPeerInfo()->GetAddr() == addr)) || (id != 0 && conn->GetID() == id)) { conn->EnableFlag(redis::Connection::kCloseAfterReply); // enable write event to notify worker wake up ASAP, and remove the connection diff --git a/src/storage/storage.h b/src/storage/storage.h index a4563eabc8f..b607e1cbc2d 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -29,7 +29,6 @@ #include #include -#include #include #include #include diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index 489fab266a1..244693ebd22 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -224,6 +224,11 @@ func TestClusterReplicas(t *testing.T) { clusterNode := fmt.Sprintf("%s\n%s", master1Node, master2Node) err := rdb1.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err() 
require.NoError(t, err) + + require.Eventually(t, func() bool { + return util.FindInfoEntry(rdb2, "master_link_status") == "down" + }, 5*time.Second, 100*time.Millisecond) + err = rdb2.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err() require.NoError(t, err) From 41ddd7b23256712affe665ff88fb94deda6eaafc Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Sun, 23 Mar 2025 19:55:11 +0800 Subject: [PATCH 02/14] [squash me] Replace ToString() with GetStringView() in `PeerInfo` --- src/commands/cmd_replication.cc | 6 +++--- src/server/redis_connection.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index 69ef4efd312..769082fc683 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -67,7 +67,7 @@ class CommandPSync : public Commander { } Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { - auto peer_info = conn->GetPeerInfo()->ToString(); + auto peer_info = conn->GetPeerInfo()->GetStringView(); LOG(INFO) << fmt::format( "Slave {} asks for synchronization with next sequence: {} " @@ -246,7 +246,7 @@ class CommandFetchMeta : public Commander { Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, [[maybe_unused]] std::string *output) override { int repl_fd = conn->GetFD(); - std::string_view peer_info = conn->GetPeerInfo()->ToString(); + std::string_view peer_info = conn->GetPeerInfo()->GetStringView(); auto s = util::SockSetBlocking(repl_fd, 1); if (!s.IsOK()) { @@ -305,7 +305,7 @@ class CommandFetchFile : public Commander { std::vector files = util::Split(files_str_, ","); int repl_fd = conn->GetFD(); - std::string_view peer_info = conn->GetPeerInfo()->ToString(); + std::string_view peer_info = conn->GetPeerInfo()->GetStringView(); auto s = util::SockSetBlocking(repl_fd, 1); if (!s.IsOK()) { diff --git a/src/server/redis_connection.h 
b/src/server/redis_connection.h index c30652b37e8..84041b63bfa 100644 --- a/src/server/redis_connection.h +++ b/src/server/redis_connection.h @@ -253,7 +253,7 @@ class PeerInfo { std::string_view GetIP() const { return ip_; } uint32_t GetPort() const { return port_; } - std::string_view ToString() const { return str_; } + std::string_view GetStringView() const { return str_; } std::string_view GetPeerID() const { return peer_id_; } std::string_view GetAddr() const { return addr_; } int64_t GetPeerVersion() const { return peer_version_; } From b5bb943ce6a947dcb3d3dc7c0b5402f7c934b7ca Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Sun, 23 Mar 2025 20:03:17 +0800 Subject: [PATCH 03/14] [squash me] removing custom hash and equality structs --- src/cluster/cluster.h | 2 +- src/common/type_util.h | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index a1ffca26a20..b0e40ad5463 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -66,7 +66,7 @@ struct SlotInfo { std::vector nodes; }; -using ClusterNodes = std::unordered_map, StringHash, StringEqual>; +using ClusterNodes = std::unordered_map>; class Server; class SyncMigrateContext; diff --git a/src/common/type_util.h b/src/common/type_util.h index 3cf0c2a442a..12ce58835f1 100644 --- a/src/common/type_util.h +++ b/src/common/type_util.h @@ -48,13 +48,3 @@ template struct GetClassFromMember { using type = C; // NOLINT }; - -struct StringHash { - using is_transparent = void; // NOLINT - std::size_t operator()(std::string_view str) const noexcept { return std::hash{}(str); } -}; - -struct StringEqual { - using is_transparent = void; // NOLINT - bool operator()(std::string_view lhs, std::string_view rhs) const noexcept { return lhs == rhs; } -}; \ No newline at end of file From a1bd734a495d007c7f88e83b814f3683c92cd279 Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Mon, 24 Mar 2025 15:15:10 +0800 Subject: [PATCH 04/14] [squash me] fix unit 
test issue --- src/cluster/replication.cc | 2 +- src/commands/cmd_bit.cc | 13 ++++++++++++- src/commands/cmd_replication.cc | 3 ++- src/server/server.cc | 4 ++-- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index d7e59170f71..9c4570ec752 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -457,7 +457,7 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev) data_to_send.emplace_back("ip-address"); data_to_send.emplace_back(config->replica_announce_ip); } - if (!next_try_without_peer_id_) { + if (!next_try_without_peer_id_ && config->cluster_enabled) { data_to_send.emplace_back("peer-id"); data_to_send.emplace_back(srv_->cluster->GetMyId()); data_to_send.emplace_back("version"); diff --git a/src/commands/cmd_bit.cc b/src/commands/cmd_bit.cc index 13be25f3134..b8461b549d3 100644 --- a/src/commands/cmd_bit.cc +++ b/src/commands/cmd_bit.cc @@ -18,6 +18,8 @@ * */ +#include + #include "commander.h" #include "commands/command_parser.h" #include "error_constants.h" @@ -28,11 +30,20 @@ namespace redis { Status GetBitOffsetFromArgument(const std::string &arg, uint32_t *offset) { - auto parse_result = ParseInt(arg, 10); + static const std::string errValueNotInteger = "bit offset is not an integer or out of range"; + auto parse_result = ParseInt(arg, 10); if (!parse_result) { return parse_result.ToStatus(); } + if (*parse_result < 0) { + return {Status::RedisParseErr, errValueNotInteger}; + } + + if (*parse_result > std::numeric_limits::max()) { + return {Status::RedisParseErr, errValueNotInteger}; + } + *offset = *parse_result; return Status::OK(); } diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index 769082fc683..85134c53519 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -217,7 +217,8 @@ class CommandReplConf : public Commander { Status Execute([[maybe_unused]] engine::Context 
&ctx, [[maybe_unused]] Server *srv, Connection *conn, std::string *output) override { - if (peer_version_ >= 0 && !srv->cluster->IsInCluster(peer_id_, peer_version_)) { + if (srv->GetConfig()->cluster_enabled && peer_version_ >= 0 && + !srv->cluster->IsInCluster(peer_id_, peer_version_)) { return {Status::NotOK, errYouAreFired}; } diff --git a/src/server/server.cc b/src/server/server.cc index 4e551dc196f..e90f805866b 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -1121,7 +1121,7 @@ std::string Server::GetRoleInfo() { const auto peer_info = slave->GetConn()->GetPeerInfo(); list.emplace_back(redis::ArrayOfBulkStrings({ - std::string(peer_info->GetAddr()), + std::string(peer_info->GetIP()), std::to_string(peer_info->GetPort()), std::to_string(slave->GetCurrentReplSeq()), })); @@ -2064,7 +2064,7 @@ std::list> Server::GetSlaveHostAndPort() { for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; const auto peer_info = slave->GetConn()->GetPeerInfo(); - result.emplace_back(peer_info->GetIP(), static_cast(slave->GetConn()->GetPort())); + result.emplace_back(peer_info->GetIP(), peer_info->GetPort()); } slave_threads_mu_.unlock(); return result; From 1f77f039f94f2b20557de6dab1acde8b52ef37b7 Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Mon, 24 Mar 2025 16:32:41 +0800 Subject: [PATCH 05/14] [squash me] simplify node retrieval logic in SetNodeId and SetSlotRanges --- src/cluster/cluster.cc | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 46cfc9c00c8..a74a3ea8c2e 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -76,14 +76,12 @@ Status Cluster::SetNodeId(const std::string &node_id) { } myid_ = node_id; - // Already has cluster topology - if (version_ >= 0 && nodes_->count(node_id) > 0) { - myself_ = nodes_->at(node_id); - } else { + if (version_ < 0) { myself_ = nullptr; + } else if (auto it = nodes_->find(node_id); it != 
nodes_->end()) { + myself_ = it->second; } - // Set replication relationship return SetMasterSlaveRepl(); } @@ -104,14 +102,15 @@ Status Cluster::SetSlotRanges(const std::vector &slot_ranges, const s return {Status::NotOK, errInvalidNodeID}; } + std::shared_ptr to_assign_node{}; + // Get the node which we want to assign slots into it - auto it = nodes_->find(node_id); - if (it == nodes_->end()) { + if (auto it = nodes_->find(node_id); it != nodes_->end()) { + to_assign_node = it->second; + } else { return {Status::NotOK, "No this node in the cluster"}; } - auto to_assign_node = it->second; - if (to_assign_node->role != kClusterMaster) { return {Status::NotOK, errNoMasterNode}; } @@ -215,8 +214,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b myself_ = nullptr; if (!myid_.empty()) { - auto it = nodes_->find(myid_); - if (it != nodes_->end()) { + if (auto it = nodes_->find(myid_); it != nodes_->end()) { myself_ = it->second; } } From bf7d6f839a51c6af366bba20b1ba39d588c18ded Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Thu, 27 Mar 2025 11:55:59 +0800 Subject: [PATCH 06/14] [squash me] remove unnecessary move semantic parameters --- src/cluster/cluster.cc | 7 +++---- src/cluster/cluster.h | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index a74a3ea8c2e..09c59a1ff2f 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -42,8 +42,8 @@ #include "string_util.h" #include "time_util.h" -ClusterNode::ClusterNode(std::string &&id, std::string &&host, int port, int role, std::string &&master_id, - std::bitset &&slots) +ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id, + std::bitset slots) : id(std::move(id)), host(std::move(host)), port(port), @@ -863,8 +863,7 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no } // Create master node - auto node =
std::make_shared(std::move(id), std::move(host), port, role, std::move(master_id), - std::move(slots)); + auto node = std::make_shared(id, host, port, role, master_id, slots); nodes->insert({node->id, node}); } diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index b0e40ad5463..0c51941b1f7 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -43,8 +43,8 @@ class ClusterNode { public: - explicit ClusterNode(std::string &&id, std::string &&host, int port, int role, std::string &&master_id, - std::bitset &&slots); + explicit ClusterNode(std::string id, std::string host, int port, int role, std::string master_id, + std::bitset slots); std::string id; std::string host; int port; From eaec8187ac0eef98f7f9d3176a521937931cf9e2 Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Fri, 28 Mar 2025 15:01:15 +0800 Subject: [PATCH 07/14] [squash me] change ClusterNode slots parameter to const reference and update GetPeerInfo to return by reference --- src/cluster/cluster.cc | 9 ++------ src/cluster/cluster.h | 2 +- src/commands/cmd_replication.cc | 6 ++--- src/server/redis_connection.h | 41 +++++++++++++++------------------ src/server/server.cc | 20 ++++++++-------- src/server/worker.cc | 2 +- 6 files changed, 35 insertions(+), 45 deletions(-) diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 09c59a1ff2f..23db9eaeb6d 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -43,13 +43,8 @@ #include "time_util.h" ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id, - std::bitset slots) - : id(std::move(id)), - host(std::move(host)), - port(port), - role(role), - master_id(std::move(master_id)), - slots(std::move(slots)) {} + const std::bitset &slots) + : id(std::move(id)), host(std::move(host)), port(port), role(role), master_id(std::move(master_id)), slots(slots) {} Cluster::Cluster(Server *srv, std::vector binds, int port) : srv_(srv), binds_(std::move(binds)), port_(port) { diff 
--git a/src/cluster/cluster.h b/src/cluster/cluster.h index 0c51941b1f7..b79863fb896 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -44,7 +44,7 @@ class ClusterNode { public: explicit ClusterNode(std::string id, std::string host, int port, int role, std::string master_id, - std::bitset slots); + const std::bitset &slots); std::string id; std::string host; int port; diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index 85134c53519..1256073a02d 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -67,7 +67,7 @@ class CommandPSync : public Commander { } Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { - auto peer_info = conn->GetPeerInfo()->GetStringView(); + auto peer_info = conn->GetPeerInfo().ToString(); LOG(INFO) << fmt::format( "Slave {} asks for synchronization with next sequence: {} " @@ -247,7 +247,7 @@ class CommandFetchMeta : public Commander { Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, [[maybe_unused]] std::string *output) override { int repl_fd = conn->GetFD(); - std::string_view peer_info = conn->GetPeerInfo()->GetStringView(); + auto peer_info = conn->GetPeerInfo().ToString(); auto s = util::SockSetBlocking(repl_fd, 1); if (!s.IsOK()) { @@ -306,7 +306,7 @@ class CommandFetchFile : public Commander { std::vector files = util::Split(files_str_, ","); int repl_fd = conn->GetFD(); - std::string_view peer_info = conn->GetPeerInfo()->GetStringView(); + auto peer_info = conn->GetPeerInfo().ToString(); auto s = util::SockSetBlocking(repl_fd, 1); if (!s.IsOK()) { diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h index 84041b63bfa..a832da7ea8e 100644 --- a/src/server/redis_connection.h +++ b/src/server/redis_connection.h @@ -142,13 +142,13 @@ class Connection : public EvbufCallbackBase { void SetPeerInfo(std::unique_ptr &&peer_info) { peer_info_ = 
std::move(peer_info); } - const PeerInfo *GetPeerInfo() { + const PeerInfo &GetPeerInfo() { if (peer_info_) { - return peer_info_.get(); + return *peer_info_; } SetPeerInfo(std::make_unique(ip_, port_, "", -1)); - return peer_info_.get(); + return *peer_info_; } uint64_t GetClientType() const; @@ -235,35 +235,30 @@ class PeerInfo { ~PeerInfo() = default; PeerInfo(std::string_view ip, uint32_t port, std::string_view peer_id, int64_t peer_version) - : port_(port), peer_version_(peer_version) { - if (peer_id.empty()) { - str_ = fmt::format("{}:{}", ip, port); - } else { - str_ = fmt::format("{}:{} ({}@{})", ip, port, peer_id, peer_version); - } - - addr_ = std::string_view(str_).substr(0, str_.find(' ')); - ip_ = std::string_view(str_).substr(0, ip.length()); - - if (!peer_id.empty()) { - peer_id_ = std::string_view(str_).substr(addr_.length() + 1, peer_id.length()); - } + : ip_(ip), port_(port), peer_id_(peer_id), peer_version_(peer_version) { + addr_ = fmt::format("{}:{}", ip, port); } - std::string_view GetIP() const { return ip_; } + std::string GetIP() const { return ip_; } uint32_t GetPort() const { return port_; } - std::string_view GetStringView() const { return str_; } - std::string_view GetPeerID() const { return peer_id_; } - std::string_view GetAddr() const { return addr_; } + std::string ToString() const { + if (peer_id_.empty()) { + return fmt::format("{}:{}", ip_, port_); + } else { + return fmt::format("{}:{} ({}@{})", ip_, port_, peer_id_, peer_version_); + } + } + std::string GetPeerID() const { return peer_id_; } + std::string GetAddr() const { return addr_; } int64_t GetPeerVersion() const { return peer_version_; } private: - std::string_view ip_; - std::string_view addr_; + std::string ip_; + std::string addr_; uint32_t port_ = 0; - std::string_view peer_id_; + std::string peer_id_; int64_t peer_version_ = 0; std::string str_; diff --git a/src/server/server.cc b/src/server/server.cc index e90f805866b..d4ed2335ffb 100644 --- a/src/server/server.cc 
+++ b/src/server/server.cc @@ -360,13 +360,13 @@ void Server::CleanupOrphanSlaves(int64_t version, const ClusterNodes &nodes) { for (auto &slave_thread : slave_threads_) { const auto peer_info = slave_thread->GetConn()->GetPeerInfo(); - auto peer_version = peer_info->GetPeerVersion(); + auto peer_version = peer_info.GetPeerVersion(); if (peer_version < 0 || peer_version > version) { // The peer version is greater than the current version, // so we can't determine whether it is an orphan node, just skip it. continue; } - auto peer_id = peer_info->GetPeerID(); + auto peer_id = peer_info.GetPeerID(); auto it = nodes.find(peer_id); if (it != nodes.end()) { // The peer id is in the cluster, so it is not an orphan node. @@ -1081,10 +1081,10 @@ Server::InfoEntries Server::GetReplicationInfo() { entries.emplace_back("connected_slaves", slave_threads_.size()); for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; - + const auto& peer_info = slave->GetConn()->GetPeerInfo(); entries.emplace_back("slave" + std::to_string(idx), - fmt::format("ip={},port={},offset={},lag={}", slave->GetConn()->GetPeerInfo()->GetIP(), - slave->GetConn()->GetPeerInfo()->GetPort(), slave->GetCurrentReplSeq(), + fmt::format("ip={},port={},offset={},lag={}", peer_info.GetIP(), + peer_info.GetPort(), slave->GetCurrentReplSeq(), latest_seq - slave->GetCurrentReplSeq())); ++idx; } @@ -1119,10 +1119,10 @@ std::string Server::GetRoleInfo() { for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; const auto peer_info = slave->GetConn()->GetPeerInfo(); - + list.emplace_back(redis::ArrayOfBulkStrings({ - std::string(peer_info->GetIP()), - std::to_string(peer_info->GetPort()), + std::string(peer_info.GetIP()), + std::to_string(peer_info.GetPort()), std::to_string(slave->GetCurrentReplSeq()), })); } @@ -1681,7 +1681,7 @@ void Server::KillClient(int64_t *killed, const std::string &addr, uint64_t id, u slave_threads_mu_.lock(); for (const auto &st : slave_threads_) { 
if ((type & kTypeSlave) || - (!addr.empty() && (st->GetConn()->GetAddr() == addr || st->GetConn()->GetPeerInfo()->GetAddr() == addr)) || + (!addr.empty() && (st->GetConn()->GetAddr() == addr || st->GetConn()->GetPeerInfo().GetAddr() == addr)) || (id != 0 && st->GetConn()->GetID() == id)) { st->Stop(); (*killed)++; @@ -2064,7 +2064,7 @@ std::list> Server::GetSlaveHostAndPort() { for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; const auto peer_info = slave->GetConn()->GetPeerInfo(); - result.emplace_back(peer_info->GetIP(), peer_info->GetPort()); + result.emplace_back(peer_info.GetIP(), peer_info.GetPort()); } slave_threads_mu_.unlock(); return result; diff --git a/src/server/worker.cc b/src/server/worker.cc index 71be4d9da30..ea8cfe08c72 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -533,7 +533,7 @@ void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string } if ((type & conn->GetClientType()) || - (!addr.empty() && (conn->GetAddr() == addr || conn->GetPeerInfo()->GetAddr() == addr)) || + (!addr.empty() && (conn->GetAddr() == addr || conn->GetPeerInfo().GetAddr() == addr)) || (id != 0 && conn->GetID() == id)) { conn->EnableFlag(redis::Connection::kCloseAfterReply); // enable write event to notify worker wake up ASAP, and remove the connection From 9c4db61048edcf0ae7e91471cdb0ca5aba5cb301 Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Fri, 28 Mar 2025 15:54:55 +0800 Subject: [PATCH 08/14] [squash me] revert unrelated modifications --- src/commands/cmd_bit.cc | 13 +--- src/commands/cmd_replication.cc | 2 +- src/common/parse_util.cc | 5 +- src/common/parse_util.h | 128 +++++++++++++++++++------------- src/common/type_util.h | 1 - 5 files changed, 83 insertions(+), 66 deletions(-) diff --git a/src/commands/cmd_bit.cc b/src/commands/cmd_bit.cc index b8461b549d3..13be25f3134 100644 --- a/src/commands/cmd_bit.cc +++ b/src/commands/cmd_bit.cc @@ -18,8 +18,6 @@ * */ -#include - #include "commander.h" 
#include "commands/command_parser.h" #include "error_constants.h" @@ -30,20 +28,11 @@ namespace redis { Status GetBitOffsetFromArgument(const std::string &arg, uint32_t *offset) { - static const std::string errValueNotInteger = "bit offset is not an integer or out of range"; - auto parse_result = ParseInt(arg, 10); + auto parse_result = ParseInt(arg, 10); if (!parse_result) { return parse_result.ToStatus(); } - if (*parse_result < 0) { - return {Status::RedisParseErr, errValueNotInteger}; - } - - if (*parse_result > std::numeric_limits::max()) { - return {Status::RedisParseErr, errValueNotInteger}; - } - *offset = *parse_result; return Status::OK(); } diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index 1256073a02d..b2ae6112308 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -177,7 +177,7 @@ class CommandReplConf : public Commander { return Commander::Parse(args); } - Status ParseParam(std::string_view option, std::string_view value) { + Status ParseParam(const std::string& option, const std::string& value) { if (option == "listening-port") { auto parse_result = ParseInt(value, NumericRange{1, PORT_LIMIT - 1}, 10); if (!parse_result) { diff --git a/src/common/parse_util.cc b/src/common/parse_util.cc index 4bb432443d1..a7359c78adb 100644 --- a/src/common/parse_util.cc +++ b/src/common/parse_util.cc @@ -20,11 +20,12 @@ #include "parse_util.h" +#include + #include "bit_util.h" -#include "string_util.h" StatusOr ParseSizeAndUnit(const std::string &v) { - auto [num, rest] = GET_OR_RET(TryParseInt(v, 10)); + auto [num, rest] = GET_OR_RET(TryParseInt(v.c_str(), 10)); if (*rest == 0) { return num; diff --git a/src/common/parse_util.h b/src/common/parse_util.h index bb441afd665..3e3cf116c3f 100644 --- a/src/common/parse_util.h +++ b/src/common/parse_util.h @@ -20,14 +20,70 @@ #pragma once -#include -#include #include +#include #include -#include #include #include "status.h" +#include "string_util.h" + 
+namespace details { + +template +struct ParseIntFunc; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtol; +}; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtol; +}; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtol; +}; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtol; +}; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtoll; +}; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtoul; +}; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtoul; +}; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtoul; +}; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtoul; +}; + +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtoull; +}; + +} // namespace details template using ParseResultAndPos = std::tuple; @@ -37,69 +93,41 @@ using ParseResultAndPos = std::tuple; // return the result integer and the current string position. // e.g. 
TryParseInt("100MB") -> {100, "MB"} // if no integer can be parsed or out of type range, an error will be returned -// base can be in {0, 2, ..., 36} -template -StatusOr> TryParseInt(std::string_view v, int base = 0) { - static const std::string ErrNotInteger = "not started as an integer"; - - T res; - - // Skip leading spaces - const char *p = v.data(); - const char *end = v.data() + v.size(); - while (p < end && std::isspace(static_cast(*p))) { - ++p; - } +// base can be in {0, 2, ..., 36}, refer to strto* in standard c for more details +template // NOLINT +StatusOr> TryParseInt(const char *v, int base = 0) { + char *end = nullptr; - if (p == end) { - return {Status::NotOK, ErrNotInteger}; - } + errno = 0; + auto res = details::ParseIntFunc::value(v, &end, base); - if (base == 0) { - if (*p == '0') { - if (p + 1 < end) { - if (std::tolower(*(p + 1)) == 'x') { - base = 16; - p += 2; - } else if (std::tolower(*(p + 1)) == 'b') { - base = 2; - p += 2; - } else { - base = 8; - p += 1; - } - } - } else { - base = 10; - } - } else if (base < 2 || base > 36) { - return {Status::NotOK, "invalid base (must be 2~36 or 0)"}; + if (v == end) { + return {Status::NotOK, "not started as an integer"}; } - auto [ptr, ec] = std::from_chars(p, end, res, base); - if (ec == std::errc::invalid_argument) { - return {Status::NotOK, ErrNotInteger}; - } else if (ec == std::errc::result_out_of_range) { - return {Status::NotOK, "out of range of integer type"}; + if (errno) { + return Status::FromErrno(); } - if (ptr == p) { - return {Status::NotOK, ErrNotInteger}; + if (!std::is_same::value && + (res < std::numeric_limits::min() || res > std::numeric_limits::max())) { + return {Status::NotOK, "out of range of integer type"}; } - return ParseResultAndPos{res, ptr}; + return ParseResultAndPos{res, end}; } // ParseInt parses a string to a integer, // not like TryParseInt, the whole string need to be parsed as an integer, // e.g. 
ParseInt("100MB") -> error status template // NOLINT -StatusOr ParseInt(std::string_view v, int base = 0) { - auto res = TryParseInt(v, base); +StatusOr ParseInt(const std::string &v, int base = 0) { + const char *begin = v.c_str(); + auto res = TryParseInt(begin, base); if (!res) return res; - if (std::get<1>(*res) != v.data() + v.size()) { + if (std::get<1>(*res) != begin + v.size()) { return {Status::NotOK, "encounter non-integer characters"}; } @@ -112,7 +140,7 @@ using NumericRange = std::tuple; // this overload accepts a range {min, max}, // integer out of the range will trigger an error status template // NOLINT -StatusOr ParseInt(std::string_view v, NumericRange range, int base = 0) { +StatusOr ParseInt(const std::string &v, NumericRange range, int base = 0) { auto res = ParseInt(v, base); if (!res) return res; diff --git a/src/common/type_util.h b/src/common/type_util.h index 12ce58835f1..98439243622 100644 --- a/src/common/type_util.h +++ b/src/common/type_util.h @@ -20,7 +20,6 @@ #pragma once -#include #include template From 7f115389517ec4b88db6bf1324d815af46bb6509 Mon Sep 17 00:00:00 2001 From: Rivers Date: Fri, 28 Mar 2025 16:02:01 +0800 Subject: [PATCH 09/14] [squash me] Update src/cluster/cluster.cc Co-authored-by: hulk --- src/cluster/cluster.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 23db9eaeb6d..1bb372be53d 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -1000,4 +1000,4 @@ bool Cluster::IsInCluster(const std::string &node_id, int64_t version) const { } return nodes_->count(node_id) > 0; -} \ No newline at end of file +} From 19e81f3ded8e28a55f0b4fd813ca070e4d3d5677 Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Fri, 28 Mar 2025 16:02:26 +0800 Subject: [PATCH 10/14] [sqush me] update error message for decommissioned nodes --- src/cluster/cluster_defs.h | 2 +- src/cluster/replication.cc | 2 +- src/commands/cmd_replication.cc | 4 ++-- 
src/server/server.cc | 9 ++++----- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/cluster/cluster_defs.h b/src/cluster/cluster_defs.h index 0ac2c9d2985..c6e20b4ef5d 100644 --- a/src/cluster/cluster_defs.h +++ b/src/cluster/cluster_defs.h @@ -41,7 +41,7 @@ inline constexpr const char *errNoMasterNode = "The node isn't a master"; inline constexpr const char *errClusterNoInitialized = "The cluster is not initialized"; inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info"; inline constexpr const char *errInvalidImportState = "Invalid import state"; -inline constexpr const char *errYouAreFired = "You are fired"; +inline constexpr const char *errNodeDecommissioned = "Node has been decommissioned"; /// SlotRange is a range of cluster slots covering [start, end], /// where the valid values for start and end are [0, kClusterSlots). diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index 9c4570ec752..3b121ec4c20 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -1067,7 +1067,7 @@ bool ReplicationThread::isWrongPsyncNum(std::string_view err) { bool ReplicationThread::isNodeFired(std::string_view err) { static const auto msg = - fmt::format(RESP_PREFIX_ERROR "{}", redis::StatusToRedisErrorMsg({Status::NotOK, errYouAreFired})); + fmt::format(RESP_PREFIX_ERROR "{}", redis::StatusToRedisErrorMsg({Status::NotOK, errNodeDecommissioned})); return err == msg; } diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index b2ae6112308..836c365699e 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -177,7 +177,7 @@ class CommandReplConf : public Commander { return Commander::Parse(args); } - Status ParseParam(const std::string& option, const std::string& value) { + Status ParseParam(const std::string &option, const std::string &value) { if (option == "listening-port") { auto parse_result = ParseInt(value, NumericRange{1, PORT_LIMIT - 
1}, 10); if (!parse_result) { @@ -219,7 +219,7 @@ class CommandReplConf : public Commander { std::string *output) override { if (srv->GetConfig()->cluster_enabled && peer_version_ >= 0 && !srv->cluster->IsInCluster(peer_id_, peer_version_)) { - return {Status::NotOK, errYouAreFired}; + return {Status::NotOK, errNodeDecommissioned}; } if (ip_.empty()) { diff --git a/src/server/server.cc b/src/server/server.cc index d4ed2335ffb..43545d9562c 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -1081,11 +1081,10 @@ Server::InfoEntries Server::GetReplicationInfo() { entries.emplace_back("connected_slaves", slave_threads_.size()); for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; - const auto& peer_info = slave->GetConn()->GetPeerInfo(); + const auto &peer_info = slave->GetConn()->GetPeerInfo(); entries.emplace_back("slave" + std::to_string(idx), - fmt::format("ip={},port={},offset={},lag={}", peer_info.GetIP(), - peer_info.GetPort(), slave->GetCurrentReplSeq(), - latest_seq - slave->GetCurrentReplSeq())); + fmt::format("ip={},port={},offset={},lag={}", peer_info.GetIP(), peer_info.GetPort(), + slave->GetCurrentReplSeq(), latest_seq - slave->GetCurrentReplSeq())); ++idx; } slave_threads_mu_.unlock(); @@ -1119,7 +1118,7 @@ std::string Server::GetRoleInfo() { for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; const auto peer_info = slave->GetConn()->GetPeerInfo(); - + list.emplace_back(redis::ArrayOfBulkStrings({ std::string(peer_info.GetIP()), std::to_string(peer_info.GetPort()), From 14a7750c8e937d67a9da85f1b12b1eda3d19cc0a Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Fri, 28 Mar 2025 17:04:44 +0800 Subject: [PATCH 11/14] [squash me] revert modification in `cluser` --- src/cluster/cluster.cc | 141 +++++++++++++++++++---------------------- src/cluster/cluster.h | 10 +-- src/server/server.cc | 2 +- 3 files changed, 70 insertions(+), 83 deletions(-) diff --git a/src/cluster/cluster.cc 
b/src/cluster/cluster.cc index 1bb372be53d..c8379c32a22 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -23,18 +23,14 @@ #include #include -#include #include #include #include -#include -#include #include #include "cluster/cluster_defs.h" #include "commands/commander.h" #include "common/io_util.h" -#include "fmt/base.h" #include "fmt/format.h" #include "parse_util.h" #include "replication.h" @@ -71,12 +67,14 @@ Status Cluster::SetNodeId(const std::string &node_id) { } myid_ = node_id; - if (version_ < 0) { + // Already has cluster topology + if (version_ >= 0 && nodes_.find(node_id) != nodes_.end()) { + myself_ = nodes_[myid_]; + } else { myself_ = nullptr; - } else if (auto it = nodes_->find(node_id); it != nodes_->end()) { - myself_ = it->second; } + // Set replication relationship return SetMasterSlaveRepl(); } @@ -97,12 +95,9 @@ Status Cluster::SetSlotRanges(const std::vector &slot_ranges, const s return {Status::NotOK, errInvalidNodeID}; } - std::shared_ptr to_assign_node{}; - // Get the node which we want to assign slots into it - if (auto it = nodes_->find(node_id); it != nodes_->end()) { - to_assign_node = it->second; - } else { + std::shared_ptr to_assign_node = nodes_[node_id]; + if (to_assign_node == nullptr) { return {Status::NotOK, "No this node in the cluster"}; } @@ -165,10 +160,9 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b if (version_ == version) return Status::OK(); } - // ClusterNodes nodes; - auto nodes = std::make_unique(); + ClusterNodes nodes; std::unordered_map slots_nodes; - Status s = parseClusterNodes(nodes_str, nodes.get(), &slots_nodes); + Status s = parseClusterNodes(nodes_str, &nodes, &slots_nodes); if (!s.IsOK()) return s; // Update version and cluster topology @@ -178,19 +172,14 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b // Update slots to nodes for (const auto &[slot, node_id] : slots_nodes) { - auto it = nodes_->find(node_id); - if 
(it == nodes_->end()) { - return {Status::NotOK, "No this node in the cluster"}; - } - slots_nodes_[slot] = it->second; + slots_nodes_[slot] = nodes_[node_id]; } // Update replicas info and size - for (const auto &[node_id, node] : *nodes_) { + for (const auto &[node_id, node] : nodes_) { if (node->role == kClusterSlave) { - auto it = nodes_->find(node->master_id); - if (it != nodes_->end()) { - it->second->replicas.emplace_back(node_id); + if (nodes_.find(node->master_id) != nodes_.end()) { + nodes_[node->master_id]->replicas.push_back(node_id); } } if (node->role == kClusterMaster && node->slots.count() > 0) { @@ -199,7 +188,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b } if (myid_.empty() || force) { - for (const auto &[node_id, node] : *nodes_) { + for (const auto &[node_id, node] : nodes_) { if (node->port == port_ && util::MatchListeningIP(binds_, node->host)) { myid_ = node_id; break; @@ -208,10 +197,8 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b } myself_ = nullptr; - if (!myid_.empty()) { - if (auto it = nodes_->find(myid_); it != nodes_->end()) { - myself_ = it->second; - } + if (!myid_.empty() && nodes_.find(myid_) != nodes_.end()) { + myself_ = nodes_[myid_]; } // Set replication relationship @@ -265,14 +252,16 @@ Status Cluster::SetMasterSlaveRepl() { srv_->slot_migrator->SetStopMigrationFlag(false); LOG(INFO) << "Change server role to master, restart migration task"; } + if (!is_slave) { - srv_->CleanupOrphanSlaves(version_, *nodes_); + srv_->CleanupOrphanSlaves(version_, nodes_); } + return Status::OK(); } - auto it = nodes_->find(myself_->master_id); - if (it != nodes_->end()) { + auto it = nodes_.find(myself_->master_id); + if (it != nodes_.end()) { // Replica mode and master node is existing std::shared_ptr master = it->second; auto s = srv_->AddMaster(master->host, master->port, false); @@ -324,13 +313,10 @@ Status Cluster::SetSlotRangeImported(const SlotRange 
&slot_range) { Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx) { - auto dst_node_it = nodes_->find(dst_node_id); - if (dst_node_it == nodes_->end()) { + if (nodes_.find(dst_node_id) == nodes_.end()) { return {Status::NotOK, "Can't find the destination node id"}; } - const auto &dst_node = dst_node_it->second; - if (!slot_range.IsValid()) { return {Status::NotOK, errSlotRangeInvalid}; } @@ -350,16 +336,17 @@ Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const std::string return {Status::NotOK, "Slave can't migrate slot"}; } - if (dst_node->role != kClusterMaster) { + if (nodes_[dst_node_id]->role != kClusterMaster) { return {Status::NotOK, "Can't migrate slot to a slave"}; } - if (dst_node == myself_) { + if (nodes_[dst_node_id] == myself_) { return {Status::NotOK, "Can't migrate slot to myself"}; } - Status s = srv_->slot_migrator->PerformSlotRangeMigration(dst_node_id, dst_node->host, dst_node->port, slot_range, - blocking_ctx); + const auto &dst = nodes_[dst_node_id]; + Status s = + srv_->slot_migrator->PerformSlotRangeMigration(dst_node_id, dst->host, dst->port, slot_range, blocking_ctx); return s; } @@ -433,18 +420,27 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) { if (slots_node != nullptr) ok_slot++; } - *cluster_infos = fmt::format( + *cluster_infos = "cluster_state:ok\r\n" - "cluster_slots_assigned:{ok_slot}\r\n" - "cluster_slots_ok:{ok_slot}\r\n" + "cluster_slots_assigned:" + + std::to_string(ok_slot) + + "\r\n" + "cluster_slots_ok:" + + std::to_string(ok_slot) + + "\r\n" "cluster_slots_pfail:0\r\n" "cluster_slots_fail:0\r\n" - "cluster_known_nodes:{nodes_size}\r\n" - "cluster_size:{size}\r\n" - "cluster_current_epoch:{version}\r\n" - "cluster_my_epoch:{version}\r\n", - fmt::arg("ok_slot", ok_slot), fmt::arg("nodes_size", nodes_->size()), fmt::arg("size", size_), - fmt::arg("version", version_)); + "cluster_known_nodes:" + + 
std::to_string(nodes_.size()) + + "\r\n" + "cluster_size:" + + std::to_string(size_) + + "\r\n" + "cluster_current_epoch:" + + std::to_string(version_) + + "\r\n" + "cluster_my_epoch:" + + std::to_string(version_) + "\r\n"; if (myself_ != nullptr && myself_->role == kClusterMaster && !srv_->IsSlave()) { // Get migrating status @@ -504,11 +500,8 @@ SlotInfo Cluster::genSlotNodeInfo(int start, int end, const std::shared_ptrhost, n->port, n->id}); // itself for (const auto &id : n->replicas) { // replicas - auto it = nodes_->find(id); - if (it == nodes_->end()) { - continue; - } - vn.push_back({it->second->host, it->second->port, it->second->id}); + if (nodes_.find(id) == nodes_.end()) continue; + vn.push_back({nodes_[id]->host, nodes_[id]->port, nodes_[id]->id}); } return {start, end, vn}; @@ -530,8 +523,8 @@ StatusOr Cluster::GetReplicas(const std::string &node_id) { return {Status::RedisClusterDown, errClusterNoInitialized}; } - auto item = nodes_->find(node_id); - if (item == nodes_->end()) { + auto item = nodes_.find(node_id); + if (item == nodes_.end()) { return {Status::InvalidArgument, errInvalidNodeID}; } @@ -543,8 +536,8 @@ StatusOr Cluster::GetReplicas(const std::string &node_id) { auto now = util::GetTimeStampMS(); std::string replicas_desc; for (const auto &replica_id : node->replicas) { - auto n = nodes_->find(replica_id); - if (n == nodes_->end()) { + auto n = nodes_.find(replica_id); + if (n == nodes_.end()) { continue; } @@ -577,7 +570,7 @@ std::string Cluster::genNodesDescription() { auto now = util::GetTimeStampMS(); std::string nodes_desc; - for (const auto &[_, node] : *nodes_) { + for (const auto &[_, node] : nodes_) { std::string node_str; // ID, host, port node_str.append(node->id + " "); @@ -659,7 +652,7 @@ std::string Cluster::genNodesInfo() const { auto slots_infos = getClusterNodeSlots(); std::string nodes_info; - for (const auto &[_, node] : *nodes_) { + for (const auto &[_, node] : nodes_) { std::string node_str; node_str.append("node "); 
// ID @@ -752,8 +745,10 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo}; } + nodes->clear(); + // Parse all nodes - for (auto &node_str : nodes_info) { + for (const auto &node_str : nodes_info) { std::vector fields = util::Split(node_str, " "); if (fields.size() < 5) { return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo}; @@ -764,10 +759,10 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no return {Status::ClusterInvalidInfo, errInvalidNodeID}; } - std::string &id = fields[0]; + std::string id = fields[0]; // 2) host, TODO(@shooterit): check host is valid - std::string &host = fields[1]; + std::string host = fields[1]; // 3) port auto parse_result = ParseInt(fields[2], 10); @@ -800,9 +795,7 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo}; } else { // Create slave node - auto node = std::make_shared(std::move(id), std::move(host), port, role, std::move(master_id), - std::move(slots)); - nodes->insert({node->id, node}); + (*nodes)[id] = std::make_shared(id, host, port, role, master_id, slots); continue; } } @@ -828,8 +821,6 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no if (slots_nodes->find(start) != slots_nodes->end()) { return {Status::ClusterInvalidInfo, errSlotOverlapped}; } else { - // It's ok cause the value of the `slots_nodes` is `std::string`, - // a deep copy of the `id` is created. 
(*slots_nodes)[start] = id; } } @@ -858,8 +849,7 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no } // Create master node - auto node = std::make_shared(id, host, port, role, master_id, slots); - nodes->insert({node->id, node}); + (*nodes)[id] = std::make_shared(id, host, port, role, master_id, slots); } return Status::OK(); @@ -946,8 +936,9 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons return Status::OK(); // I'm serving the imported slot } - if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite) && nodes_->count(myself_->master_id) && - nodes_->at(myself_->master_id) == slots_nodes_[slot] && conn->IsFlagEnabled(redis::Connection::kReadOnly)) { + if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite) && + nodes_.find(myself_->master_id) != nodes_.end() && nodes_[myself_->master_id] == slots_nodes_[slot] && + conn->IsFlagEnabled(redis::Connection::kReadOnly)) { return Status::OK(); // My master is serving this slot } @@ -980,7 +971,7 @@ Status Cluster::Reset() { myid_.clear(); myself_.reset(); - nodes_.reset(); + nodes_.clear(); for (auto &n : slots_nodes_) { n = nullptr; } @@ -999,5 +990,5 @@ bool Cluster::IsInCluster(const std::string &node_id, int64_t version) const { return true; } - return nodes_->count(node_id) > 0; -} + return nodes_.count(node_id) > 0; +} \ No newline at end of file diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index b79863fb896..1406439f0f0 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -22,13 +22,10 @@ #include #include -#include -#include #include #include #include #include -#include #include #include @@ -39,7 +36,6 @@ #include "server/redis_connection.h" #include "status.h" #include "storage/scripting.h" -#include "type_util.h" class ClusterNode { public: @@ -66,7 +62,7 @@ struct SlotInfo { std::vector nodes; }; -using ClusterNodes = std::unordered_map>; +using ClusterNodes = 
std::unordered_map>; class Server; class SyncMigrateContext; @@ -97,9 +93,9 @@ class Cluster { Status DumpClusterNodes(const std::string &file); Status LoadClusterNodes(const std::string &file_path); Status Reset(); - bool IsInCluster(const std::string &node_id, int64_t version) const; static bool SubCommandIsExecExclusive(const std::string &subcommand); + bool IsInCluster(const std::string &node_id, int64_t version) const; private: std::string getNodeIDBySlot(int slot) const; @@ -116,7 +112,7 @@ class Cluster { int64_t version_ = -1; std::string myid_; std::shared_ptr myself_; - std::unique_ptr nodes_; + ClusterNodes nodes_; std::shared_ptr slots_nodes_[kClusterSlots]; std::map migrated_slots_; diff --git a/src/server/server.cc b/src/server/server.cc index 43545d9562c..11f29eaffa3 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -359,7 +359,7 @@ void Server::CleanupOrphanSlaves(int64_t version, const ClusterNodes &nodes) { std::lock_guard lg(slave_threads_mu_); for (auto &slave_thread : slave_threads_) { - const auto peer_info = slave_thread->GetConn()->GetPeerInfo(); + const auto& peer_info = slave_thread->GetConn()->GetPeerInfo(); auto peer_version = peer_info.GetPeerVersion(); if (peer_version < 0 || peer_version > version) { // The peer version is greater than the current version, From 23a1fa832f9b7dc6f73f2fc5880cc76105dbe6d4 Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Fri, 28 Mar 2025 17:28:59 +0800 Subject: [PATCH 12/14] [squash me] rename `isNodeFired` to `isNodeDecommissioned` --- src/cluster/replication.cc | 4 ++-- src/cluster/replication.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index 3b121ec4c20..b7712e15c23 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -500,7 +500,7 @@ ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev) { return CBState::RESTART; } - if (isNodeFired(resp)) { + if 
(isNodeDecommissioned(resp)) { LOG(ERROR) << "The master has fired the node, stop the replication"; return CBState::QUIT; } @@ -1065,7 +1065,7 @@ bool ReplicationThread::isWrongPsyncNum(std::string_view err) { return err == msg; } -bool ReplicationThread::isNodeFired(std::string_view err) { +bool ReplicationThread::isNodeDecommissioned(std::string_view err) { static const auto msg = fmt::format(RESP_PREFIX_ERROR "{}", redis::StatusToRedisErrorMsg({Status::NotOK, errNodeDecommissioned})); return err == msg; diff --git a/src/cluster/replication.h b/src/cluster/replication.h index 25034761434..423b3963b2f 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -211,7 +211,7 @@ class ReplicationThread : private EventCallbackBase { static bool isRestoringError(std::string_view err); static bool isWrongPsyncNum(std::string_view err); static bool isUnknownOption(std::string_view err); - static bool isNodeFired(std::string_view err); + static bool isNodeDecommissioned(std::string_view err); Status parseWriteBatch(const rocksdb::WriteBatch &write_batch); }; From 05e8a58eadb2d9ca9bc396c546b9c63c469e9b1e Mon Sep 17 00:00:00 2001 From: Rivers Jin Date: Sat, 29 Mar 2025 09:58:41 +0800 Subject: [PATCH 13/14] [squash me] format --- src/server/server.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/server.cc b/src/server/server.cc index 11f29eaffa3..1e8193651e4 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -359,7 +359,7 @@ void Server::CleanupOrphanSlaves(int64_t version, const ClusterNodes &nodes) { std::lock_guard lg(slave_threads_mu_); for (auto &slave_thread : slave_threads_) { - const auto& peer_info = slave_thread->GetConn()->GetPeerInfo(); + const auto &peer_info = slave_thread->GetConn()->GetPeerInfo(); auto peer_version = peer_info.GetPeerVersion(); if (peer_version < 0 || peer_version > version) { // The peer version is greater than the current version, From b2c0d9b2feea2f0f06c4050822ba09be7e6419f2 Mon 
Sep 17 00:00:00 2001 From: Rivers Jin Date: Sat, 29 Mar 2025 17:17:04 +0800 Subject: [PATCH 14/14] [squash me] Add missing includes in `replication.cc` --- src/cluster/replication.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index b7712e15c23..def88a9c6b1 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -41,9 +41,11 @@ #include "io_util.h" #include "rocksdb/write_batch.h" #include "rocksdb_crc32c.h" +#include "scope_exit.h" #include "server/redis_reply.h" #include "server/server.h" #include "status.h" +#include "storage/batch_debugger.h" #include "thread_util.h" #include "time_util.h" #include "unique_fd.h"