diff --git a/integration/test_ft_metadata_cluster_validation.py b/integration/test_ft_metadata_cluster_validation.py
new file mode 100644
index 000000000..75b459e19
--- /dev/null
+++ b/integration/test_ft_metadata_cluster_validation.py
@@ -0,0 +1,282 @@
+import pytest
+from valkey import ResponseError
+from valkey.cluster import ValkeyCluster
+from valkey.client import Valkey
+from valkey_search_test_case import ValkeySearchClusterTestCase
+from valkeytestframework.conftest import resource_port_tracker
+from valkeytestframework.util import waiters
+from ft_info_parser import FTInfoParser
+from typing import List, Dict, Any, Optional
+
+
+class TestFTMetadataClusterValidation(ValkeySearchClusterTestCase):
+    """
+    Integration test for validating that metadata has been properly transferred
+    across different nodes of the cluster. This test creates various text indexes
+    with different parameters and validates that FT._LIST and FT.INFO commands
+    return consistent results across all cluster nodes.
+    """
+
+    def is_indexing_complete_on_node(self, node: Valkey, index_name: str) -> bool:
+        """Check if indexing is complete on a specific node."""
+        try:
+            info_response = node.execute_command("FT.INFO", index_name)
+            parser = FTInfoParser(info_response)
+            return parser.is_backfill_complete() and parser.is_ready()
+        except ResponseError:
+            return False
+
+    def wait_for_indexing_complete_on_all_nodes(self, index_name: str, timeout: int = 10):
+        """Wait for indexing to complete on all cluster nodes."""
+        for i in range(self.CLUSTER_SIZE):
+            node = self.new_client_for_primary(i)
+            waiters.wait_for_equal(
+                lambda node=node: self.is_indexing_complete_on_node(node, index_name),
+                True,
+                timeout=timeout
+            )
+
+    def get_ft_list_from_all_nodes(self) -> List[List[bytes]]:
+        """Get FT._LIST results from all cluster nodes."""
+        results = []
+        for i in range(self.CLUSTER_SIZE):
+            node = self.new_client_for_primary(i)
+            result = node.execute_command("FT._LIST")
+            results.append(result)
+        return results
+
+    def get_ft_info_from_all_nodes(self, index_name: str) -> List[Optional[FTInfoParser]]:
+        """Get FT.INFO results from all cluster nodes."""
+        results = []
+        for i in range(self.CLUSTER_SIZE):
+            node = self.new_client_for_primary(i)
+            try:
+                info_response = node.execute_command("FT.INFO", index_name)
+                parser = FTInfoParser(info_response)
+                results.append(parser)
+            except ResponseError:
+                # If index doesn't exist on this node, we still want to track this
+                results.append(None)
+        return results
+
+    def validate_ft_list_consistency(self, expected_indexes: List[str]):
+        """Validate that FT._LIST returns consistent results across all nodes."""
+        ft_list_results = self.get_ft_list_from_all_nodes()
+
+        # Convert all results to sets of strings for comparison
+        normalized_results = []
+        for result in ft_list_results:
+            normalized = set()
+            for item in result:
+                if isinstance(item, bytes):
+                    normalized.add(item.decode('utf-8'))
+                else:
+                    normalized.add(str(item))
+            normalized_results.append(normalized)
+
+        # All nodes should have the same set of indexes
+        first_result = normalized_results[0]
+        for i, result in enumerate(normalized_results[1:], 1):
+            assert result == first_result, f"Node {i} FT._LIST result differs from node 0: {result} vs {first_result}"
+
+        # Verify expected indexes are present
+        expected_set = set(expected_indexes)
+        assert expected_set.issubset(first_result), f"Expected indexes {expected_set} not found in {first_result}"
+
+    def validate_ft_info_consistency(self, index_name: str, expected_attributes: Dict[str, Any]):
+        """Validate that FT.INFO returns consistent results across all nodes."""
+        ft_info_results = self.get_ft_info_from_all_nodes(index_name)
+
+        # All nodes should have the index
+        for i, parser in enumerate(ft_info_results):
+            assert parser is not None, f"Index '{index_name}' not found on node {i}"
+
+        # Compare key metadata fields across all nodes
+        first_parser = ft_info_results[0]
+
+        for i, parser in enumerate(ft_info_results[1:], 1):
+            # Validate basic index information
+            assert parser.index_name == first_parser.index_name, f"Index name mismatch on node {i}"
+            assert len(parser.attributes) == len(first_parser.attributes), f"Attribute count mismatch on node {i}"
+
+            # Validate index definition consistency
+            assert parser.index_definition == first_parser.index_definition, f"Index definition mismatch on node {i}"
+
+            # Validate attributes consistency - compare by identifier rather than position
+            first_attr_names = set()
+            for attr in first_parser.attributes:
+                if isinstance(attr, dict):
+                    first_attr_names.add(attr.get('identifier'))
+                elif isinstance(attr, list):
+                    parsed_attr = first_parser._parse_key_value_list(attr)
+                    if isinstance(parsed_attr, dict):
+                        first_attr_names.add(parsed_attr.get('identifier'))
+
+            node_attr_names = set()
+            for attr in parser.attributes:
+                if isinstance(attr, dict):
+                    node_attr_names.add(attr.get('identifier'))
+                elif isinstance(attr, list):
+                    parsed_attr = parser._parse_key_value_list(attr)
+                    if isinstance(parsed_attr, dict):
+                        node_attr_names.add(parsed_attr.get('identifier'))
+
+            assert first_attr_names == node_attr_names, f"Attribute names mismatch on node {i}: {first_attr_names} vs {node_attr_names}"
+
+            # Validate attribute types match across nodes
+            for attr_name in first_attr_names:
+                first_attr = first_parser.get_attribute_by_name(attr_name)
+                node_attr = parser.get_attribute_by_name(attr_name)
+                assert first_attr is not None and node_attr is not None, f"Attribute '{attr_name}' parsing failed"
+                assert node_attr.get('type') == first_attr.get('type'), f"Attribute '{attr_name}' type mismatch on node {i}"
+
+        # Validate specific expected attributes
+        for attr_name, expected_config in expected_attributes.items():
+            attr = first_parser.get_attribute_by_name(attr_name)
+            assert attr is not None, f"Expected attribute '{attr_name}' not found"
+
+            for key, expected_value in expected_config.items():
+                actual_value = attr.get(key)
+                assert actual_value == expected_value, f"Attribute '{attr_name}' {key} mismatch: expected {expected_value}, got {actual_value}"
+
+    def test_complex_text_index_metadata_validation(self):
+        """Test complex text index with multiple parameters and options."""
+        cluster: ValkeyCluster = self.new_cluster_client()
+        node0: Valkey = self.new_client_for_primary(0)
+
+        index_name = "complex_text_idx"
+
+        # Create complex text index with various options
+        assert node0.execute_command(
+            "FT.CREATE", index_name,
+            "ON", "HASH",
+            "PREFIX", "2", "product:", "item:",
+            "PUNCTUATION", ".,!?",
+            "WITHOFFSETS",
+            "NOSTEM",
+            "STOPWORDS", "3", "the", "and", "or",
+            "SCHEMA",
+            "title", "TEXT", "NOSTEM",
+            "description", "TEXT",
+            "price", "NUMERIC",
+            "category", "TAG", "SEPARATOR", "|",
+            "subcategory", "TAG", "CASESENSITIVE"
+        ) == b"OK"
+
+        # Wait for indexing to complete on all nodes
+        self.wait_for_indexing_complete_on_all_nodes(index_name)
+
+        # Validate FT._LIST consistency
+        self.validate_ft_list_consistency([index_name])
+
+        # Validate FT.INFO consistency with detailed attribute checking
+        # Note: Based on server response, subcategory TAG field with
+        # CASESENSITIVE uses default separator ','
+        expected_attributes = {
+            "title": {
+                "type": "TEXT",
+                "identifier": "title",
+                "NO_STEM": 1
+            },
+            "description": {
+                "type": "TEXT",
+                "identifier": "description"
+            },
+            "price": {
+                "type": "NUMERIC",
+                "identifier": "price"
+            },
+            "category": {
+                "type": "TAG",
+                "identifier": "category",
+                "SEPARATOR": "|"
+            },
+            "subcategory": {
+                "type": "TAG",
+                "identifier": "subcategory",
+                "SEPARATOR": ",",
+                "CASESENSITIVE": 1
+            }
+        }
+        self.validate_ft_info_consistency(index_name, expected_attributes)
+
+        # Additional validation for global index settings
+        ft_info_results = self.get_ft_info_from_all_nodes(index_name)
+        first_parser = ft_info_results[0]
+
+        # Validate global settings are consistent across nodes
+        assert first_parser.punctuation == ".,!?", f"Punctuation setting incorrect: {first_parser.punctuation}"
+        assert first_parser.with_offsets == 1, f"WithOffsets setting incorrect: {first_parser.with_offsets}"
+        assert set(first_parser.stop_words) == {"the", "and", "or"}, f"Stop words incorrect: {first_parser.stop_words}"
+
+        # Validate these settings are consistent across all nodes
+        for i, parser in enumerate(ft_info_results[1:], 1):
+            assert parser.punctuation == first_parser.punctuation, f"Punctuation mismatch on node {i}"
+            assert parser.with_offsets == first_parser.with_offsets, f"WithOffsets mismatch on node {i}"
+            assert set(parser.stop_words) == set(first_parser.stop_words), f"Stop words mismatch on node {i}"
+
+    def test_multiple_indexes_metadata_validation(self):
+        """Test multiple indexes with different configurations."""
+        cluster: ValkeyCluster = self.new_cluster_client()
+        node0: Valkey = self.new_client_for_primary(0)
+
+        indexes = [
+            {
+                "name": "products_idx",
+                "command": [
+                    "FT.CREATE", "products_idx",
+                    "ON", "HASH",
+                    "PREFIX", "1", "product:",
+                    "SCHEMA", "name", "TEXT", "price", "NUMERIC"
+                ],
+                "attributes": {
+                    "name": {"type": "TEXT", "identifier": "name"},
+                    "price": {"type": "NUMERIC", "identifier": "price"}
+                }
+            },
+            {
+                "name": "users_idx",
+                "command": [
+                    "FT.CREATE", "users_idx",
+                    "ON", "HASH",
+                    "PREFIX", "1", "user:",
+                    "PUNCTUATION", ".-",
+                    "SCHEMA", "email", "TEXT", "age", "NUMERIC", "tags", "TAG"
+                ],
+                "attributes": {
+                    "email": {"type": "TEXT", "identifier": "email"},
+                    "age": {"type": "NUMERIC", "identifier": "age"},
+                    "tags": {"type": "TAG", "identifier": "tags"}
+                }
+            },
+            {
+                "name": "articles_idx",
+                "command": [
+                    "FT.CREATE", "articles_idx",
+                    "ON", "HASH",
+                    "PREFIX", "1", "article:",
+                    "WITHOFFSETS",
+                    "STOPWORDS", "2", "a", "an",
+                    "SCHEMA", "title", "TEXT", "content", "TEXT", "NOSTEM"
+                ],
+                "attributes": {
+                    "title": {"type": "TEXT", "identifier": "title"},
+                    "content": {"type": "TEXT", "identifier": "content", "NO_STEM": 1}
+                }
+            }
+        ]
+
+        # Create all indexes
+        for index_config in indexes:
+            assert node0.execute_command(*index_config["command"]) == b"OK"
+
+        # Wait for all indexes to complete on all nodes
+        for index_config in indexes:
+            self.wait_for_indexing_complete_on_all_nodes(index_config["name"])
+
+        # Validate FT._LIST shows all indexes on all nodes
+        expected_index_names = [idx["name"] for idx in indexes]
+        self.validate_ft_list_consistency(expected_index_names)
+
+        # Validate each index's metadata consistency
+        for index_config in indexes:
+            self.validate_ft_info_consistency(index_config["name"], index_config["attributes"])
diff --git a/integration/test_fulltext.py b/integration/test_fulltext.py
index 0720bb0ff..1ef433c0a 100644
--- a/integration/test_fulltext.py
+++ b/integration/test_fulltext.py
@@ -286,15 +286,44 @@ def test_text_search_per_field(self):
 
     def test_default_ingestion_pipeline(self):
         """
-        Test comprehensive ingestion pipeline: FT.CREATE → HSET → FT.SEARCH with full tokenization
+        Test comprehensive ingestion pipeline: FT.CREATE → HSET → FT.SEARCH with full tokenization,
+        index metadata validation with data, and index lifecycle management.
         """
         client: Valkey = self.server.get_new_client()
 
-        client.execute_command("FT.CREATE idx ON HASH SCHEMA content TEXT")
-        client.execute_command("HSET", "doc:1", "content", "The quick-running searches are finding EFFECTIVE results!")
-        client.execute_command("HSET", "doc:2", "content", "But slow searches aren't working...")
-        # List of queries with pass/fail expectations
-        test_cases = [
+        # Comprehensive test: Create index with multiple field types for both tokenization and metadata validation
+        comprehensive_index = "comprehensive_test_idx"
+
+        # Create index with multiple field types
+        assert client.execute_command(
+            "FT.CREATE", comprehensive_index,
+            "ON", "HASH",
+            "PREFIX", "1", "doc:",
+            "SCHEMA", "title", "TEXT", "content", "TEXT", "score", "NUMERIC"
+        ) == b"OK"
+
+        # Insert test data that covers both tokenization and metadata validation
+        test_docs = [
+            {"key": "doc:1", "title": "First Document", "content": "The quick-running searches are finding EFFECTIVE results!", "score": "10"},
+            {"key": "doc:2", "title": "Second Document", "content": "But slow searches aren't working...", "score": "20"},
+            {"key": "doc:3", "title": "Third Document", "content": "This is the third document content", "score": "30"},
+            {"key": "doc:4", "title": "Fourth Document", "content": "This is the fourth document content", "score": "40"},
+            {"key": "doc:5", "title": "Fifth Document", "content": "This is the fifth document content", "score": "50"}
+        ]
+
+        for doc in test_docs:
+            client.execute_command("HSET", doc["key"], "title", doc["title"], "content", doc["content"], "score", doc["score"])
+
+        # Wait for indexing to complete
+        def check_is_backfill_complete(index_name):
+            info = client.execute_command("FT.INFO", index_name)
+            parser = FTInfoParser(info)
+            return parser.is_backfill_complete()
+
+        waiters.wait_for_equal(lambda: check_is_backfill_complete(comprehensive_index), True, timeout=15)
+
+        # Test tokenization functionality with the indexed data
+        tokenization_test_cases = [
             ("quick*", True, "Punctuation tokenization - hyphen creates word boundaries"),
             ("effect*", True, "Case insensitivity - lowercase matches uppercase"),
             ("the", False, "Stop word filtering - common words filtered out"),
@@ -303,14 +332,108 @@
         ]
 
         expected_key = b'doc:1'
-        expected_fields = [b'content', b"The quick-running searches are finding EFFECTIVE results!"]
+        expected_fields = [b'title', b'First Document', b'content', b"The quick-running searches are finding EFFECTIVE results!", b'score', b'10']
 
-        for query_term, should_match, description in test_cases:
-            result = client.execute_command("FT.SEARCH", "idx", f'@content:"{query_term}"')
+        for query_term, should_match, description in tokenization_test_cases:
+            result = client.execute_command("FT.SEARCH", comprehensive_index, f'@content:"{query_term}"')
             if should_match:
-                assert result[0] == 1 and result[1] == expected_key and result[2] == expected_fields, f"Failed: {description}"
+                assert result[0] == 1, f"Failed: {description} - expected 1 result, got {result[0]}"
+                assert result[1] == expected_key, f"Failed: {description} - wrong document key"
{description} - wrong document key" + # Check that the returned fields contain the expected content + returned_fields = dict(zip(result[2][::2], result[2][1::2])) + assert returned_fields[b'content'] == b"The quick-running searches are finding EFFECTIVE results!", f"Failed: {description} - wrong content" else: - assert result[0] == 0, f"Failed: {description}" + assert result[0] == 0, f"Failed: {description} - expected 0 results, got {result[0]}" + + # Validate index exists in FT._LIST + index_list = client.execute_command("FT._LIST") + index_names = [item.decode('utf-8') if isinstance(item, bytes) else str(item) for item in index_list] + assert comprehensive_index in index_names, f"Index {comprehensive_index} not found in FT._LIST" + + # Validate FT.INFO structure and attributes + info_response = client.execute_command("FT.INFO", comprehensive_index) + parser = FTInfoParser(info_response) + + # Validate expected attributes exist with correct types + expected_attributes = { + "title": {"type": "TEXT", "identifier": "title"}, + "content": {"type": "TEXT", "identifier": "content"}, + "score": {"type": "NUMERIC", "identifier": "score"} + } + + for attr_name, expected_config in expected_attributes.items(): + attr = parser.get_attribute_by_name(attr_name) + assert attr is not None, f"Expected attribute '{attr_name}' not found" + + for key, expected_value in expected_config.items(): + actual_value = attr.get(key) + assert actual_value == expected_value, f"Attribute '{attr_name}' {key} mismatch: expected {expected_value}, got {actual_value}" + + # Validate indexing state + assert parser.is_ready(), "Index is not in ready state" + assert parser.is_backfill_complete(), "Index backfill not complete" + + # Test additional search functionality with different fields + search_result = client.execute_command("FT.SEARCH", comprehensive_index, '@title:"first"') + assert search_result[0] == 1, "Should find one document with 'first' in title" + assert search_result[1] == b"doc:1", "Should find the first document" + + # Test numeric field search + search_result = client.execute_command("FT.SEARCH", comprehensive_index, '@score:[30 50]') + assert search_result[0] == 3, "Should find three documents with scores between 30-50" + + # Index lifecycle management + lifecycle_index = "lifecycle_test_idx" + + # Create index + assert client.execute_command( + "FT.CREATE", lifecycle_index, + "ON", "HASH", + "PREFIX", "1", "test:", + "SCHEMA", "field1", "TEXT" + ) == b"OK" + + # Wait for indexing to complete + waiters.wait_for_equal(lambda: check_is_backfill_complete(lifecycle_index), True, timeout=10) + + # Validate index appears in list + index_list = client.execute_command("FT._LIST") + index_names = [item.decode('utf-8') if isinstance(item, bytes) else str(item) for item in index_list] + assert lifecycle_index in index_names, f"Index {lifecycle_index} not found in FT._LIST after creation" + + # Add some data + client.execute_command("HSET", "test:1", "field1", "test value") + + # Wait for indexing + waiters.wait_for_equal(lambda: check_is_backfill_complete(lifecycle_index), True, timeout=10) + + # Validate metadata is still consistent + info_response = client.execute_command("FT.INFO", lifecycle_index) + parser = FTInfoParser(info_response) + + field1_attr = parser.get_attribute_by_name("field1") + assert field1_attr is not None, "field1 attribute not found" + assert field1_attr.get("type") == "TEXT", "field1 should be TEXT type" + assert field1_attr.get("identifier") == "field1", "field1 identifier mismatch" + + # 
+        search_result = client.execute_command("FT.SEARCH", lifecycle_index, '@field1:"test"')
+        assert search_result[0] == 1, "Should find the test document"
+
+        # Drop index
+        assert client.execute_command("FT.DROPINDEX", lifecycle_index) == b"OK"
+
+        # Validate index is removed from list
+        import time
+        time.sleep(0.5)  # Brief wait for cleanup
+
+        index_list = client.execute_command("FT._LIST")
+        index_names = [item.decode('utf-8') if isinstance(item, bytes) else str(item) for item in index_list]
+        assert lifecycle_index not in index_names, f"Index {lifecycle_index} still exists after drop"
+
+        # Validate FT.INFO fails after drop
+        with pytest.raises(ResponseError):
+            client.execute_command("FT.INFO", lifecycle_index)
 
     def test_multi_text_field(self):
         """
@@ -381,4 +504,4 @@ def test_custom_punctuation(self):
 
         # @ NOT configured as separator - should not be able to split words
         result = client.execute_command("FT.SEARCH", "idx", '@content:"test"')
-        assert result[0] == 0
\ No newline at end of file
+        assert result[0] == 0
diff --git a/integration/test_info_cluster.py b/integration/test_info_cluster.py
index 3f0742bbd..abb815d9b 100644
--- a/integration/test_info_cluster.py
+++ b/integration/test_info_cluster.py
@@ -2,6 +2,7 @@
 from valkey_search_test_case import ValkeySearchClusterTestCase
 from valkey.cluster import ValkeyCluster
 from valkey.client import Valkey
+from valkey import ResponseError
 from valkeytestframework.conftest import resource_port_tracker
 from valkeytestframework.util import waiters
 from test_info_primary import _parse_info_kv_list, verify_error_response
@@ -64,3 +65,31 @@ def test_ft_info_non_existing_index(self):
             "FT.INFO index123 CLUSTER",
             "Index with name 'index123' not found",
         )
+
+    def test_error_handling_metadata_validation(self):
+        """Test error handling consistency across cluster nodes."""
+        node0: Valkey = self.new_client_for_primary(0)
+        node1: Valkey = self.new_client_for_primary(1)
+        node2: Valkey = self.new_client_for_primary(2)
+
+        # Test FT.INFO on non-existent index
+        for i, node in enumerate([node0, node1, node2]):
+            with pytest.raises(ResponseError) as exc_info:
+                node.execute_command("FT.INFO", "nonexistent_index")
+            assert "not found" in str(exc_info.value).lower(), f"Unexpected error message on node {i}: {exc_info.value}"
+
+        # Test invalid FT.CREATE commands
+        invalid_commands = [
+            # Missing stopwords count
+            ["FT.CREATE", "invalid1", "ON", "HASH", "STOPWORDS", "the", "and", "SCHEMA", "field", "TEXT"],
+            # Missing punctuation value
+            ["FT.CREATE", "invalid2", "ON", "HASH", "PUNCTUATION", "SCHEMA", "field", "TEXT"],
+            # Invalid field type
+            ["FT.CREATE", "invalid3", "ON", "HASH", "SCHEMA", "field", "INVALID_TYPE"]
+        ]
+
+        for cmd in invalid_commands:
+            # All nodes should reject the same invalid commands
+            for node in [node0, node1, node2]:
+                with pytest.raises(ResponseError):
+                    node.execute_command(*cmd)
diff --git a/src/indexes/tag.cc b/src/indexes/tag.cc
index 16bb7669d..af0865237 100644
--- a/src/indexes/tag.cc
+++ b/src/indexes/tag.cc
@@ -167,16 +167,14 @@ absl::StatusOr<bool> Tag::RemoveRecord(const InternedStringPtr& key,
 }
 
 int Tag::RespondWithInfo(ValkeyModuleCtx* ctx) const {
-  auto num_replies = 6;
+  auto num_replies = 8;
   ValkeyModule_ReplyWithSimpleString(ctx, "type");
   ValkeyModule_ReplyWithSimpleString(ctx, "TAG");
   ValkeyModule_ReplyWithSimpleString(ctx, "SEPARATOR");
   ValkeyModule_ReplyWithSimpleString(
       ctx, std::string(&separator_, sizeof(char)).c_str());
-  if (case_sensitive_) {
-    num_replies++;
ValkeyModule_ReplyWithSimpleString(ctx, "CASESENSITIVE"); - } + ValkeyModule_ReplyWithSimpleString(ctx, "CASESENSITIVE"); + ValkeyModule_ReplyWithSimpleString(ctx, case_sensitive_ ? "1" : "0"); ValkeyModule_ReplyWithSimpleString(ctx, "size"); absl::MutexLock lock(&index_mutex_); ValkeyModule_ReplyWithCString( diff --git a/testing/ft_info_test.cc b/testing/ft_info_test.cc index 9304f444e..e9090f7b3 100644 --- a/testing/ft_info_test.cc +++ b/testing/ft_info_test.cc @@ -286,9 +286,11 @@ INSTANTIATE_TEST_SUITE_P( "options\r\n*0\r\n+index_definition\r\n*6\r\n+key_" "type\r\n+HASH\r\n+prefixes\r\n*1\r\n+prefix_1\r\n+" "default_score\r\n$1\r\n1\r\n+attributes\r\n*1\r\n*" - "10\r\n+identifier\r\n+test_identifier_1\r\n+" + "12\r\n+identifier\r\n+test_identifier_1\r\n+" "attribute\r\n+test_attribute_1\r\n+type\r\n+" - "TAG\r\n+SEPARATOR\r\n+@\r\n+size\r\n$1\r\n0\r\n+" + "TAG\r\n+SEPARATOR\r\n+@\r\n+CASESENSITIVE\r\n+" + "0\r\n+" + "size\r\n$1\r\n0\r\n+" "num_docs\r\n:0\r\n+num_terms\r\n:0\r\n+" "num_records\r\n:0\r\n+hash_indexing_failures\r\n$" "1\r\n0\r\n+gc_stats\r\n*14\r\n+" @@ -348,9 +350,10 @@ INSTANTIATE_TEST_SUITE_P( "options\r\n*0\r\n+index_definition\r\n*6\r\n+key_" "type\r\n+HASH\r\n+prefixes\r\n*1\r\n+prefix_1\r\n+" "default_score\r\n$1\r\n1\r\n+attributes\r\n*1\r\n*" - "11\r\n+identifier\r\n+test_identifier_1\r\n+" + "12\r\n+identifier\r\n+test_identifier_1\r\n+" "attribute\r\n+test_attribute_1\r\n+type\r\n+" "TAG\r\n+SEPARATOR\r\n+@\r\n+CASESENSITIVE\r\n+" + "1\r\n+" "size\r\n$1\r\n0\r\n+num_docs\r\n:0\r\n+num_" "terms\r\n:0\r\n+num_records\r\n:0\r\n+" "hash_indexing_failures\r\n$1\r\n0\r\n+gc_" @@ -646,4 +649,4 @@ INSTANTIATE_TEST_SUITE_P( } // namespace -} // namespace valkey_search \ No newline at end of file +} // namespace valkey_search