diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 10dbf0e8380..c8379c32a22 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -167,7 +167,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b // Update version and cluster topology version_ = version; - nodes_ = nodes; + nodes_ = std::move(nodes); size_ = 0; // Update slots to nodes @@ -252,6 +252,11 @@ Status Cluster::SetMasterSlaveRepl() { srv_->slot_migrator->SetStopMigrationFlag(false); LOG(INFO) << "Change server role to master, restart migration task"; } + + if (!is_slave) { + srv_->CleanupOrphanSlaves(version_, nodes_); + } + return Status::OK(); } @@ -977,3 +982,13 @@ Status Cluster::Reset() { unlink(srv_->GetConfig()->NodesFilePath().data()); return Status::OK(); } + +// Note that if current version is lower than the given version, +// it can't be determined whether the node is in the cluster, so just regard it as in the cluster. +bool Cluster::IsInCluster(const std::string &node_id, int64_t version) const { + if (version < 0 || this->version_ < version) { + return true; + } + + return nodes_.count(node_id) > 0; +} \ No newline at end of file diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index 468c154d4d8..1406439f0f0 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -95,6 +95,7 @@ class Cluster { Status Reset(); static bool SubCommandIsExecExclusive(const std::string &subcommand); + bool IsInCluster(const std::string &node_id, int64_t version) const; private: std::string getNodeIDBySlot(int slot) const; diff --git a/src/cluster/cluster_defs.h b/src/cluster/cluster_defs.h index 12fa568d69e..c6e20b4ef5d 100644 --- a/src/cluster/cluster_defs.h +++ b/src/cluster/cluster_defs.h @@ -41,6 +41,7 @@ inline constexpr const char *errNoMasterNode = "The node isn't a master"; inline constexpr const char *errClusterNoInitialized = "The cluster is not initialized"; inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info"; inline constexpr const char *errInvalidImportState = "Invalid import state"; +inline constexpr const char *errNodeDecommissioned = "Node has been decommissioned"; /// SlotRange is a range of cluster slots covering [start, end], /// where the valid values for start and end are [0, kClusterSlots). diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index 78a0a02c23c..def88a9c6b1 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -58,7 +58,7 @@ Status FeedSlaveThread::Start() { auto s = util::CreateThread("feed-replica", [this] { - sigset_t mask, omask; + sigset_t mask{}, omask{}; sigemptyset(&mask); sigemptyset(&omask); sigaddset(&mask, SIGCHLD); @@ -459,6 +459,12 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev) data_to_send.emplace_back("ip-address"); data_to_send.emplace_back(config->replica_announce_ip); } + if (!next_try_without_peer_id_ && config->cluster_enabled) { + data_to_send.emplace_back("peer-id"); + data_to_send.emplace_back(srv_->cluster->GetMyId()); + data_to_send.emplace_back("version"); + data_to_send.emplace_back(std::to_string(srv_->cluster->GetVersion())); + } SendString(bev, redis::ArrayOfBulkStrings(data_to_send)); repl_state_.store(kReplReplConf, std::memory_order_relaxed); LOG(INFO) << "[replication] replconf request was sent, waiting for response"; @@ -470,26 +476,40 @@ ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev) { UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT); if (!line) return CBState::AGAIN; + auto resp = line.View(); + // on unknown option: first try without announce ip, if it fails again - do nothing (to prevent infinite loop) - if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) { - next_try_without_announce_ip_address_ = true; - LOG(WARNING) << "The old version master, can't handle ip-address, " - << "try without it again"; - // Retry previous state, i.e. send replconf again - return CBState::PREV; + if (isUnknownOption(resp)) { + if (!next_try_without_peer_id_) { + next_try_without_peer_id_ = true; + LOG(WARNING) << "The old version master, can't handle peer-id, try without it again"; + return CBState::PREV; + } + if (!next_try_without_announce_ip_address_) { + next_try_without_announce_ip_address_ = true; + LOG(WARNING) << "The old version master, can't handle ip-address, try without it again"; + return CBState::PREV; + } } - if (line[0] == '-' && isRestoringError(line.View())) { + + if (ResponseLineIsOK(resp)) { + LOG(INFO) << "[replication] replconf is ok, start psync"; + return CBState::NEXT; + } + + if (isRestoringError(resp)) { LOG(WARNING) << "The master was restoring the db, retry later"; return CBState::RESTART; } - if (!ResponseLineIsOK(line.View())) { - LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1; - // backward compatible with old version that doesn't support replconf cmd - return CBState::NEXT; - } else { - LOG(INFO) << "[replication] replconf is ok, start psync"; - return CBState::NEXT; + + if (isNodeDecommissioned(resp)) { + LOG(ERROR) << "The master has fired the node, stop the replication"; + return CBState::QUIT; } + + LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1; + // backward compatible with old version that doesn't support replconf cmd + return CBState::NEXT; } ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev) { @@ -772,6 +792,7 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir, std::atomic fetch_cnt = {0}; std::atomic skip_cnt = {0}; std::vector> results; + results.reserve(concurrency); for (size_t tid = 0; tid < concurrency; ++tid) { results.push_back( std::async(std::launch::async, [this, dir, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status { @@ -1034,12 +1055,22 @@ Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch &write_batch bool ReplicationThread::isRestoringError(std::string_view err) { // err doesn't contain the CRLF, so cannot use redis::Error here. - return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::RedisLoading, redis::errRestoringBackup}); + static const auto msg = fmt::format(RESP_PREFIX_ERROR "{}", + redis::StatusToRedisErrorMsg({Status::RedisLoading, redis::errRestoringBackup})); + return err == msg; } bool ReplicationThread::isWrongPsyncNum(std::string_view err) { // err doesn't contain the CRLF, so cannot use redis::Error here. - return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::NotOK, redis::errWrongNumOfArguments}); + static const auto msg = + fmt::format(RESP_PREFIX_ERROR "{}", redis::StatusToRedisErrorMsg({Status::NotOK, redis::errWrongNumOfArguments})); + return err == msg; +} + +bool ReplicationThread::isNodeDecommissioned(std::string_view err) { + static const auto msg = + fmt::format(RESP_PREFIX_ERROR "{}", redis::StatusToRedisErrorMsg({Status::NotOK, errNodeDecommissioned})); + return err == msg; } bool ReplicationThread::isUnknownOption(std::string_view err) { diff --git a/src/cluster/replication.h b/src/cluster/replication.h index 75c545e08c3..423b3963b2f 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -159,6 +160,7 @@ class ReplicationThread : private EventCallbackBase { std::atomic repl_state_; std::atomic last_io_time_secs_ = 0; bool next_try_old_psync_ = false; + bool next_try_without_peer_id_ = false; bool next_try_without_announce_ip_address_ = false; std::function pre_fullsync_cb_; @@ -209,6 +211,7 @@ class ReplicationThread : private EventCallbackBase { static bool isRestoringError(std::string_view err); static bool isWrongPsyncNum(std::string_view err); static bool isUnknownOption(std::string_view err); + static bool isNodeDecommissioned(std::string_view err); Status parseWriteBatch(const rocksdb::WriteBatch &write_batch); }; diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index 7a16ddd9ab1..519bd827c32 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -292,7 +292,7 @@ class CommandClusterX : public Commander { } else { return {Status::RedisExecErr, "Invalid cluster command options"}; } - if (need_persist_nodes_info && srv->GetConfig()->persist_cluster_nodes_enabled) { + if (need_persist_nodes_info && srv->GetConfig()->persist_cluster_nodes_enabled && srv->cluster->GetVersion() >= 0) { return srv->cluster->DumpClusterNodes(srv->GetConfig()->NodesFilePath()); } return Status::OK(); diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index ab288ca8d75..836c365699e 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -18,12 +18,22 @@ * */ +#include +#include +#include +#include + +#include "cluster/cluster_defs.h" #include "commander.h" #include "error_constants.h" +#include "fmt/format.h" #include "io_util.h" #include "scope_exit.h" +#include "server/redis_connection.h" #include "server/redis_reply.h" #include "server/server.h" +#include "stats/log_collector.h" +#include "status.h" #include "thread_util.h" #include "time_util.h" #include "unique_fd.h" @@ -57,11 +67,12 @@ class CommandPSync : public Commander { } Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { - LOG(INFO) << "Slave " << conn->GetAddr() << ", listening port: " << conn->GetListeningPort() - << ", announce ip: " << conn->GetAnnounceIP() << " asks for synchronization" - << " with next sequence: " << next_repl_seq_ - << " replication id: " << (replica_replid_.length() ? replica_replid_ : "not supported") - << ", and local sequence: " << srv->storage->LatestSeqNumber(); + auto peer_info = conn->GetPeerInfo().ToString(); + + LOG(INFO) << fmt::format( + "Slave {} asks for synchronization with next sequence: {} " + "replication id: {} and local sequence: {}", + peer_info, next_repl_seq_, (new_psync_ ? replica_replid_ : "not supported"), srv->storage->LatestSeqNumber()); bool need_full_sync = false; @@ -172,35 +183,61 @@ class CommandReplConf : public Commander { if (!parse_result) { return {Status::RedisParseErr, "listening-port should be number or out of range"}; } - port_ = *parse_result; - } else if (option == "ip-address") { - if (value == "") { + return Status::OK(); + } + + if (option == "ip-address") { + if (value.empty()) { return {Status::RedisParseErr, "ip-address should not be empty"}; } - ip_address_ = value; - } else { - return {Status::RedisParseErr, errUnknownOption}; + ip_ = value; + return Status::OK(); } - return Status::OK(); + if (option == "peer-id") { + if (value.empty()) { + return {Status::RedisParseErr, "peer-id should not be empty"}; + } + peer_id_ = value; + return Status::OK(); + } + + if (option == "version") { + auto parse_result = ParseInt(value, 10); + if (!parse_result) { + return {Status::RedisParseErr, "version should be number"}; + } + peer_version_ = *parse_result; + return Status::OK(); + } + + return {Status::RedisParseErr, errUnknownOption}; } Status Execute([[maybe_unused]] engine::Context &ctx, [[maybe_unused]] Server *srv, Connection *conn, std::string *output) override { - if (port_ != 0) { - conn->SetListeningPort(port_); + if (srv->GetConfig()->cluster_enabled && peer_version_ >= 0 && + !srv->cluster->IsInCluster(peer_id_, peer_version_)) { + return {Status::NotOK, errNodeDecommissioned}; } - if (!ip_address_.empty()) { - conn->SetAnnounceIP(ip_address_); + + if (ip_.empty()) { + ip_ = conn->GetIP(); } + + auto peer_info = std::make_unique(ip_, port_, peer_id_, peer_version_); + conn->SetPeerInfo(std::move(peer_info)); + *output = redis::RESP_OK; return Status::OK(); } private: - int port_ = 0; - std::string ip_address_; + std::string ip_; + uint32_t port_; + std::string peer_id_; + int64_t peer_version_ = -1; }; class CommandFetchMeta : public Commander { @@ -210,7 +247,7 @@ class CommandFetchMeta : public Commander { Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, [[maybe_unused]] std::string *output) override { int repl_fd = conn->GetFD(); - std::string ip = conn->GetAnnounceIP(); + auto peer_info = conn->GetPeerInfo().ToString(); auto s = util::SockSetBlocking(repl_fd, 1); if (!s.IsOK()) { @@ -222,7 +259,7 @@ class CommandFetchMeta : public Commander { srv->stats.IncrFullSyncCount(); // Feed-replica-meta thread - auto t = GET_OR_RET(util::CreateThread("feed-repl-info", [srv, repl_fd, ip, bev = conn->GetBufferEvent()] { + auto t = GET_OR_RET(util::CreateThread("feed-repl-info", [srv, repl_fd, peer_info, bev = conn->GetBufferEvent()] { srv->IncrFetchFileThread(); auto exit = MakeScopeExit([srv, bev] { bufferevent_free(bev); @@ -241,9 +278,9 @@ class CommandFetchMeta : public Commander { } // Send full data file info if (auto s = util::SockSend(repl_fd, files + CRLF, bev)) { - LOG(INFO) << "[replication] Succeed sending full data file info to " << ip; + LOG(INFO) << fmt::format("[replication] Succeed sending full data file info to {}: {}", peer_info, files); } else { - LOG(WARNING) << "[replication] Fail to send full data file info " << ip << ", error: " << s.Msg(); + LOG(WARNING) << fmt::format("[replication] Failed to send full data file info to {}: {}", peer_info, s.Msg()); } auto now_secs = static_cast(util::GetTimeStamp()); srv->storage->SetCheckpointAccessTimeSecs(now_secs); @@ -269,7 +306,7 @@ class CommandFetchFile : public Commander { std::vector files = util::Split(files_str_, ","); int repl_fd = conn->GetFD(); - std::string ip = conn->GetAnnounceIP(); + auto peer_info = conn->GetPeerInfo().ToString(); auto s = util::SockSetBlocking(repl_fd, 1); if (!s.IsOK()) { @@ -279,51 +316,53 @@ class CommandFetchFile : public Commander { conn->NeedNotFreeBufferEvent(); // Feed-replica-file thread will close the replica bufferevent conn->EnableFlag(redis::Connection::kCloseAsync); - auto t = GET_OR_RET(util::CreateThread("feed-repl-file", [srv, repl_fd, ip, files, bev = conn->GetBufferEvent()]() { - auto exit = MakeScopeExit([bev] { bufferevent_free(bev); }); - srv->IncrFetchFileThread(); - - for (const auto &file : files) { - if (srv->IsStopped()) break; - - uint64_t file_size = 0, max_replication_bytes = 0; - if (srv->GetConfig()->max_replication_mb > 0 && srv->GetFetchFileThreadNum() != 0) { - max_replication_bytes = (srv->GetConfig()->max_replication_mb * MiB) / srv->GetFetchFileThreadNum(); - } - auto start = std::chrono::high_resolution_clock::now(); - auto fd = UniqueFD(engine::Storage::ReplDataManager::OpenDataFile(srv->storage, file, &file_size)); - if (!fd) break; - - // Send file size and content - auto s = util::SockSend(repl_fd, std::to_string(file_size) + CRLF, bev); - if (s) { - s = util::SockSendFile(repl_fd, *fd, file_size, bev); - } - if (s) { - LOG(INFO) << "[replication] Succeed sending file " << file << " to " << ip; - } else { - LOG(WARNING) << "[replication] Fail to send file " << file << " to " << ip << ", error: " << s.Msg(); - break; - } - fd.Close(); - - // Sleep if the speed of sending file is more than replication speed limit - auto end = std::chrono::high_resolution_clock::now(); - uint64_t duration = std::chrono::duration_cast(end - start).count(); - if (max_replication_bytes > 0) { - auto shortest = static_cast(static_cast(file_size) / - static_cast(max_replication_bytes) * (1000 * 1000)); - if (duration < shortest) { - LOG(INFO) << "[replication] Need to sleep " << (shortest - duration) / 1000 - << " ms since of sending files too quickly"; - usleep(shortest - duration); + auto t = GET_OR_RET( + util::CreateThread("feed-repl-file", [srv, repl_fd, peer_info, files, bev = conn->GetBufferEvent()]() { + auto exit = MakeScopeExit([bev] { bufferevent_free(bev); }); + srv->IncrFetchFileThread(); + + for (const auto &file : files) { + if (srv->IsStopped()) break; + + uint64_t file_size = 0, max_replication_bytes = 0; + if (srv->GetConfig()->max_replication_mb > 0 && srv->GetFetchFileThreadNum() != 0) { + max_replication_bytes = (srv->GetConfig()->max_replication_mb * MiB) / srv->GetFetchFileThreadNum(); + } + auto start = std::chrono::high_resolution_clock::now(); + auto fd = UniqueFD(engine::Storage::ReplDataManager::OpenDataFile(srv->storage, file, &file_size)); + if (!fd) break; + + // Send file size and content + auto s = util::SockSend(repl_fd, std::to_string(file_size) + CRLF, bev); + if (s) { + s = util::SockSendFile(repl_fd, *fd, file_size, bev); + } + if (s) { + LOG(INFO) << fmt::format("[replication] Succeed sending file {} to {} with size: {}", file, peer_info, + file_size); + } else { + LOG(WARNING) << fmt::format("[replication] Fail to send file {} to {}: {}", file, peer_info, s.Msg()); + break; + } + fd.Close(); + + // Sleep if the speed of sending file is more than replication speed limit + auto end = std::chrono::high_resolution_clock::now(); + uint64_t duration = std::chrono::duration_cast(end - start).count(); + if (max_replication_bytes > 0) { + auto shortest = static_cast(static_cast(file_size) / + static_cast(max_replication_bytes) * (1000 * 1000)); + if (duration < shortest) { + LOG(INFO) << "[replication] Need to sleep " << (shortest - duration) / 1000 + << " ms since of sending files too quickly"; + usleep(shortest - duration); + } + } } - } - } - auto now_secs = util::GetTimeStamp(); - srv->storage->SetCheckpointAccessTimeSecs(now_secs); - srv->DecrFetchFileThread(); - })); + auto now_secs = util::GetTimeStamp(); + srv->storage->SetCheckpointAccessTimeSecs(now_secs); + srv->DecrFetchFileThread(); + })); if (auto s = util::ThreadDetach(t); !s) { return s; diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h index e8b44d94144..a832da7ea8e 100644 --- a/src/server/redis_connection.h +++ b/src/server/redis_connection.h @@ -22,10 +22,13 @@ #include +#include #include #include +#include #include #include +#include #include #include @@ -38,6 +41,8 @@ class Worker; namespace redis { +class PeerInfo; + class Connection : public EvbufCallbackBase { public: enum Flag { @@ -134,12 +139,18 @@ class Connection : public EvbufCallbackBase { void SetLastCmd(std::string cmd) { last_cmd_ = std::move(cmd); } std::string GetIP() const { return ip_; } uint32_t GetPort() const { return port_; } - void SetListeningPort(int port) { listening_port_ = port; } - int GetListeningPort() const { return listening_port_; } - void SetAnnounceIP(std::string ip) { announce_ip_ = std::move(ip); } - std::string GetAnnounceIP() const { return !announce_ip_.empty() ? announce_ip_ : ip_; } - uint32_t GetAnnouncePort() const { return listening_port_ != 0 ? listening_port_ : port_; } - std::string GetAnnounceAddr() const { return GetAnnounceIP() + ":" + std::to_string(GetAnnouncePort()); } + + void SetPeerInfo(std::unique_ptr &&peer_info) { peer_info_ = std::move(peer_info); } + + const PeerInfo &GetPeerInfo() { + if (peer_info_) { + return *peer_info_; + } + + SetPeerInfo(std::make_unique(ip_, port_, "", -1)); + return *peer_info_; + } + uint64_t GetClientType() const; Server *GetServer() { return srv_; } @@ -187,10 +198,11 @@ class Connection : public EvbufCallbackBase { std::string ns_; std::string name_; std::string ip_; - std::string announce_ip_; + + std::unique_ptr peer_info_ = nullptr; + uint32_t port_ = 0; std::string addr_; - int listening_port_ = 0; bool is_admin_ = false; bool need_free_bev_ = true; std::string last_cmd_; @@ -217,4 +229,39 @@ class Connection : public EvbufCallbackBase { RESP protocol_version_ = RESP::v2; }; +class PeerInfo { + public: + PeerInfo() = default; + ~PeerInfo() = default; + + PeerInfo(std::string_view ip, uint32_t port, std::string_view peer_id, int64_t peer_version) + : ip_(ip), port_(port), peer_id_(peer_id), peer_version_(peer_version) { + addr_ = fmt::format("{}:{}", ip, port); + } + + std::string GetIP() const { return ip_; } + uint32_t GetPort() const { return port_; } + + std::string ToString() const { + if (peer_id_.empty()) { + return fmt::format("{}:{}", ip_, port_); + } else { + return fmt::format("{}:{} ({}@{})", ip_, port_, peer_id_, peer_version_); + } + } + std::string GetPeerID() const { return peer_id_; } + std::string GetAddr() const { return addr_; } + int64_t GetPeerVersion() const { return peer_version_; } + + private: + std::string ip_; + std::string addr_; + uint32_t port_ = 0; + + std::string peer_id_; + int64_t peer_version_ = 0; + + std::string str_; +}; + } // namespace redis diff --git a/src/server/server.cc b/src/server/server.cc index 253a48029e0..1e8193651e4 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -37,6 +37,7 @@ #include #include #include +#include #include #include "commands/commander.h" @@ -354,6 +355,27 @@ void Server::DisconnectSlaves() { } } +void Server::CleanupOrphanSlaves(int64_t version, const ClusterNodes &nodes) { + std::lock_guard lg(slave_threads_mu_); + + for (auto &slave_thread : slave_threads_) { + const auto &peer_info = slave_thread->GetConn()->GetPeerInfo(); + auto peer_version = peer_info.GetPeerVersion(); + if (peer_version < 0 || peer_version > version) { + // The peer version is greater than the current version, + // so we can't determine whether it is an orphan node, just skip it. + continue; + } + auto peer_id = peer_info.GetPeerID(); + auto it = nodes.find(peer_id); + if (it != nodes.end()) { + // The peer id is in the cluster, so it is not an orphan node. + continue; + } + slave_thread->Stop(); + } +} + void Server::CleanupExitedSlaves() { std::lock_guard lg(slave_threads_mu_); @@ -1059,11 +1081,10 @@ Server::InfoEntries Server::GetReplicationInfo() { entries.emplace_back("connected_slaves", slave_threads_.size()); for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; - + const auto &peer_info = slave->GetConn()->GetPeerInfo(); entries.emplace_back("slave" + std::to_string(idx), - fmt::format("ip={},port={},offset={},lag={}", slave->GetConn()->GetAnnounceIP(), - slave->GetConn()->GetAnnouncePort(), slave->GetCurrentReplSeq(), - latest_seq - slave->GetCurrentReplSeq())); + fmt::format("ip={},port={},offset={},lag={}", peer_info.GetIP(), peer_info.GetPort(), + slave->GetCurrentReplSeq(), latest_seq - slave->GetCurrentReplSeq())); ++idx; } slave_threads_mu_.unlock(); @@ -1096,10 +1117,11 @@ std::string Server::GetRoleInfo() { slave_threads_mu_.lock(); for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; + const auto peer_info = slave->GetConn()->GetPeerInfo(); list.emplace_back(redis::ArrayOfBulkStrings({ - slave->GetConn()->GetAnnounceIP(), - std::to_string(slave->GetConn()->GetListeningPort()), + std::string(peer_info.GetIP()), + std::to_string(peer_info.GetPort()), std::to_string(slave->GetCurrentReplSeq()), })); } @@ -1658,7 +1680,7 @@ void Server::KillClient(int64_t *killed, const std::string &addr, uint64_t id, u slave_threads_mu_.lock(); for (const auto &st : slave_threads_) { if ((type & kTypeSlave) || - (!addr.empty() && (st->GetConn()->GetAddr() == addr || st->GetConn()->GetAnnounceAddr() == addr)) || + (!addr.empty() && (st->GetConn()->GetAddr() == addr || st->GetConn()->GetPeerInfo().GetAddr() == addr)) || (id != 0 && st->GetConn()->GetID() == id)) { st->Stop(); (*killed)++; @@ -2040,9 +2062,8 @@ std::list> Server::GetSlaveHostAndPort() { slave_threads_mu_.lock(); for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; - std::pair host_port_pair = {slave->GetConn()->GetAnnounceIP(), - slave->GetConn()->GetListeningPort()}; - result.emplace_back(host_port_pair); + const auto peer_info = slave->GetConn()->GetPeerInfo(); + result.emplace_back(peer_info.GetIP(), peer_info.GetPort()); } slave_threads_mu_.unlock(); return result; diff --git a/src/server/server.h b/src/server/server.h index b8c31429123..6849792f665 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -198,6 +198,7 @@ class Server { Status RemoveMaster(); Status AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq); void DisconnectSlaves(); + void CleanupOrphanSlaves(int64_t version, const ClusterNodes &nodes); void CleanupExitedSlaves(); bool IsSlave() const { return !master_host_.empty(); } void FeedMonitorConns(redis::Connection *conn, const std::vector &tokens); diff --git a/src/server/worker.cc b/src/server/worker.cc index 3a5642c3bbb..ea8cfe08c72 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -533,7 +533,7 @@ void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string } if ((type & conn->GetClientType()) || - (!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) || + (!addr.empty() && (conn->GetAddr() == addr || conn->GetPeerInfo().GetAddr() == addr)) || (id != 0 && conn->GetID() == id)) { conn->EnableFlag(redis::Connection::kCloseAfterReply); // enable write event to notify worker wake up ASAP, and remove the connection diff --git a/src/storage/storage.h b/src/storage/storage.h index a4563eabc8f..b607e1cbc2d 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -29,7 +29,6 @@ #include #include -#include #include #include #include diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index 489fab266a1..244693ebd22 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -224,6 +224,11 @@ func TestClusterReplicas(t *testing.T) { clusterNode := fmt.Sprintf("%s\n%s", master1Node, master2Node) err := rdb1.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err() require.NoError(t, err) + + require.Eventually(t, func() bool { + return util.FindInfoEntry(rdb2, "master_link_status") == "down" + }, 5*time.Second, 100*time.Millisecond) + err = rdb2.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err() require.NoError(t, err)