Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .clangd
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CompileFlags:
Add:
- "-ferror-limit=0"
- "-DRunningClangd"
6 changes: 6 additions & 0 deletions .config/typos.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# See https://github.com/crate-ci/typos/blob/master/docs/reference.md to configure typos

[files]
ignore-hidden = true
extend-exclude = [
]

Expand All @@ -12,9 +13,14 @@ CrEaTe = "CrEaTe"
LiSt = "LiSt"
DeBuG = "DeBuG"
DrOpInDeX = "DrOpInDeX"
Clangd = "Clangd"

[type.cpp]
extend-ignore-re = [
"baNAna",
"eXIst",
"DIALEC",
"nghi",
"bubu",
"Teser"
]
8 changes: 8 additions & 0 deletions .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,26 @@
"words": [
"absl",
"bazel",
"bubu",
"Clandd",
"CLangd",
"DIALEC",
"Externalizer",
"highwayhash",
"hnsw",
"hnswlib",
"Inorder",
"MRMW",
"mstime",
"nghi",
"NOLINTNEXTLINE",
"nonexistentkey",
"redis",
"Redisearch",
"synchronistically",
"Teser",
"upsert",
"upserts",
"Valkey",
"valkeysearch",
"vmsdk"
Expand Down
1 change: 1 addition & 0 deletions src/commands/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ set(SRCS_COMMANDS
${CMAKE_CURRENT_LIST_DIR}/ft_list.cc
${CMAKE_CURRENT_LIST_DIR}/ft_search.cc
${CMAKE_CURRENT_LIST_DIR}/commands.h
${CMAKE_CURRENT_LIST_DIR}/commands.cc
${CMAKE_CURRENT_LIST_DIR}/ft_search.h)

valkey_search_add_static_library(commands "${SRCS_COMMANDS}")
Expand Down
141 changes: 141 additions & 0 deletions src/commands/commands.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2025, valkey-search contributors
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*
*/

#include "src/commands/commands.h"

#include "fanout.h"
#include "ft_create_parser.h"
#include "src/acl.h"
#include "src/commands/ft_search.h"
#include "src/query/search.h"
#include "src/schema_manager.h"
#include "src/valkey_search.h"
#include "valkey_search_options.h"
#include "vmsdk/src/debug.h"

namespace valkey_search {
namespace async {

struct Result {
cancel::Token cancellation_token;
absl::StatusOr<std::deque<indexes::Neighbor>> neighbors;
std::unique_ptr<QueryCommand> parameters;
};

int Timeout(ValkeyModuleCtx *ctx, [[maybe_unused]] ValkeyModuleString **argv,
[[maybe_unused]] int argc) {
return ValkeyModule_ReplyWithError(
ctx, "Search operation cancelled due to timeout");
}

int Reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
auto *res =
static_cast<Result *>(ValkeyModule_GetBlockedClientPrivateData(ctx));
CHECK(res != nullptr);
if (!res->neighbors.ok()) {
++Metrics::GetStats().query_failed_requests_cnt;
return ValkeyModule_ReplyWithError(
ctx, res->neighbors.status().message().data());
}
res->parameters->SendReply(ctx, res->neighbors.value());
return VALKEYMODULE_OK;
}

void Free([[maybe_unused]] ValkeyModuleCtx *ctx, void *privdata) {
auto *result = static_cast<Result *>(privdata);
delete result;
}

} // namespace async

CONTROLLED_BOOLEAN(ForceReplicasOnly, false);

//
// Common Class for FT.SEARCH and FT.AGGREGATE command
//
absl::Status QueryCommand::Execute(ValkeyModuleCtx *ctx,
ValkeyModuleString **argv, int argc,
std::unique_ptr<QueryCommand> parameters) {
auto status = [&]() -> absl::Status {
auto &schema_manager = SchemaManager::Instance();
vmsdk::ArgsIterator itr{argv + 1, argc - 1};
parameters->timeout_ms = options::GetDefaultTimeoutMs().GetValue();
VMSDK_RETURN_IF_ERROR(
vmsdk::ParseParamValue(itr, parameters->index_schema_name));
VMSDK_ASSIGN_OR_RETURN(
parameters->index_schema,
SchemaManager::Instance().GetIndexSchema(
ValkeyModule_GetSelectedDb(ctx), parameters->index_schema_name));
VMSDK_RETURN_IF_ERROR(
vmsdk::ParseParamValue(itr, parameters->parse_vars.query_string));
VMSDK_RETURN_IF_ERROR(parameters->ParseCommand(itr));
parameters->parse_vars.ClearAtEndOfParse();
parameters->cancellation_token =
cancel::Make(parameters->timeout_ms, nullptr);
static const auto permissions =
PrefixACLPermissions(kSearchCmdPermissions, kSearchCommand);
VMSDK_RETURN_IF_ERROR(AclPrefixCheck(
ctx, permissions, parameters->index_schema->GetKeyPrefixes()));

parameters->index_schema->ProcessMultiQueue();

const bool inside_multi_exec = vmsdk::MultiOrLua(ctx);
if (ABSL_PREDICT_FALSE(!ValkeySearch::Instance().SupportParallelQueries() ||
inside_multi_exec)) {
VMSDK_ASSIGN_OR_RETURN(
auto neighbors,
query::Search(*parameters, query::SearchMode::kLocal));
if (!options::GetEnablePartialResults().GetValue() &&
parameters->cancellation_token->IsCancelled()) {
ValkeyModule_ReplyWithError(
ctx, "Search operation cancelled due to timeout");
++Metrics::GetStats().query_failed_requests_cnt;
return absl::OkStatus();
}
parameters->SendReply(ctx, neighbors);
return absl::OkStatus();
}

vmsdk::BlockedClient blocked_client(ctx, async::Reply, async::Timeout,
async::Free, parameters->timeout_ms);
blocked_client.MeasureTimeStart();
auto on_done_callback = [blocked_client = std::move(blocked_client)](
auto &neighbors, auto parameters) mutable {
std::unique_ptr<QueryCommand> upcast_parameters(
dynamic_cast<QueryCommand *>(parameters.release()));
CHECK(upcast_parameters != nullptr);
auto result = std::make_unique<async::Result>(async::Result{
.neighbors = std::move(neighbors),
.parameters = std::move(upcast_parameters),
});
blocked_client.SetReplyPrivateData(result.release());
};

if (ValkeySearch::Instance().UsingCoordinator() &&
ValkeySearch::Instance().IsCluster() && !parameters->local_only) {
auto mode = /* !vmsdk::IsReadOnly(ctx) ? query::fanout::kPrimaries ? */
ForceReplicasOnly.GetValue()
? query::fanout::FanoutTargetMode::kReplicasOnly
: query::fanout::FanoutTargetMode::kRandom;
auto search_targets = query::fanout::GetSearchTargetsForFanout(ctx, mode);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is using the old code in fanout_template.h. With the new cluster map, this can be replaced by ValkeySearch()::Instance().GetClusterMap()->GetRandomTargets()/GetReplicaTargets(). A switch clause might be needed here to determine the get target mode.
If the ctx is valid here (which I think should be the case), we should use ValkeySearch()::Instance().GetOrRefreshClusterMap()

return query::fanout::PerformSearchFanoutAsync(
ctx, search_targets,
ValkeySearch::Instance().GetCoordinatorClientPool(),
std::move(parameters), ValkeySearch::Instance().GetReaderThreadPool(),
std::move(on_done_callback));
}
return query::SearchAsync(
std::move(parameters), ValkeySearch::Instance().GetReaderThreadPool(),
std::move(on_done_callback), query::SearchMode::kLocal);
}();
if (!status.ok()) {
++Metrics::GetStats().query_failed_requests_cnt;
}
return status;
}

} // namespace valkey_search
37 changes: 37 additions & 0 deletions src/commands/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "command_parser.h"
#include "src/query/search.h"
#include "vmsdk/src/valkey_module_api/valkey_module.h"

namespace valkey_search {
Expand Down Expand Up @@ -74,6 +76,41 @@ absl::Status FTDebugCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
int argc);
absl::Status FTAggregateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
int argc);

//
// Common stuff for FT.SEARCH and FT.AGGREGATE command
//
struct QueryCommand : public query::SearchParameters {
QueryCommand() : query::SearchParameters(0, nullptr) {}
//
// Start of command.
//
static absl::Status Execute(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
int argc, std::unique_ptr<QueryCommand> cmd);

//
// Parse command (after index and query string)
//
virtual absl::Status ParseCommand(vmsdk::ArgsIterator &itr) = 0;
//
// Executed on Main Thread after merge
//
virtual void SendReply(ValkeyModuleCtx *ctx,
std::deque<indexes::Neighbor> &neighbors) = 0;
};

namespace async {

int Reply(ValkeyModuleCtx *ctx, [[maybe_unused]] ValkeyModuleString **argv,
[[maybe_unused]] int argc);

int Timeout(ValkeyModuleCtx *ctx, [[maybe_unused]] ValkeyModuleString **argv,
[[maybe_unused]] int argc);

void Free(ValkeyModuleCtx * /*ctx*/, void *privdata);

} // namespace async

} // namespace valkey_search

#endif // VALKEYSEARCH_SRC_COMMANDS_COMMANDS_H_
Loading
Loading