Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
de4272b
feat(cluster): disconnect replication connections to orphan slaves
RiversJin Mar 23, 2025
41ddd7b
[squash me] Replace ToString() with GetStringView() in `PeerInfo`
RiversJin Mar 23, 2025
b5bb943
[squash me] removing custom hash and equality structs
RiversJin Mar 23, 2025
8af6476
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 23, 2025
a1bd734
[squash me] fix unit test issue
RiversJin Mar 24, 2025
1f77f03
[squash me] simplify node retrieval logic in SetNodeId and SetSlotRanges
RiversJin Mar 24, 2025
2ede94c
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 25, 2025
7cc4315
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 27, 2025
bf7d6f8
[sqash me] remove unnecessary move semantic parameters
RiversJin Mar 27, 2025
eaec818
[squash me] change ClusterNode slots parameter to const reference and…
RiversJin Mar 28, 2025
9c4db61
[squash me] revert unrelated modifications
RiversJin Mar 28, 2025
7f11538
[squash me] Update src/cluster/cluster.cc
RiversJin Mar 28, 2025
19e81f3
[sqush me] update error message for decommissioned nodes
RiversJin Mar 28, 2025
14a7750
[squash me] revert modification in `cluser`
RiversJin Mar 28, 2025
23a1fa8
[squash me] rename `isNodeFired` to `isNodeDecommissioned`
RiversJin Mar 28, 2025
05e8a58
[squash me] format
RiversJin Mar 29, 2025
37c8cae
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 29, 2025
b2c0d9b
[squash me] Add missing includes in `replication.cc`
RiversJin Mar 29, 2025
9571ac8
Merge branch 'unstable' into fix/orphan_slave
RiversJin Mar 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b

// Update version and cluster topology
version_ = version;
nodes_ = nodes;
nodes_ = std::move(nodes);
size_ = 0;

// Update slots to nodes
Expand Down Expand Up @@ -252,6 +252,11 @@ Status Cluster::SetMasterSlaveRepl() {
srv_->slot_migrator->SetStopMigrationFlag(false);
LOG(INFO) << "Change server role to master, restart migration task";
}

if (!is_slave) {
srv_->CleanupOrphanSlaves(version_, nodes_);
}

return Status::OK();
}

Expand Down Expand Up @@ -977,3 +982,13 @@ Status Cluster::Reset() {
unlink(srv_->GetConfig()->NodesFilePath().data());
return Status::OK();
}

// Note that if current version is lower than the given version,
// it can't be determined whether the node is in the cluster, so just regard it as in the cluster.
bool Cluster::IsInCluster(const std::string &node_id, int64_t version) const {
if (version < 0 || this->version_ < version) {
return true;
}

return nodes_.count(node_id) > 0;
}
1 change: 1 addition & 0 deletions src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class Cluster {
Status Reset();

static bool SubCommandIsExecExclusive(const std::string &subcommand);
bool IsInCluster(const std::string &node_id, int64_t version) const;

private:
std::string getNodeIDBySlot(int slot) const;
Expand Down
1 change: 1 addition & 0 deletions src/cluster/cluster_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ inline constexpr const char *errNoMasterNode = "The node isn't a master";
inline constexpr const char *errClusterNoInitialized = "The cluster is not initialized";
inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info";
inline constexpr const char *errInvalidImportState = "Invalid import state";
inline constexpr const char *errNodeDecommissioned = "Node has been decommissioned";

/// SlotRange is a range of cluster slots covering [start, end],
/// where the valid values for start and end are [0, kClusterSlots).
Expand Down
65 changes: 48 additions & 17 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

Status FeedSlaveThread::Start() {
auto s = util::CreateThread("feed-replica", [this] {
sigset_t mask, omask;
sigset_t mask{}, omask{};
sigemptyset(&mask);
sigemptyset(&omask);
sigaddset(&mask, SIGCHLD);
Expand Down Expand Up @@ -459,6 +459,12 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev)
data_to_send.emplace_back("ip-address");
data_to_send.emplace_back(config->replica_announce_ip);
}
if (!next_try_without_peer_id_ && config->cluster_enabled) {
data_to_send.emplace_back("peer-id");
data_to_send.emplace_back(srv_->cluster->GetMyId());
data_to_send.emplace_back("version");
data_to_send.emplace_back(std::to_string(srv_->cluster->GetVersion()));
}
SendString(bev, redis::ArrayOfBulkStrings(data_to_send));
repl_state_.store(kReplReplConf, std::memory_order_relaxed);
LOG(INFO) << "[replication] replconf request was sent, waiting for response";
Expand All @@ -470,26 +476,40 @@ ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev) {
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;

auto resp = line.View();

// on unknown option: first try without announce ip, if it fails again - do nothing (to prevent infinite loop)
if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) {
next_try_without_announce_ip_address_ = true;
LOG(WARNING) << "The old version master, can't handle ip-address, "
<< "try without it again";
// Retry previous state, i.e. send replconf again
return CBState::PREV;
if (isUnknownOption(resp)) {
if (!next_try_without_peer_id_) {
next_try_without_peer_id_ = true;
LOG(WARNING) << "The old version master, can't handle peer-id, try without it again";
return CBState::PREV;
}
if (!next_try_without_announce_ip_address_) {
next_try_without_announce_ip_address_ = true;
LOG(WARNING) << "The old version master, can't handle ip-address, try without it again";
return CBState::PREV;
}
}
if (line[0] == '-' && isRestoringError(line.View())) {

if (ResponseLineIsOK(resp)) {
LOG(INFO) << "[replication] replconf is ok, start psync";
return CBState::NEXT;
}

if (isRestoringError(resp)) {
LOG(WARNING) << "The master was restoring the db, retry later";
return CBState::RESTART;
}
if (!ResponseLineIsOK(line.View())) {
LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1;
// backward compatible with old version that doesn't support replconf cmd
return CBState::NEXT;
} else {
LOG(INFO) << "[replication] replconf is ok, start psync";
return CBState::NEXT;

if (isNodeDecommissioned(resp)) {
LOG(ERROR) << "The master has fired the node, stop the replication";
return CBState::QUIT;
}

LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1;
// backward compatible with old version that doesn't support replconf cmd
return CBState::NEXT;
}

ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev) {
Expand Down Expand Up @@ -772,6 +792,7 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
std::atomic<uint32_t> fetch_cnt = {0};
std::atomic<uint32_t> skip_cnt = {0};
std::vector<std::future<Status>> results;
results.reserve(concurrency);
for (size_t tid = 0; tid < concurrency; ++tid) {
results.push_back(
std::async(std::launch::async, [this, dir, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status {
Expand Down Expand Up @@ -1034,12 +1055,22 @@ Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch &write_batch

bool ReplicationThread::isRestoringError(std::string_view err) {
// err doesn't contain the CRLF, so cannot use redis::Error here.
return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::RedisLoading, redis::errRestoringBackup});
static const auto msg = fmt::format(RESP_PREFIX_ERROR "{}",
redis::StatusToRedisErrorMsg({Status::RedisLoading, redis::errRestoringBackup}));
return err == msg;
}

bool ReplicationThread::isWrongPsyncNum(std::string_view err) {
// err doesn't contain the CRLF, so cannot use redis::Error here.
return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::NotOK, redis::errWrongNumOfArguments});
static const auto msg =
fmt::format(RESP_PREFIX_ERROR "{}", redis::StatusToRedisErrorMsg({Status::NotOK, redis::errWrongNumOfArguments}));
return err == msg;
}

bool ReplicationThread::isNodeDecommissioned(std::string_view err) {
static const auto msg =
fmt::format(RESP_PREFIX_ERROR "{}", redis::StatusToRedisErrorMsg({Status::NotOK, errNodeDecommissioned}));
return err == msg;
}

bool ReplicationThread::isUnknownOption(std::string_view err) {
Expand Down
3 changes: 3 additions & 0 deletions src/cluster/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <deque>
#include <memory>
#include <string>
#include <string_view>
#include <thread>
#include <tuple>
#include <utility>
Expand Down Expand Up @@ -159,6 +160,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
std::atomic<ReplState> repl_state_;
std::atomic<int64_t> last_io_time_secs_ = 0;
bool next_try_old_psync_ = false;
bool next_try_without_peer_id_ = false;
bool next_try_without_announce_ip_address_ = false;

std::function<bool()> pre_fullsync_cb_;
Expand Down Expand Up @@ -209,6 +211,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
static bool isRestoringError(std::string_view err);
static bool isWrongPsyncNum(std::string_view err);
static bool isUnknownOption(std::string_view err);
static bool isNodeDecommissioned(std::string_view err);

Status parseWriteBatch(const rocksdb::WriteBatch &write_batch);
};
Expand Down
2 changes: 1 addition & 1 deletion src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class CommandClusterX : public Commander {
} else {
return {Status::RedisExecErr, "Invalid cluster command options"};
}
if (need_persist_nodes_info && srv->GetConfig()->persist_cluster_nodes_enabled) {
if (need_persist_nodes_info && srv->GetConfig()->persist_cluster_nodes_enabled && srv->cluster->GetVersion() >= 0) {
return srv->cluster->DumpClusterNodes(srv->GetConfig()->NodesFilePath());
}
return Status::OK();
Expand Down
Loading