Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
de4272b
feat(cluster): disconnect replication connections to orphan slaves
RiversJin Mar 23, 2025
41ddd7b
[squash me] Replace ToString() with GetStringView() in `PeerInfo`
RiversJin Mar 23, 2025
b5bb943
[squash me] removing custom hash and equality structs
RiversJin Mar 23, 2025
8af6476
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 23, 2025
a1bd734
[squash me] fix unit test issue
RiversJin Mar 24, 2025
1f77f03
[squash me] simplify node retrieval logic in SetNodeId and SetSlotRanges
RiversJin Mar 24, 2025
2ede94c
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 25, 2025
7cc4315
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 27, 2025
bf7d6f8
[sqash me] remove unnecessary move semantic parameters
RiversJin Mar 27, 2025
eaec818
[squash me] change ClusterNode slots parameter to const reference and…
RiversJin Mar 28, 2025
9c4db61
[squash me] revert unrelated modifications
RiversJin Mar 28, 2025
7f11538
[squash me] Update src/cluster/cluster.cc
RiversJin Mar 28, 2025
19e81f3
[sqush me] update error message for decommissioned nodes
RiversJin Mar 28, 2025
14a7750
[squash me] revert modification in `cluser`
RiversJin Mar 28, 2025
23a1fa8
[squash me] rename `isNodeFired` to `isNodeDecommissioned`
RiversJin Mar 28, 2025
05e8a58
[squash me] format
RiversJin Mar 29, 2025
37c8cae
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 29, 2025
b2c0d9b
[squash me] Add missing includes in `replication.cc`
RiversJin Mar 29, 2025
9571ac8
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 94 additions & 62 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,33 @@
#include <config/config_util.h>

#include <array>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <memory>
#include <string>
#include <string_view>
#include <vector>

#include "cluster/cluster_defs.h"
#include "commands/commander.h"
#include "common/io_util.h"
#include "fmt/base.h"
#include "fmt/format.h"
#include "parse_util.h"
#include "replication.h"
#include "server/server.h"
#include "string_util.h"
#include "time_util.h"

ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
const std::bitset<kClusterSlots> &slots)
: id(std::move(id)), host(std::move(host)), port(port), role(role), master_id(std::move(master_id)), slots(slots) {}
ClusterNode::ClusterNode(std::string &&id, std::string &&host, int port, int role, std::string &&master_id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why these are changed?

Copy link
Member

@PragmaTwice PragmaTwice Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some notes:

  1. for std::bitset, in most impl it is stored on the stack memory instead heap so move ctor has no difference from the copy ctor. usually you don't need to move it.
  2. for ClassA(std::string x) : x(x) {}, you can pass either lval or rval to it. for rval, it involves two move ctors. And for ClassA(std::string &&x) : x(x) {}, you can only pass rval to it, and also the performance difference is quite little or none.

So these changes are not so useful.

std::bitset<kClusterSlots> &&slots)
: id(std::move(id)),
host(std::move(host)),
port(port),
role(role),
master_id(std::move(master_id)),
slots(std::move(slots)) {}

Cluster::Cluster(Server *srv, std::vector<std::string> binds, int port)
: srv_(srv), binds_(std::move(binds)), port_(port) {
Expand Down Expand Up @@ -68,8 +77,8 @@ Status Cluster::SetNodeId(const std::string &node_id) {

myid_ = node_id;
// Already has cluster topology
if (version_ >= 0 && nodes_.find(node_id) != nodes_.end()) {
myself_ = nodes_[myid_];
if (version_ >= 0 && nodes_->count(node_id) > 0) {
myself_ = nodes_->at(node_id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the performance is important, why not use find() and iterator based operations? Which could make it just search once

} else {
myself_ = nullptr;
}
Expand All @@ -96,11 +105,13 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
}

// Get the node which we want to assign slots into it
std::shared_ptr<ClusterNode> to_assign_node = nodes_[node_id];
if (to_assign_node == nullptr) {
auto it = nodes_->find(node_id);
if (it == nodes_->end()) {
return {Status::NotOK, "No this node in the cluster"};
}

auto to_assign_node = it->second;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto it = nodes_->find(node_id);
if (it == nodes_->end()) {
return {Status::NotOK, "No this node in the cluster"};
}
auto to_assign_node = it->second;
std::shared_ptr<ClusterNode> to_assign_node;
if (auto it = nodes_.find(); it != nodes_.end()) {
to_assign_node = ..
} else {
return {Status::NotOK, "No this node in the cluster"};
}


if (to_assign_node->role != kClusterMaster) {
return {Status::NotOK, errNoMasterNode};
}
Expand Down Expand Up @@ -160,26 +171,32 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
if (version_ == version) return Status::OK();
}

ClusterNodes nodes;
// ClusterNodes nodes;
auto nodes = std::make_unique<ClusterNodes>();
std::unordered_map<int, std::string> slots_nodes;
Status s = parseClusterNodes(nodes_str, &nodes, &slots_nodes);
Status s = parseClusterNodes(nodes_str, nodes.get(), &slots_nodes);
if (!s.IsOK()) return s;

// Update version and cluster topology
version_ = version;
nodes_ = nodes;
nodes_ = std::move(nodes);
size_ = 0;

// Update slots to nodes
for (const auto &[slot, node_id] : slots_nodes) {
slots_nodes_[slot] = nodes_[node_id];
auto it = nodes_->find(node_id);
if (it == nodes_->end()) {
return {Status::NotOK, "No this node in the cluster"};
}
slots_nodes_[slot] = it->second;
}

// Update replicas info and size
for (const auto &[node_id, node] : nodes_) {
for (const auto &[node_id, node] : *nodes_) {
if (node->role == kClusterSlave) {
if (nodes_.find(node->master_id) != nodes_.end()) {
nodes_[node->master_id]->replicas.push_back(node_id);
auto it = nodes_->find(node->master_id);
if (it != nodes_->end()) {
it->second->replicas.emplace_back(node_id);
}
}
if (node->role == kClusterMaster && node->slots.count() > 0) {
Expand All @@ -188,7 +205,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}

if (myid_.empty() || force) {
for (const auto &[node_id, node] : nodes_) {
for (const auto &[node_id, node] : *nodes_) {
if (node->port == port_ && util::MatchListeningIP(binds_, node->host)) {
myid_ = node_id;
break;
Expand All @@ -197,8 +214,11 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}

myself_ = nullptr;
if (!myid_.empty() && nodes_.find(myid_) != nodes_.end()) {
myself_ = nodes_[myid_];
if (!myid_.empty()) {
auto it = nodes_->find(myid_);
if (it != nodes_->end()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if(auto it = ;

myself_ = it->second;
}
}

// Set replication relationship
Expand Down Expand Up @@ -252,11 +272,14 @@ Status Cluster::SetMasterSlaveRepl() {
srv_->slot_migrator->SetStopMigrationFlag(false);
LOG(INFO) << "Change server role to master, restart migration task";
}
if (!is_slave) {
srv_->CleanupOrphanSlaves(version_, *nodes_);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is what this patch actually does?

return Status::OK();
}

auto it = nodes_.find(myself_->master_id);
if (it != nodes_.end()) {
auto it = nodes_->find(myself_->master_id);
if (it != nodes_->end()) {
// Replica mode and master node is existing
std::shared_ptr<ClusterNode> master = it->second;
auto s = srv_->AddMaster(master->host, master->port, false);
Expand Down Expand Up @@ -308,10 +331,13 @@ Status Cluster::SetSlotRangeImported(const SlotRange &slot_range) {

Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const std::string &dst_node_id,
SyncMigrateContext *blocking_ctx) {
if (nodes_.find(dst_node_id) == nodes_.end()) {
auto dst_node_it = nodes_->find(dst_node_id);
if (dst_node_it == nodes_->end()) {
return {Status::NotOK, "Can't find the destination node id"};
}

const auto &dst_node = dst_node_it->second;

if (!slot_range.IsValid()) {
return {Status::NotOK, errSlotRangeInvalid};
}
Expand All @@ -331,17 +357,16 @@ Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const std::string
return {Status::NotOK, "Slave can't migrate slot"};
}

if (nodes_[dst_node_id]->role != kClusterMaster) {
if (dst_node->role != kClusterMaster) {
return {Status::NotOK, "Can't migrate slot to a slave"};
}

if (nodes_[dst_node_id] == myself_) {
if (dst_node == myself_) {
return {Status::NotOK, "Can't migrate slot to myself"};
}

const auto &dst = nodes_[dst_node_id];
Status s =
srv_->slot_migrator->PerformSlotRangeMigration(dst_node_id, dst->host, dst->port, slot_range, blocking_ctx);
Status s = srv_->slot_migrator->PerformSlotRangeMigration(dst_node_id, dst_node->host, dst_node->port, slot_range,
blocking_ctx);
return s;
}

Expand Down Expand Up @@ -415,27 +440,18 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
if (slots_node != nullptr) ok_slot++;
}

*cluster_infos =
*cluster_infos = fmt::format(
"cluster_state:ok\r\n"
"cluster_slots_assigned:" +
std::to_string(ok_slot) +
"\r\n"
"cluster_slots_ok:" +
std::to_string(ok_slot) +
"\r\n"
"cluster_slots_assigned:{ok_slot}\r\n"
"cluster_slots_ok:{ok_slot}\r\n"
"cluster_slots_pfail:0\r\n"
"cluster_slots_fail:0\r\n"
"cluster_known_nodes:" +
std::to_string(nodes_.size()) +
"\r\n"
"cluster_size:" +
std::to_string(size_) +
"\r\n"
"cluster_current_epoch:" +
std::to_string(version_) +
"\r\n"
"cluster_my_epoch:" +
std::to_string(version_) + "\r\n";
"cluster_known_nodes:{nodes_size}\r\n"
"cluster_size:{size}\r\n"
"cluster_current_epoch:{version}\r\n"
"cluster_my_epoch:{version}\r\n",
fmt::arg("ok_slot", ok_slot), fmt::arg("nodes_size", nodes_->size()), fmt::arg("size", size_),
fmt::arg("version", version_));

if (myself_ != nullptr && myself_->role == kClusterMaster && !srv_->IsSlave()) {
// Get migrating status
Expand Down Expand Up @@ -495,8 +511,11 @@ SlotInfo Cluster::genSlotNodeInfo(int start, int end, const std::shared_ptr<Clus
vn.push_back({n->host, n->port, n->id}); // itself

for (const auto &id : n->replicas) { // replicas
if (nodes_.find(id) == nodes_.end()) continue;
vn.push_back({nodes_[id]->host, nodes_[id]->port, nodes_[id]->id});
auto it = nodes_->find(id);
if (it == nodes_->end()) {
continue;
}
vn.push_back({it->second->host, it->second->port, it->second->id});
}

return {start, end, vn};
Expand All @@ -518,8 +537,8 @@ StatusOr<std::string> Cluster::GetReplicas(const std::string &node_id) {
return {Status::RedisClusterDown, errClusterNoInitialized};
}

auto item = nodes_.find(node_id);
if (item == nodes_.end()) {
auto item = nodes_->find(node_id);
if (item == nodes_->end()) {
return {Status::InvalidArgument, errInvalidNodeID};
}

Expand All @@ -531,8 +550,8 @@ StatusOr<std::string> Cluster::GetReplicas(const std::string &node_id) {
auto now = util::GetTimeStampMS();
std::string replicas_desc;
for (const auto &replica_id : node->replicas) {
auto n = nodes_.find(replica_id);
if (n == nodes_.end()) {
auto n = nodes_->find(replica_id);
if (n == nodes_->end()) {
continue;
}

Expand Down Expand Up @@ -565,7 +584,7 @@ std::string Cluster::genNodesDescription() {

auto now = util::GetTimeStampMS();
std::string nodes_desc;
for (const auto &[_, node] : nodes_) {
for (const auto &[_, node] : *nodes_) {
std::string node_str;
// ID, host, port
node_str.append(node->id + " ");
Expand Down Expand Up @@ -647,7 +666,7 @@ std::string Cluster::genNodesInfo() const {
auto slots_infos = getClusterNodeSlots();

std::string nodes_info;
for (const auto &[_, node] : nodes_) {
for (const auto &[_, node] : *nodes_) {
std::string node_str;
node_str.append("node ");
// ID
Expand Down Expand Up @@ -740,10 +759,8 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo};
}

nodes->clear();

// Parse all nodes
for (const auto &node_str : nodes_info) {
for (auto &node_str : nodes_info) {
std::vector<std::string> fields = util::Split(node_str, " ");
if (fields.size() < 5) {
return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo};
Expand All @@ -754,10 +771,10 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
return {Status::ClusterInvalidInfo, errInvalidNodeID};
}

std::string id = fields[0];
std::string &id = fields[0];

// 2) host, TODO(@shooterit): check host is valid
std::string host = fields[1];
std::string &host = fields[1];

// 3) port
auto parse_result = ParseInt<uint16_t>(fields[2], 10);
Expand Down Expand Up @@ -790,7 +807,9 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo};
} else {
// Create slave node
(*nodes)[id] = std::make_shared<ClusterNode>(id, host, port, role, master_id, slots);
auto node = std::make_shared<ClusterNode>(std::move(id), std::move(host), port, role, std::move(master_id),
std::move(slots));
nodes->insert({node->id, node});
continue;
}
}
Expand All @@ -816,6 +835,8 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
if (slots_nodes->find(start) != slots_nodes->end()) {
return {Status::ClusterInvalidInfo, errSlotOverlapped};
} else {
// It's ok cause the value of the `slots_nodes` is `std::string`,
// a deep copy of the `id` is created.
(*slots_nodes)[start] = id;
}
}
Expand Down Expand Up @@ -844,7 +865,9 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
}

// Create master node
(*nodes)[id] = std::make_shared<ClusterNode>(id, host, port, role, master_id, slots);
auto node = std::make_shared<ClusterNode>(std::move(id), std::move(host), port, role, std::move(master_id),
std::move(slots));
nodes->insert({node->id, node});
}

return Status::OK();
Expand Down Expand Up @@ -931,9 +954,8 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
return Status::OK(); // I'm serving the imported slot
}

if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite) &&
nodes_.find(myself_->master_id) != nodes_.end() && nodes_[myself_->master_id] == slots_nodes_[slot] &&
conn->IsFlagEnabled(redis::Connection::kReadOnly)) {
if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite) && nodes_->count(myself_->master_id) &&
nodes_->at(myself_->master_id) == slots_nodes_[slot] && conn->IsFlagEnabled(redis::Connection::kReadOnly)) {
return Status::OK(); // My master is serving this slot
}

Expand Down Expand Up @@ -966,7 +988,7 @@ Status Cluster::Reset() {
myid_.clear();
myself_.reset();

nodes_.clear();
nodes_.reset();
for (auto &n : slots_nodes_) {
n = nullptr;
}
Expand All @@ -977,3 +999,13 @@ Status Cluster::Reset() {
unlink(srv_->GetConfig()->NodesFilePath().data());
return Status::OK();
}

// Note that if current version is lower than the given version,
// it can't be determined whether the node is in the cluster, so just regard it as in the cluster.
bool Cluster::IsInCluster(const std::string &node_id, int64_t version) const {
if (version < 0 || this->version_ < version) {
return true;
}

return nodes_->count(node_id) > 0;
}
Loading