Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions integration/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,15 @@ def create(self, client: valkey.client):
for f in self.fields:
cmd += f.create(self.type)
print(f"Creating Index: {cmd}")
client.execute_command("DEBUG LOG", f"Creating index {self.name}")
client.execute_command(*cmd)

def drop(self, client: valkey.client):
cmd = ["FT.DROPINDEX", self.name]
print("Executing: ", cmd)
client.execute_command(*cmd)
client.execute_command("DEBUG LOG", f"Deleting index {self.name}")

def load_data(self, client: valkey.client, rows: int, start_index: int = 0):
print("Loading data to ", client)
for i in range(start_index, rows):
Expand Down Expand Up @@ -194,3 +201,20 @@ def info(self, client: valkey.client) -> FTInfoParser:
def backfill_complete(self, client: valkey.client) -> bool:
res = self.info(client)
return res.backfill_in_progress == 0

def query(self, client:valkey.client, query_string: str, *args) -> dict[bytes, dict[bytes, bytes]]:
assert self.type == KeyDataType.HASH, "JSON not supported yet"
query = ["ft.search", self.name, query_string] + list(args)
print("Execute Query Command: ", query)
result = client.execute_command(*query)
print("Result is ", result)
count = result[0]
dict_result = {}
for row in range(1, len(result)-1, 2):
key = result[row]
fields = result[row+1][0::2]
values = result[row+1][1::2]
print("Key", key, "Fields:", fields, " Values:", values)
dict_result[key] = {fields[i]:values[i] for i in range(len(fields))}
print("Final result", dict_result)
return dict_result
95 changes: 95 additions & 0 deletions integration/test_dbnum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from valkey_search_test_case import ValkeySearchClusterTestCaseDebugMode
from valkey.cluster import ValkeyCluster
from valkey.client import Valkey
from valkeytestframework.conftest import resource_port_tracker
from valkeytestframework.util import waiters
from valkey.exceptions import ResponseError, ConnectionError
import pytest, time, logging
from indexes import *

class TestDBNum(ValkeySearchClusterTestCaseDebugMode):
def setup_connections(self):
self.client00 = self.get_primary(0).connect();
self.client00.select(0)
self.client10 = self.get_primary(1).connect();
self.client10.select(0)
self.client20 = self.get_primary(2).connect();
self.client20.select(0)
self.client01 = self.get_primary(0).connect();
self.client01.select(1)
self.client11 = self.get_primary(1).connect();
self.client11.select(1)
self.client21 = self.get_primary(2).connect();
self.client21.select(1)
self.clients = [[self.client00, self.client01], [self.client10, self.client11], [self.client20, self.client21]]

def test_dbnum(self):
"""
Test for dbnumbers in cluster mode, a Valkey 9 feature.
"""
index0 = Index('index0', [Tag('t')])
index1 = Index('index1', [Tag('t')])

self.setup_connections()

def show(msg):
for i in range(3):
self.clients[i][0].execute_command("DEBUG LOG", f"{i}:{msg}")
self.clients[i][0].execute_command("FT._DEBUG SHOW_METADATA")

def exec(dbnum, l):
for i in range(3):
l(self.clients[i][dbnum])

index1.create(self.client11)
show("After create index1")
index0.create(self.client10)
show("After create index0")
assert(self.client00.execute_command("FT._LIST") == [b'index0'])
assert(self.client10.execute_command("FT._LIST") == [b'index0'])
assert(self.client20.execute_command("FT._LIST") == [b'index0'])
assert(self.client01.execute_command("FT._LIST") == [b'index1'])
assert(self.client11.execute_command("FT._LIST") == [b'index1'])
assert(self.client21.execute_command("FT._LIST") == [b'index1'])
try:
self.client00.execute_command("debug restart")
except:
pass
self.setup_connections()
assert(self.client00.execute_command("FT._LIST") == [b'index0'])
assert(self.client10.execute_command("FT._LIST") == [b'index0'])
assert(self.client20.execute_command("FT._LIST") == [b'index0'])
assert(self.client01.execute_command("FT._LIST") == [b'index1'])
assert(self.client11.execute_command("FT._LIST") == [b'index1'])
assert(self.client21.execute_command("FT._LIST") == [b'index1'])
#
# Load some data and do some queries....
#
#cluster0 = self.new_cluster_client()
#cluster0.select(0)
#cluster1 = self.new_cluster_client()
#cluster1.select(1)
self.client20.hset("0", mapping={"t":"tag0"})
self.client21.hset("0", mapping={"t":"tag1"})
answer0 = index0.query(self.client20,"@t:{tag*}")
assert answer0 == {b"0": {b"t":b"tag0"}}
self.client21.execute_command("DEBUG LOG", "Doing query 1")
answer1 = index1.query(self.client21,"@t:{tag*}")
assert answer1 == {b"0": {b"t":b"tag1"}}

index0.drop(self.client00)
show("After drop index0")
assert(self.client00.execute_command("FT._LIST") == [])
assert(self.client10.execute_command("FT._LIST") == [])
assert(self.client20.execute_command("FT._LIST") == [])
assert(self.client01.execute_command("FT._LIST") == [b'index1'])
assert(self.client11.execute_command("FT._LIST") == [b'index1'])
assert(self.client21.execute_command("FT._LIST") == [b'index1'])
index1.drop(self.client01)
show("after drop index1")
assert(self.client00.execute_command("FT._LIST") == [])
assert(self.client10.execute_command("FT._LIST") == [])
assert(self.client20.execute_command("FT._LIST") == [])
assert(self.client01.execute_command("FT._LIST") == [])
assert(self.client11.execute_command("FT._LIST") == [])
assert(self.client21.execute_command("FT._LIST") == [])
1 change: 1 addition & 0 deletions integration/valkey_search_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ def setup_test(self, request):
def get_config_file_lines(self, testdir, port) -> List[str]:
return [
"enable-debug-command yes",
"cluster-databases 16",
f"loadmodule {os.getenv('JSON_MODULE_PATH')}",
f"dir {testdir}",
"cluster-enabled yes",
Expand Down
2 changes: 1 addition & 1 deletion scripts/common.rc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ function setup_valkey_server() {
fi

# Clone and build it
VALKEY_VERSION="${VALKEY_VERSION:=8.1.1}"
VALKEY_VERSION="${VALKEY_VERSION:=9.0.0-rc3}"
export VALKEY_SERVER_HOME_DIR=$(get_third_party_build_dir)/valkey-server
export VALKEY_SERVER_BUILD_DIR=${VALKEY_SERVER_HOME_DIR}/.build-release
if [ ! -d ${VALKEY_SERVER_HOME_DIR} ]; then
Expand Down
3 changes: 2 additions & 1 deletion src/commands/ft_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ absl::StatusOr<std::unique_ptr<AggregateParameters>> ParseCommand(
index_schema_name));
RealIndexInterface index_interface(index_schema);
auto params = std::make_unique<AggregateParameters>(
options::GetDefaultTimeoutMs().GetValue(), &index_interface);
options::GetDefaultTimeoutMs().GetValue(), &index_interface,
ValkeyModule_GetSelectedDb(ctx));
DBG << "AggregateParameters created for index: " << index_schema_name << " @"
<< (void *)params.get() << "\n";
params->index_schema_name = std::move(index_schema_name);
Expand Down
5 changes: 3 additions & 2 deletions src/commands/ft_aggregate_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ struct AggregateParameters : public expr::Expression::CompileContext,
parse_vars.ClearAtEndOfParse();
}

AggregateParameters(uint64_t timeout, IndexInterface* index_interface)
: query::VectorSearchParameters(timeout, nullptr) {
AggregateParameters(uint64_t timeout, IndexInterface* index_interface,
uint32_t db_num)
: query::VectorSearchParameters(timeout, nullptr, db_num) {
parse_vars_.index_interface_ = index_interface;
}

Expand Down
6 changes: 6 additions & 0 deletions src/commands/ft_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "module_config.h"
#include "src/commands/commands.h"
#include "src/coordinator/metadata_manager.h"
#include "vmsdk/src/command_parser.h"
#include "vmsdk/src/debug.h"
#include "vmsdk/src/info.h"
Expand Down Expand Up @@ -144,6 +145,8 @@ absl::Status HelpCmd(ValkeyModuleCtx *ctx, vmsdk::ArgsIterator &itr) {
"list all controlled variables and their values"},
{"FT._DEBUG PAUSEPOINT [ SET | RESET | TEST | LIST] <pausepoint>",
"control pause points"},
{"FT_DEBUG SHOW_METADATA",
"list internal metadata manager table namespace"},
};
ValkeyModule_ReplySetArrayLength(ctx, 2 * help_text.size());
for (auto &pair : help_text) {
Expand Down Expand Up @@ -185,6 +188,9 @@ absl::Status FTDebugCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
return PausePointControlCmd(ctx, itr);
} else if (keyword == "CONTROLLED_VARIABLE") {
return ControlledCmd(ctx, itr);
} else if (keyword == "SHOW_METADATA") {
return valkey_search::coordinator::MetadataManager::Instance().ShowMetadata(
ctx, itr);
} else if (keyword == "HELP") {
return HelpCmd(ctx, itr);
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/commands/ft_search_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ ParseVectorSearchParameters(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
int argc, const SchemaManager &schema_manager) {
vmsdk::ArgsIterator itr{argv, argc};
auto parameters = std::make_unique<query::VectorSearchParameters>(
options::GetDefaultTimeoutMs().GetValue(), nullptr);
options::GetDefaultTimeoutMs().GetValue(), nullptr,
ValkeyModule_GetSelectedDb(ctx));
VMSDK_RETURN_IF_ERROR(
vmsdk::ParseParamValue(itr, parameters->index_schema_name));
VMSDK_ASSIGN_OR_RETURN(
Expand Down
2 changes: 2 additions & 0 deletions src/coordinator/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ message SearchIndexPartitionRequest {
bool no_content = 11;
optional Predicate root_filter_predicate = 12;
repeated ReturnParameter return_parameters = 13;
uint32 db_num = 14;
}

message NeighborEntry {
Expand Down Expand Up @@ -172,4 +173,5 @@ message InfoIndexPartitionResponse {
string error = 14;
optional IndexFingerprintVersion index_fingerprint_version = 15;
FanoutErrorType error_type = 16;
uint32 db_num = 17;
}
16 changes: 15 additions & 1 deletion src/coordinator/metadata_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "command_parser.h"
#include "google/protobuf/any.pb.h"
#include "google/protobuf/util/json_util.h"
#include "grpcpp/support/status.h"
#include "highwayhash/arch_specific.h"
#include "highwayhash/highwayhash.h"
#include "src/coordinator/client_pool.h"
#include "src/coordinator/coordinator.pb.h"
#include "src/coordinator/util.h"
#include "src/rdb_serialization.h"
#include "src/valkey_search_options.h"
#include "vmsdk/src/log.h"
#include "vmsdk/src/status/status_macros.h"
#include "vmsdk/src/utils.h"
Expand Down Expand Up @@ -656,4 +657,17 @@ void MetadataManager::RegisterForClusterMessages(ValkeyModuleCtx *ctx) {
ctx, coordinator::kMetadataBroadcastClusterMessageReceiverId,
MetadataManagerOnClusterMessageCallback);
}

absl::Status MetadataManager::ShowMetadata(
ValkeyModuleCtx *ctx, [[maybe_unused]] vmsdk::ArgsIterator &itr) const {
auto metadata = metadata_.Get().DebugString();
VMSDK_LOG(WARNING, ctx) << "Metadata: " << metadata;
google::protobuf::util::JsonPrintOptions options;
options.always_print_fields_with_no_presence = true;
[[maybe_unused]] auto status = google::protobuf::util::MessageToJsonString(
metadata_.Get(), &metadata, options);
ValkeyModule_ReplyWithStringBuffer(ctx, metadata.data(), metadata.size());
return absl::OkStatus();
}

} // namespace valkey_search::coordinator
5 changes: 4 additions & 1 deletion src/coordinator/metadata_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "command_parser.h"
#include "google/protobuf/any.pb.h"
#include "highwayhash/hh_types.h"
#include "src/coordinator/client_pool.h"
Expand All @@ -43,7 +44,6 @@ static constexpr uint8_t kMetadataBroadcastClusterMessageReceiverId = 0x00;
static constexpr highwayhash::HHKey kHashKey{
0x9736bad976c904ea, 0x08f963a1a52eece9, 0x1ea3f3f773f3b510,
0x9290a6b4e4db3d51};

class MetadataManager {
public:
MetadataManager(ValkeyModuleCtx *ctx, ClientPool &client_pool)
Expand Down Expand Up @@ -141,6 +141,9 @@ class MetadataManager {
static void InitInstance(std::unique_ptr<MetadataManager> instance);
static MetadataManager &Instance();

absl::Status ShowMetadata(ValkeyModuleCtx *ctx,
vmsdk::ArgsIterator &iter) const;

private:
struct RegisteredType {
uint32_t encoding_version;
Expand Down
9 changes: 5 additions & 4 deletions src/coordinator/search_converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ absl::StatusOr<std::unique_ptr<query::VectorSearchParameters>>
GRPCSearchRequestToParameters(const SearchIndexPartitionRequest& request,
grpc::CallbackServerContext* context) {
auto parameters = std::make_unique<query::VectorSearchParameters>(
request.timeout_ms(), context);
request.timeout_ms(), context, request.db_num());
parameters->index_schema_name = request.index_schema_name();
parameters->attribute_alias = request.attribute_alias();
VMSDK_ASSIGN_OR_RETURN(
parameters->index_schema,
SchemaManager::Instance().GetIndexSchema(0, request.index_schema_name()));
VMSDK_ASSIGN_OR_RETURN(parameters->index_schema,
SchemaManager::Instance().GetIndexSchema(
request.db_num(), request.index_schema_name()));
if (request.has_score_as()) {
parameters->score_as = vmsdk::MakeUniqueValkeyString(request.score_as());
} else {
Expand Down Expand Up @@ -227,6 +227,7 @@ std::unique_ptr<SearchIndexPartitionRequest> ParametersToGRPCSearchRequest(
const query::VectorSearchParameters& parameters) {
auto request = std::make_unique<SearchIndexPartitionRequest>();
request->set_index_schema_name(parameters.index_schema_name);
request->set_db_num(parameters.db_num_);
request->set_attribute_alias(parameters.attribute_alias);
request->set_score_as(vmsdk::ToStringView(parameters.score_as.get()));
request->set_query(parameters.query);
Expand Down
7 changes: 5 additions & 2 deletions src/coordinator/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,17 @@ Service::GenerateInfoResponse(
kSchemaManagerMetadataTypeName));
const auto& entry_map =
global_metadata->type_namespace_map().at(kSchemaManagerMetadataTypeName);
CHECK(entry_map.entries().contains(index_name));
const auto& entry = entry_map.entries().at(index_name);
std::string encoded_index_name =
IndexName(db_num, index_name).GetEncodedName();
CHECK(entry_map.entries().contains(encoded_index_name));
const auto& entry = entry_map.entries().at(encoded_index_name);
index_fingerprint_version.emplace();
index_fingerprint_version->set_fingerprint(entry.fingerprint());
index_fingerprint_version->set_version(entry.version());

response.set_exists(true);
response.set_index_name(index_name);
response.set_db_num(db_num);
response.set_num_docs(data.num_docs);
response.set_num_records(data.num_records);
response.set_hash_indexing_failures(data.hash_indexing_failures);
Expand Down
1 change: 1 addition & 0 deletions src/query/fanout_operation_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ class FanoutOperationBase {
++Metrics::GetStats().info_fanout_retry_cnt;
ResetBaseForRetry();
ResetForRetry();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
StartFanoutRound();
} else {
OnCompletion();
Expand Down
7 changes: 5 additions & 2 deletions src/query/search.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct VectorSearchParameters {
vmsdk::UniqueValkeyString score_as;
std::string query;
uint32_t dialect{kDialect};
uint32_t db_num_;
bool local_only{false};
int k{0};
std::optional<unsigned> ef;
Expand Down Expand Up @@ -110,9 +111,11 @@ struct VectorSearchParameters {
} parse_vars;
bool IsNonVectorQuery() const { return attribute_alias.empty(); }
bool IsVectorQuery() const { return !IsNonVectorQuery(); }
VectorSearchParameters(uint64_t timeout, grpc::CallbackServerContext* context)
VectorSearchParameters(uint64_t timeout, grpc::CallbackServerContext* context,
uint32_t db_num)
: timeout_ms(timeout),
cancellation_token(cancel::Make(timeout, context)) {}
cancellation_token(cancel::Make(timeout, context)),
db_num_(db_num) {}
};

// Callback to be called when the search is done.
Expand Down
Loading
Loading