Skip to content

Commit

Permalink
[Refactor][Core] Migrate to RunArgvAsync for RedisDelKeyPrefixSync (#…
Browse files Browse the repository at this point in the history
…49876)

Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness authored Jan 16, 2025
1 parent 9a65d0c commit 0022380
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 27 deletions.
34 changes: 15 additions & 19 deletions src/ray/gcs/redis_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ std::optional<std::pair<std::string, int>> ParseIffMovedError(
}
} // namespace

void ValidateRedisDB(RedisContext &context) {
auto reply = context.RunArgvSync(std::vector<std::string>{"INFO", "CLUSTER"});
void RedisContext::ValidateRedisDB() {
auto reply = RunArgvSync(std::vector<std::string>{"INFO", "CLUSTER"});
// cluster_state:ok
// cluster_slots_assigned:16384
// cluster_slots_ok:16384
Expand Down Expand Up @@ -450,23 +450,22 @@ void ValidateRedisDB(RedisContext &context) {
}
}

bool isRedisSentinel(RedisContext &context) {
auto reply = context.RunArgvSync(std::vector<std::string>{"INFO", "SENTINEL"});
bool RedisContext::IsRedisSentinel() {
auto reply = RunArgvSync(std::vector<std::string>{"INFO", "SENTINEL"});
if (reply->IsNil() || reply->IsError() || reply->ReadAsString().length() == 0) {
return false;
} else {
return true;
}
}

Status ConnectRedisCluster(RedisContext &context,
const std::string &username,
const std::string &password,
bool enable_ssl,
const std::string &redis_address) {
Status RedisContext::ConnectRedisCluster(const std::string &username,
const std::string &password,
bool enable_ssl,
const std::string &redis_address) {
RAY_LOG(INFO) << "Connect to Redis Cluster";
// Ray has some restrictions for RedisDB. Validate it here.
ValidateRedisDB(context);
ValidateRedisDB();

// Find the true leader
std::vector<const char *> argv;
Expand All @@ -478,7 +477,7 @@ Status ConnectRedisCluster(RedisContext &context,
}

auto redis_reply = reinterpret_cast<redisReply *>(
::redisCommandArgv(context.sync_context(), cmds.size(), argv.data(), argc.data()));
::redisCommandArgv(sync_context(), cmds.size(), argv.data(), argc.data()));

if (redis_reply->type == REDIS_REPLY_ERROR) {
// This should be a MOVED error
Expand All @@ -488,12 +487,12 @@ Status ConnectRedisCluster(RedisContext &context,
auto maybe_ip_port = ParseIffMovedError(error_msg);
RAY_CHECK(maybe_ip_port.has_value())
<< "Setup Redis cluster failed in the dummy deletion: " << error_msg;
context.Disconnect();
Disconnect();
const auto &[ip, port] = maybe_ip_port.value();
// Connect to the true leader.
RAY_LOG(INFO) << "Redis cluster leader is " << ip << ":" << port
<< ". Reconnect to it.";
return context.Connect(ip, port, username, password, enable_ssl);
return Connect(ip, port, username, password, enable_ssl);
} else {
RAY_LOG(INFO) << "Redis cluster leader is " << redis_address;
freeReplyObject(redis_reply);
Expand Down Expand Up @@ -637,14 +636,11 @@ Status RedisContext::Connect(const std::string &address,
SetDisconnectCallback(redis_async_context_.get());

// handle validation and primary connection for different types of redis
if (isRedisSentinel(*this)) {
if (IsRedisSentinel()) {
return ConnectRedisSentinel(*this, username, password, enable_ssl);
} else {
return ConnectRedisCluster(*this,
username,
password,
enable_ssl,
ip_addresses[0] + ":" + std::to_string(port));
return ConnectRedisCluster(
username, password, enable_ssl, ip_addresses[0] + ":" + std::to_string(port));
}
}

Expand Down
21 changes: 15 additions & 6 deletions src/ray/gcs/redis_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,6 @@ class RedisContext {
/// Disconnect from the server.
void Disconnect();

/// Run an arbitrary Redis command synchronously.
///
/// \param args The vector of command args to pass to Redis.
/// \return CallbackReply(The reply from redis).
std::unique_ptr<CallbackReply> RunArgvSync(const std::vector<std::string> &args);

/// Run an arbitrary Redis command without a callback.
///
/// \param args The vector of command args to pass to Redis.
Expand All @@ -173,6 +167,21 @@ class RedisContext {
instrumented_io_context &io_service() { return io_service_; }

private:
/// Run an arbitrary Redis command synchronously.
///
/// \param args The vector of command args to pass to Redis.
/// \return CallbackReply(The reply from redis).
std::unique_ptr<CallbackReply> RunArgvSync(const std::vector<std::string> &args);

void ValidateRedisDB();

bool IsRedisSentinel();

Status ConnectRedisCluster(const std::string &username,
const std::string &password,
bool enable_ssl,
const std::string &redis_address);

instrumented_io_context &io_service_;

std::unique_ptr<redisContext, RedisContextDeleter> context_;
Expand Down
13 changes: 11 additions & 2 deletions src/ray/gcs/store_client/redis_store_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,11 @@ bool RedisDelKeyPrefixSync(const std::string &host,
RedisKey redis_key{external_storage_namespace, /*table_name=*/""};
std::vector<std::string> cmd{"KEYS",
RedisMatchPattern::Prefix(redis_key.ToString()).escaped};
auto reply = context->RunArgvSync(cmd);
std::promise<std::shared_ptr<CallbackReply>> promise;
context->RunArgvAsync(cmd, [&promise](const std::shared_ptr<CallbackReply> &reply) {
promise.set_value(reply);
});
auto reply = promise.get_future().get();
const auto &keys = reply->ReadAsStringArray();
if (keys.empty()) {
RAY_LOG(INFO) << "No keys found for external storage namespace "
Expand All @@ -532,7 +536,12 @@ bool RedisDelKeyPrefixSync(const std::string &host,
}
auto delete_one_sync = [context](const std::string &key) {
auto del_cmd = std::vector<std::string>{"DEL", key};
auto del_reply = context->RunArgvSync(del_cmd);
std::promise<std::shared_ptr<CallbackReply>> promise;
context->RunArgvAsync(del_cmd,
[&promise](const std::shared_ptr<CallbackReply> &reply) {
promise.set_value(reply);
});
auto del_reply = promise.get_future().get();
return del_reply->ReadAsInteger() > 0;
};
size_t num_deleted = 0;
Expand Down

0 comments on commit 0022380

Please sign in to comment.