def _get_template_metadata(
    self, template_name: str, is_composable: bool
) -> Dict[str, Any]:
    """Fetch the raw metadata of a single index template.

    Args:
        template_name: Name of the template to look up.
        is_composable: True to query the composable index template API
            (``indices.get_index_template``); False to query the legacy
            template API (``indices.get_template``).

    Returns:
        For composable templates, the ``index_template`` object of the first
        entry in the response's ``index_templates`` list, or an empty dict
        when the response has no entries. For legacy templates, the response
        entry keyed by ``template_name``.

    Raises:
        KeyError: If a legacy response has no entry for ``template_name``.
    """
    if is_composable:
        raw_response = self.client.indices.get_index_template(name=template_name)
        # A ``[{}]`` default only covers a missing key; an empty list would
        # still fail on ``[0]``, so both cases are normalised here.
        matches = raw_response.get("index_templates") or []
        if not matches:
            return {}
        return matches[0].get("index_template") or {}

    raw_response = self.client.indices.get_template(name=template_name)
    return raw_response[template_name]


def _extract_template_custom_properties(
    self, raw_metadata: Dict[str, Any], is_composable: bool
) -> Dict[str, str]:
    """Build the custom properties reported for an index template.

    Args:
        raw_metadata: Template metadata as returned by
            ``_get_template_metadata``.
        is_composable: True when ``raw_metadata`` is a composable template,
            whose aliases and settings are nested under ``template``; False
            for a legacy template, where they sit at the top level.

    Returns:
        A dict containing any of ``aliases``, ``index_patterns``,
        ``num_shards`` and ``num_replicas`` that are present. Every value is
        a string; list-like values are comma-joined.
    """
    # ``index_patterns`` is always top-level; aliases/settings move under
    # "template" for composable templates.
    if is_composable:
        section: Dict[str, Any] = raw_metadata.get("template") or {}
    else:
        section = raw_metadata

    custom_properties: Dict[str, str] = {}

    aliases = section.get("aliases") or {}
    if aliases:
        custom_properties["aliases"] = ",".join(aliases)

    index_patterns = raw_metadata.get("index_patterns") or []
    if isinstance(index_patterns, str):
        # Joining a bare string would split it character by character.
        index_patterns = [index_patterns]
    if index_patterns:
        custom_properties["index_patterns"] = ",".join(index_patterns)

    index_settings = (section.get("settings") or {}).get("index") or {}
    for setting_key, property_key in (
        ("number_of_shards", "num_shards"),
        ("number_of_replicas", "num_replicas"),
    ):
        value = index_settings.get(setting_key)
        if value is None or value == "":
            continue
        # Coerce so integer settings (including a replica count of 0) are
        # kept and the Dict[str, str] contract holds.
        custom_properties[property_key] = str(value)

    return custom_properties
return else: - raw_index = self.client.indices.get_template(name=index) - raw_index_metadata = raw_index[index] + raw_index_metadata = self._get_template_metadata( + index, is_composable_template + ) collapsed_index_name = collapse_name( name=index, collapse_urns=self.source_config.collapse_urns ) # 1. Construct and emit the schemaMetadata aspect # 1.1 Generate the schema fields from ES mappings. - index_mappings = raw_index_metadata["mappings"] + # For composable templates, mappings are under 'template.mappings' + if is_composable_template: + index_mappings = raw_index_metadata.get("template", {}).get("mappings", {}) + else: + index_mappings = raw_index_metadata.get("mappings", {}) index_mappings_json_str: str = json.dumps(index_mappings) md5_hash = md5(index_mappings_json_str.encode()).hexdigest() schema_fields = list( @@ -517,28 +590,32 @@ def _extract_mcps( ), ) - # 4. Construct and emit properties if needed. Will attempt to get the following properties - custom_properties: Dict[str, str] = {} - # 4.1 aliases - index_aliases: List[str] = raw_index_metadata.get("aliases", {}).keys() - if index_aliases: - custom_properties["aliases"] = ",".join(index_aliases) - # 4.2 index_patterns - index_patterns: List[str] = raw_index_metadata.get("index_patterns", []) - if index_patterns: - custom_properties["index_patterns"] = ",".join(index_patterns) - - # 4.3 number_of_shards - index_settings: Dict[str, Any] = raw_index_metadata.get("settings", {}).get( - "index", {} - ) - num_shards: str = index_settings.get("number_of_shards", "") - if num_shards: - custom_properties["num_shards"] = num_shards - # 4.4 number_of_replicas - num_replicas: str = index_settings.get("number_of_replicas", "") - if num_replicas: - custom_properties["num_replicas"] = num_replicas + # 4. 
def test_composable_template_structure() -> None:
    """Pin the nesting of a composable index template fixture.

    The fixture is a hand-written dict with an ``index_templates`` list whose
    first entry holds an ``index_template`` object. The assertions check that
    mappings, settings and aliases sit under its ``template`` key. This test
    inspects the fixture only; it does not call the ingestion source.
    """
    template_body = {
        "settings": {
            "index": {
                "number_of_shards": "3",
                "number_of_replicas": "2",
            }
        },
        "mappings": {
            "properties": {
                "field1": {"type": "text"},
                "field2": {"type": "keyword"},
            }
        },
        "aliases": {"test-alias": {}},
    }
    response = {
        "index_templates": [
            {
                "name": "test-template",
                "index_template": {
                    "index_patterns": ["test-*"],
                    "template": template_body,
                },
            }
        ]
    }

    # Unwrap the first listed template in two steps: list entry, then its
    # "index_template" object.
    first_entry = response.get("index_templates", [{}])[0]
    metadata = cast(Dict[str, Any], first_entry.get("index_template", {}))

    assert "template" in metadata
    nested = metadata["template"]

    # Mappings live under template.mappings
    assert "mappings" in nested
    assert "properties" in nested["mappings"]

    # Settings live under template.settings
    assert "settings" in nested
    assert "index" in nested["settings"]

    # Aliases live under template.aliases
    assert "aliases" in nested