Skip to content

Commit 33d36dc

Browse files
feat(elasticsearch): support for composable index templates (#15089)
Co-authored-by: Cursor Agent <[email protected]>
1 parent a3275c3 commit 33d36dc

File tree

2 files changed

+155
-30
lines changed

2 files changed

+155
-30
lines changed

metadata-ingestion/src/datahub/ingestion/source/elastic_search.py

Lines changed: 106 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -407,12 +407,78 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
407407
for mcp in self._get_data_stream_index_count_mcps():
408408
yield mcp.as_workunit()
409409
if self.source_config.ingest_index_templates:
410-
templates = self.client.indices.get_template()
411-
for template in templates:
410+
# Fetch legacy index templates
411+
legacy_templates = self.client.indices.get_template()
412+
for template in legacy_templates:
412413
if self.source_config.index_template_pattern.allowed(template):
413414
for mcp in self._extract_mcps(template, is_index=False):
414415
yield mcp.as_workunit()
415416

417+
# Fetch composable index templates (ES 7.8+ / OpenSearch)
418+
try:
419+
composable_templates = self.client.indices.get_index_template()
420+
for template_info in composable_templates.get("index_templates", []):
421+
template = template_info.get("name")
422+
if template and self.source_config.index_template_pattern.allowed(
423+
template
424+
):
425+
for mcp in self._extract_mcps(
426+
template, is_index=False, is_composable_template=True
427+
):
428+
yield mcp.as_workunit()
429+
except Exception as e:
430+
logger.warning(f"Unable to fetch composable index templates: {e}")
431+
432+
def _get_template_metadata(
433+
self, template_name: str, is_composable: bool
434+
) -> Dict[str, Any]:
435+
"""Fetch template metadata from Elasticsearch/OpenSearch."""
436+
if is_composable:
437+
# For composable templates (ES 7.8+ / OpenSearch)
438+
raw_response = self.client.indices.get_index_template(name=template_name)
439+
template_data = raw_response.get("index_templates", [{}])[0]
440+
return template_data.get("index_template", {})
441+
else:
442+
# For legacy templates
443+
raw_response = self.client.indices.get_template(name=template_name)
444+
return raw_response[template_name]
445+
446+
def _extract_template_custom_properties(
447+
self, raw_metadata: Dict[str, Any], is_composable: bool
448+
) -> Dict[str, str]:
449+
"""Extract custom properties from template metadata."""
450+
custom_properties: Dict[str, str] = {}
451+
452+
# Extract aliases
453+
if is_composable:
454+
aliases_dict = raw_metadata.get("template", {}).get("aliases", {})
455+
else:
456+
aliases_dict = raw_metadata.get("aliases", {})
457+
index_aliases: List[str] = list(aliases_dict.keys()) if aliases_dict else []
458+
if index_aliases:
459+
custom_properties["aliases"] = ",".join(index_aliases)
460+
461+
# Extract index_patterns
462+
index_patterns: List[str] = raw_metadata.get("index_patterns", [])
463+
if index_patterns:
464+
custom_properties["index_patterns"] = ",".join(index_patterns)
465+
466+
# Extract settings
467+
if is_composable:
468+
index_settings: Dict[str, Any] = (
469+
raw_metadata.get("template", {}).get("settings", {}).get("index", {})
470+
)
471+
else:
472+
index_settings = raw_metadata.get("settings", {}).get("index", {})
473+
num_shards: str = index_settings.get("number_of_shards", "")
474+
if num_shards:
475+
custom_properties["num_shards"] = num_shards
476+
num_replicas: str = index_settings.get("number_of_replicas", "")
477+
if num_replicas:
478+
custom_properties["num_replicas"] = num_replicas
479+
480+
return custom_properties
481+
416482
def _get_data_stream_index_count_mcps(
417483
self,
418484
) -> Iterable[MetadataChangeProposalWrapper]:
@@ -434,9 +500,11 @@ def _get_data_stream_index_count_mcps(
434500
)
435501

436502
def _extract_mcps(
437-
self, index: str, is_index: bool = True
503+
self, index: str, is_index: bool = True, is_composable_template: bool = False
438504
) -> Iterable[MetadataChangeProposalWrapper]:
439-
logger.debug(f"index='{index}', is_index={is_index}")
505+
logger.debug(
506+
f"index='{index}', is_index={is_index}, is_composable_template={is_composable_template}"
507+
)
440508

441509
if is_index:
442510
raw_index = self.client.indices.get(index=index)
@@ -451,15 +519,20 @@ def _extract_mcps(
451519
# This is a duplicate, skip processing it further.
452520
return
453521
else:
454-
raw_index = self.client.indices.get_template(name=index)
455-
raw_index_metadata = raw_index[index]
522+
raw_index_metadata = self._get_template_metadata(
523+
index, is_composable_template
524+
)
456525
collapsed_index_name = collapse_name(
457526
name=index, collapse_urns=self.source_config.collapse_urns
458527
)
459528

460529
# 1. Construct and emit the schemaMetadata aspect
461530
# 1.1 Generate the schema fields from ES mappings.
462-
index_mappings = raw_index_metadata["mappings"]
531+
# For composable templates, mappings are under 'template.mappings'
532+
if is_composable_template:
533+
index_mappings = raw_index_metadata.get("template", {}).get("mappings", {})
534+
else:
535+
index_mappings = raw_index_metadata.get("mappings", {})
463536
index_mappings_json_str: str = json.dumps(index_mappings)
464537
md5_hash = md5(index_mappings_json_str.encode()).hexdigest()
465538
schema_fields = list(
@@ -517,28 +590,32 @@ def _extract_mcps(
517590
),
518591
)
519592

520-
# 4. Construct and emit properties if needed. Will attempt to get the following properties
521-
custom_properties: Dict[str, str] = {}
522-
# 4.1 aliases
523-
index_aliases: List[str] = raw_index_metadata.get("aliases", {}).keys()
524-
if index_aliases:
525-
custom_properties["aliases"] = ",".join(index_aliases)
526-
# 4.2 index_patterns
527-
index_patterns: List[str] = raw_index_metadata.get("index_patterns", [])
528-
if index_patterns:
529-
custom_properties["index_patterns"] = ",".join(index_patterns)
530-
531-
# 4.3 number_of_shards
532-
index_settings: Dict[str, Any] = raw_index_metadata.get("settings", {}).get(
533-
"index", {}
534-
)
535-
num_shards: str = index_settings.get("number_of_shards", "")
536-
if num_shards:
537-
custom_properties["num_shards"] = num_shards
538-
# 4.4 number_of_replicas
539-
num_replicas: str = index_settings.get("number_of_replicas", "")
540-
if num_replicas:
541-
custom_properties["num_replicas"] = num_replicas
593+
# 4. Construct and emit properties
594+
if is_index:
595+
custom_properties: Dict[str, str] = {}
596+
# Extract properties for indices
597+
index_aliases: List[str] = list(
598+
raw_index_metadata.get("aliases", {}).keys()
599+
)
600+
if index_aliases:
601+
custom_properties["aliases"] = ",".join(index_aliases)
602+
index_patterns: List[str] = raw_index_metadata.get("index_patterns", [])
603+
if index_patterns:
604+
custom_properties["index_patterns"] = ",".join(index_patterns)
605+
index_settings: Dict[str, Any] = raw_index_metadata.get("settings", {}).get(
606+
"index", {}
607+
)
608+
num_shards: str = index_settings.get("number_of_shards", "")
609+
if num_shards:
610+
custom_properties["num_shards"] = num_shards
611+
num_replicas: str = index_settings.get("number_of_replicas", "")
612+
if num_replicas:
613+
custom_properties["num_replicas"] = num_replicas
614+
else:
615+
# Extract properties for templates
616+
custom_properties = self._extract_template_custom_properties(
617+
raw_index_metadata, is_composable_template
618+
)
542619

543620
yield MetadataChangeProposalWrapper(
544621
entityUrn=dataset_urn,

metadata-ingestion/tests/unit/test_elasticsearch_source.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
import logging
33
import re
4-
from typing import Any, Dict, List, Tuple
4+
from typing import Any, Dict, List, Tuple, cast
55

66
import pydantic
77
import pytest
@@ -2512,3 +2512,51 @@ def test_collapse_urns() -> None:
25122512
)
25132513
== "urn:li:dataset:(urn:li:dataPlatform:elasticsearch,platform1.prefix_datahub_usage_event,PROD)"
25142514
)
2515+
2516+
2517+
def test_composable_template_structure() -> None:
2518+
"""Test that composable template structure is correctly handled"""
2519+
# Test that the _extract_mcps method correctly handles composable template structure
2520+
# This ensures that mappings, settings, and aliases are extracted from the right location
2521+
composable_template_response = {
2522+
"index_templates": [
2523+
{
2524+
"name": "test-template",
2525+
"index_template": {
2526+
"index_patterns": ["test-*"],
2527+
"template": {
2528+
"settings": {
2529+
"index": {
2530+
"number_of_shards": "3",
2531+
"number_of_replicas": "2",
2532+
}
2533+
},
2534+
"mappings": {
2535+
"properties": {
2536+
"field1": {"type": "text"},
2537+
"field2": {"type": "keyword"},
2538+
}
2539+
},
2540+
"aliases": {"test-alias": {}},
2541+
},
2542+
},
2543+
}
2544+
]
2545+
}
2546+
2547+
# Verify the structure is as expected for composable templates
2548+
index_templates = composable_template_response.get("index_templates", [{}])
2549+
template_data = index_templates[0]
2550+
raw_index_metadata = cast(Dict[str, Any], template_data.get("index_template", {}))
2551+
2552+
# Check that mappings are under template.mappings
2553+
assert "template" in raw_index_metadata
2554+
assert "mappings" in raw_index_metadata["template"]
2555+
assert "properties" in raw_index_metadata["template"]["mappings"]
2556+
2557+
# Check that settings are under template.settings
2558+
assert "settings" in raw_index_metadata["template"]
2559+
assert "index" in raw_index_metadata["template"]["settings"]
2560+
2561+
# Check that aliases are under template.aliases
2562+
assert "aliases" in raw_index_metadata["template"]

0 commit comments

Comments
 (0)