2323#include < config/config_util.h>
2424
2525#include < array>
26- #include < cstdint>
2726#include < cstring>
2827#include < fstream>
2928#include < memory>
30- #include < string>
31- #include < string_view>
3229#include < vector>
3330
3431#include " cluster/cluster_defs.h"
3532#include " commands/commander.h"
3633#include " common/io_util.h"
37- #include " fmt/base.h"
3834#include " fmt/format.h"
3935#include " parse_util.h"
4036#include " replication.h"
@@ -71,12 +67,14 @@ Status Cluster::SetNodeId(const std::string &node_id) {
7167 }
7268
7369 myid_ = node_id;
74- if (version_ < 0 ) {
70+ // Already has cluster topology
71+ if (version_ >= 0 && nodes_.find (node_id) != nodes_.end ()) {
72+ myself_ = nodes_[myid_];
73+ } else {
7574 myself_ = nullptr ;
76- } else if (auto it = nodes_->find (node_id); it != nodes_->end ()) {
77- myself_ = it->second ;
7875 }
7976
77+ // Set replication relationship
8078 return SetMasterSlaveRepl ();
8179}
8280
@@ -97,12 +95,9 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
9795 return {Status::NotOK, errInvalidNodeID};
9896 }
9997
100- std::shared_ptr<ClusterNode> to_assign_node{};
101-
10298 // Get the node which we want to assign slots into it
103- if (auto it = nodes_->find (node_id); it != nodes_->end ()) {
104- to_assign_node = it->second ;
105- } else {
99+ std::shared_ptr<ClusterNode> to_assign_node = nodes_[node_id];
100+ if (to_assign_node == nullptr ) {
106101 return {Status::NotOK, " No this node in the cluster" };
107102 }
108103
@@ -165,10 +160,9 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
165160 if (version_ == version) return Status::OK ();
166161 }
167162
168- // ClusterNodes nodes;
169- auto nodes = std::make_unique<ClusterNodes>();
163+ ClusterNodes nodes;
170164 std::unordered_map<int , std::string> slots_nodes;
171- Status s = parseClusterNodes (nodes_str, nodes. get () , &slots_nodes);
165+ Status s = parseClusterNodes (nodes_str, & nodes, &slots_nodes);
172166 if (!s.IsOK ()) return s;
173167
174168 // Update version and cluster topology
@@ -178,19 +172,14 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
178172
179173 // Update slots to nodes
180174 for (const auto &[slot, node_id] : slots_nodes) {
181- auto it = nodes_->find (node_id);
182- if (it == nodes_->end ()) {
183- return {Status::NotOK, " No this node in the cluster" };
184- }
185- slots_nodes_[slot] = it->second ;
175+ slots_nodes_[slot] = nodes_[node_id];
186176 }
187177
188178 // Update replicas info and size
189- for (const auto &[node_id, node] : * nodes_) {
179+ for (const auto &[node_id, node] : nodes_) {
190180 if (node->role == kClusterSlave ) {
191- auto it = nodes_->find (node->master_id );
192- if (it != nodes_->end ()) {
193- it->second ->replicas .emplace_back (node_id);
181+ if (nodes_.find (node->master_id ) != nodes_.end ()) {
182+ nodes_[node->master_id ]->replicas .push_back (node_id);
194183 }
195184 }
196185 if (node->role == kClusterMaster && node->slots .count () > 0 ) {
@@ -199,7 +188,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
199188 }
200189
201190 if (myid_.empty () || force) {
202- for (const auto &[node_id, node] : * nodes_) {
191+ for (const auto &[node_id, node] : nodes_) {
203192 if (node->port == port_ && util::MatchListeningIP (binds_, node->host )) {
204193 myid_ = node_id;
205194 break ;
@@ -208,10 +197,8 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
208197 }
209198
210199 myself_ = nullptr ;
211- if (!myid_.empty ()) {
212- if (auto it = nodes_->find (myid_); it != nodes_->end ()) {
213- myself_ = it->second ;
214- }
200+ if (!myid_.empty () && nodes_.find (myid_) != nodes_.end ()) {
201+ myself_ = nodes_[myid_];
215202 }
216203
217204 // Set replication relationship
@@ -265,14 +252,16 @@ Status Cluster::SetMasterSlaveRepl() {
265252 srv_->slot_migrator ->SetStopMigrationFlag (false );
266253 LOG (INFO) << " Change server role to master, restart migration task" ;
267254 }
255+
268256 if (!is_slave) {
269- srv_->CleanupOrphanSlaves (version_, * nodes_);
257+ srv_->CleanupOrphanSlaves (version_, nodes_);
270258 }
259+
271260 return Status::OK ();
272261 }
273262
274- auto it = nodes_-> find (myself_->master_id );
275- if (it != nodes_-> end ()) {
263+ auto it = nodes_. find (myself_->master_id );
264+ if (it != nodes_. end ()) {
276265 // Replica mode and master node is existing
277266 std::shared_ptr<ClusterNode> master = it->second ;
278267 auto s = srv_->AddMaster (master->host , master->port , false );
@@ -324,13 +313,10 @@ Status Cluster::SetSlotRangeImported(const SlotRange &slot_range) {
324313
325314Status Cluster::MigrateSlotRange (const SlotRange &slot_range, const std::string &dst_node_id,
326315 SyncMigrateContext *blocking_ctx) {
327- auto dst_node_it = nodes_->find (dst_node_id);
328- if (dst_node_it == nodes_->end ()) {
316+ if (nodes_.find (dst_node_id) == nodes_.end ()) {
329317 return {Status::NotOK, " Can't find the destination node id" };
330318 }
331319
332- const auto &dst_node = dst_node_it->second ;
333-
334320 if (!slot_range.IsValid ()) {
335321 return {Status::NotOK, errSlotRangeInvalid};
336322 }
@@ -350,16 +336,17 @@ Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const std::string
350336 return {Status::NotOK, " Slave can't migrate slot" };
351337 }
352338
353- if (dst_node ->role != kClusterMaster ) {
339+ if (nodes_[dst_node_id] ->role != kClusterMaster ) {
354340 return {Status::NotOK, " Can't migrate slot to a slave" };
355341 }
356342
357- if (dst_node == myself_) {
343+ if (nodes_[dst_node_id] == myself_) {
358344 return {Status::NotOK, " Can't migrate slot to myself" };
359345 }
360346
361- Status s = srv_->slot_migrator ->PerformSlotRangeMigration (dst_node_id, dst_node->host , dst_node->port , slot_range,
362- blocking_ctx);
347+ const auto &dst = nodes_[dst_node_id];
348+ Status s =
349+ srv_->slot_migrator ->PerformSlotRangeMigration (dst_node_id, dst->host , dst->port , slot_range, blocking_ctx);
363350 return s;
364351}
365352
@@ -433,18 +420,27 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
433420 if (slots_node != nullptr ) ok_slot++;
434421 }
435422
436- *cluster_infos = fmt::format (
423+ *cluster_infos =
437424 " cluster_state:ok\r\n "
438- " cluster_slots_assigned:{ok_slot}\r\n "
439- " cluster_slots_ok:{ok_slot}\r\n "
425+ " cluster_slots_assigned:" +
426+ std::to_string (ok_slot) +
427+ " \r\n "
428+ " cluster_slots_ok:" +
429+ std::to_string (ok_slot) +
430+ " \r\n "
440431 " cluster_slots_pfail:0\r\n "
441432 " cluster_slots_fail:0\r\n "
442- " cluster_known_nodes:{nodes_size}\r\n "
443- " cluster_size:{size}\r\n "
444- " cluster_current_epoch:{version}\r\n "
445- " cluster_my_epoch:{version}\r\n " ,
446- fmt::arg (" ok_slot" , ok_slot), fmt::arg (" nodes_size" , nodes_->size ()), fmt::arg (" size" , size_),
447- fmt::arg (" version" , version_));
433+ " cluster_known_nodes:" +
434+ std::to_string (nodes_.size ()) +
435+ " \r\n "
436+ " cluster_size:" +
437+ std::to_string (size_) +
438+ " \r\n "
439+ " cluster_current_epoch:" +
440+ std::to_string (version_) +
441+ " \r\n "
442+ " cluster_my_epoch:" +
443+ std::to_string (version_) + " \r\n " ;
448444
449445 if (myself_ != nullptr && myself_->role == kClusterMaster && !srv_->IsSlave ()) {
450446 // Get migrating status
@@ -504,11 +500,8 @@ SlotInfo Cluster::genSlotNodeInfo(int start, int end, const std::shared_ptr<Clus
504500 vn.push_back ({n->host , n->port , n->id }); // itself
505501
506502 for (const auto &id : n->replicas ) { // replicas
507- auto it = nodes_->find (id);
508- if (it == nodes_->end ()) {
509- continue ;
510- }
511- vn.push_back ({it->second ->host , it->second ->port , it->second ->id });
503+ if (nodes_.find (id) == nodes_.end ()) continue ;
504+ vn.push_back ({nodes_[id]->host , nodes_[id]->port , nodes_[id]->id });
512505 }
513506
514507 return {start, end, vn};
@@ -530,8 +523,8 @@ StatusOr<std::string> Cluster::GetReplicas(const std::string &node_id) {
530523 return {Status::RedisClusterDown, errClusterNoInitialized};
531524 }
532525
533- auto item = nodes_-> find (node_id);
534- if (item == nodes_-> end ()) {
526+ auto item = nodes_. find (node_id);
527+ if (item == nodes_. end ()) {
535528 return {Status::InvalidArgument, errInvalidNodeID};
536529 }
537530
@@ -543,8 +536,8 @@ StatusOr<std::string> Cluster::GetReplicas(const std::string &node_id) {
543536 auto now = util::GetTimeStampMS ();
544537 std::string replicas_desc;
545538 for (const auto &replica_id : node->replicas ) {
546- auto n = nodes_-> find (replica_id);
547- if (n == nodes_-> end ()) {
539+ auto n = nodes_. find (replica_id);
540+ if (n == nodes_. end ()) {
548541 continue ;
549542 }
550543
@@ -577,7 +570,7 @@ std::string Cluster::genNodesDescription() {
577570
578571 auto now = util::GetTimeStampMS ();
579572 std::string nodes_desc;
580- for (const auto &[_, node] : * nodes_) {
573+ for (const auto &[_, node] : nodes_) {
581574 std::string node_str;
582575 // ID, host, port
583576 node_str.append (node->id + " " );
@@ -659,7 +652,7 @@ std::string Cluster::genNodesInfo() const {
659652 auto slots_infos = getClusterNodeSlots ();
660653
661654 std::string nodes_info;
662- for (const auto &[_, node] : * nodes_) {
655+ for (const auto &[_, node] : nodes_) {
663656 std::string node_str;
664657 node_str.append (" node " );
665658 // ID
@@ -752,8 +745,10 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
752745 return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo};
753746 }
754747
748+ nodes->clear ();
749+
755750 // Parse all nodes
756- for (auto &node_str : nodes_info) {
751+ for (const auto &node_str : nodes_info) {
757752 std::vector<std::string> fields = util::Split (node_str, " " );
758753 if (fields.size () < 5 ) {
759754 return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo};
@@ -764,10 +759,10 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
764759 return {Status::ClusterInvalidInfo, errInvalidNodeID};
765760 }
766761
767- std::string & id = fields[0 ];
762+ std::string id = fields[0 ];
768763
769764 // 2) host, TODO(@shooterit): check host is valid
770- std::string & host = fields[1 ];
765+ std::string host = fields[1 ];
771766
772767 // 3) port
773768 auto parse_result = ParseInt<uint16_t >(fields[2 ], 10 );
@@ -800,9 +795,7 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
800795 return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo};
801796 } else {
802797 // Create slave node
803- auto node = std::make_shared<ClusterNode>(std::move (id), std::move (host), port, role, std::move (master_id),
804- std::move (slots));
805- nodes->insert ({node->id , node});
798+ (*nodes)[id] = std::make_shared<ClusterNode>(id, host, port, role, master_id, slots);
806799 continue ;
807800 }
808801 }
@@ -828,8 +821,6 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
828821 if (slots_nodes->find (start) != slots_nodes->end ()) {
829822 return {Status::ClusterInvalidInfo, errSlotOverlapped};
830823 } else {
831- // It's ok cause the value of the `slots_nodes` is `std::string`,
832- // a deep copy of the `id` is created.
833824 (*slots_nodes)[start] = id;
834825 }
835826 }
@@ -858,8 +849,7 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
858849 }
859850
860851 // Create master node
861- auto node = std::make_shared<ClusterNode>(id, host, port, role, master_id, slots);
862- nodes->insert ({node->id , node});
852+ (*nodes)[id] = std::make_shared<ClusterNode>(id, host, port, role, master_id, slots);
863853 }
864854
865855 return Status::OK ();
@@ -946,8 +936,9 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
946936 return Status::OK (); // I'm serving the imported slot
947937 }
948938
949- if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite ) && nodes_->count (myself_->master_id ) &&
950- nodes_->at (myself_->master_id ) == slots_nodes_[slot] && conn->IsFlagEnabled (redis::Connection::kReadOnly )) {
939+ if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite ) &&
940+ nodes_.find (myself_->master_id ) != nodes_.end () && nodes_[myself_->master_id ] == slots_nodes_[slot] &&
941+ conn->IsFlagEnabled (redis::Connection::kReadOnly )) {
951942 return Status::OK (); // My master is serving this slot
952943 }
953944
@@ -980,7 +971,7 @@ Status Cluster::Reset() {
980971 myid_.clear ();
981972 myself_.reset ();
982973
983- nodes_.reset ();
974+ nodes_.clear ();
984975 for (auto &n : slots_nodes_) {
985976 n = nullptr ;
986977 }
@@ -999,5 +990,5 @@ bool Cluster::IsInCluster(const std::string &node_id, int64_t version) const {
999990 return true ;
1000991 }
1001992
1002- return nodes_-> count (node_id) > 0 ;
1003- }
993+ return nodes_. count (node_id) > 0 ;
994+ }
0 commit comments