diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index e8a29fd..44816e0 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -8,6 +8,7 @@ get_assets_by_dsl, traverse_lineage, update_assets, + get_custom_metadata_context, create_glossary_category_assets, create_glossary_assets, create_glossary_term_assets, @@ -42,6 +43,7 @@ @mcp.tool() def search_assets_tool( conditions=None, + custom_metadata_conditions=None, negative_conditions=None, some_conditions=None, min_somes=1, @@ -65,6 +67,8 @@ def search_assets_tool( Args: conditions (Dict[str, Any], optional): Dictionary of attribute conditions to match. Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}} + custom_metadata_conditions (List[Dict[str, Any]], optional): List of custom metadata conditions to match. + Format: [{"custom_metadata_filter": {"display_name": "Business Metadata Name", "property_filters": [{"property_name": "property", "property_value": "value", "operator": "eq"}]}}] negative_conditions (Dict[str, Any], optional): Dictionary of attribute conditions to exclude. Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}} some_conditions (Dict[str, Any], optional): Conditions for where_some() queries that require min_somes of them to match. 
@@ -110,6 +114,86 @@ def search_assets_tool( include_attributes=["owner_users", "owner_groups"] ) + # Search for assets with custom metadata having a specific property filter (eq) + assets = search_assets( + custom_metadata_conditions=[{ + "custom_metadata_filter": { + "display_name": "Business Ownership", # This is the display name of the business metadata + "property_filters": [{ + "property_name": "business_owner", # This is the display name of the property + "property_value": "John", # This is the value of the property + "operator": "eq" + }] + } + }], + include_attributes=["name", "qualified_name", "type_name", "description", "certificate_status"] + ) + + # Search for assets with custom metadata having a specific property filter (gt) + assets = search_assets( + custom_metadata_conditions=[{ + "custom_metadata_filter": { + "display_name": "Data Quality", + "property_filters": [{ + "property_name": "quality_score", + "property_value": 80, + "operator": "gt" + }] + } + }], + include_attributes=["name", "qualified_name", "type_name", "description", "certificate_status"] + ) + + # Search for assets with custom metadata having multiple property filters (eq and gte) + assets = search_assets( + custom_metadata_conditions=[{ + "custom_metadata_filter": { + "display_name": "Data Governance", + "property_filters": [ + { + "property_name": "data_owner", + "property_value": "John Smith", + "operator": "eq" + }, + { + "property_name": "retention_period", + "property_value": 365, + "operator": "gte" + } + ] + } + }], + include_attributes=["name", "qualified_name", "type_name", "description", "certificate_status"] + ) + + # Search for assets with custom metadata having multiple business metadata filters (eq and gte) + assets = search_assets( + custom_metadata_conditions=[ + { + "custom_metadata_filter": { + "display_name": "Data Classification", + "property_filters": [{ + "property_name": "sensitivity_level", + "property_value": "sensitive", + "operator": "eq" + }] + } + }, + 
@mcp.tool()
def get_custom_metadata_context_tool() -> Dict[str, Any]:
    """
    Fetch the custom metadata context for all business metadata definitions in the Atlan instance.

    This tool is used to get the custom metadata context for all business metadata definitions
    present in the Atlan instance.

    Eventually, this tool helps to prepare the payload for search_assets tool, when users
    want to search for assets with filters on custom metadata.

    This tool can only be called once in a chat conversation.

    Returns:
        Dict[str, Any]: On success, a dictionary containing:
            - context: Short description of what the business metadata definitions are
            - business_metadata_results: List of business metadata definitions, each containing:
                - prompt: Formatted string prompt for the business metadata definition
                - metadata: Dictionary with business metadata details including:
                    - name: Internal name of the business metadata
                    - display_name: Display name of the business metadata
                    - description: Description of the business metadata
                    - attributes: List of attribute definitions with name, display_name, data_type, description, and optional enumEnrichment
                - id: GUID of the business metadata definition
            On failure, a dictionary with a single "error" key describing the problem.
            This tool never raises; errors are always reported through the "error" key.

    Examples:
        # Step 1: Get custom metadata context to understand available business metadata
        context = get_custom_metadata_context_tool()

        # Step 2: Use the context to prepare custom_metadata_conditions for search_assets_tool
        # Example context result might show business metadata like "Data Classification" with attributes

        # Example 1: Equality operator (eq) - exact match
        assets = search_assets_tool(
            asset_type="Table",
            custom_metadata_conditions=[{
                "custom_metadata_filter": {
                    "display_name": "Data Classification",  # This is the display name of the business metadata
                    "property_filters": [{
                        "property_name": "sensitivity_level",  # This is the display name of the property
                        "property_value": "sensitive",  # This is the value of the property
                        "operator": "eq"
                    }]
                }
            }],
            include_attributes=["name", "qualified_name", "type_name", "description", "certificate_status"]
        )

        # Example 2: Equality with case insensitive matching
        assets = search_assets_tool(
            custom_metadata_conditions=[{
                "custom_metadata_filter": {
                    "display_name": "Data Classification",
                    "property_filters": [{
                        "property_name": "sensitivity_level",
                        "property_value": "SENSITIVE",
                        "operator": "eq",
                        "case_insensitive": True
                    }]
                }
            }],
            include_attributes=["name", "qualified_name", "type_name", "description", "certificate_status"]
        )

        # Example 3: Starts with operator with case insensitive matching
        assets = search_assets_tool(
            custom_metadata_conditions=[{
                "custom_metadata_filter": {
                    "display_name": "Business Ownership",
                    "property_filters": [{
                        "property_name": "business_owner",
                        "property_value": "john",
                        "operator": "startswith",
                        "case_insensitive": True
                    }]
                }
            }],
            include_attributes=["name", "qualified_name", "type_name", "description", "certificate_status"]
        )

        # Example 4: Has any value operator (has_any_value) - check if field is populated
        assets = search_assets_tool(
            custom_metadata_conditions=[{
                "custom_metadata_filter": {
                    "display_name": "Business Ownership",
                    "property_filters": [{
                        "property_name": "business_owner",
                        "operator": "has_any_value"
                    }]
                }
            }],
            include_attributes=["name", "qualified_name", "type_name", "description", "certificate_status"]
        )
    """
    try:
        result = get_custom_metadata_context()
    except Exception as e:
        return {"error": f"Error getting custom metadata context: {str(e)}"}

    # The underlying helper may signal a failure with a non-dict value (an
    # empty list). Report that as an explicit error so the declared return
    # type holds and the caller is not handed a silent "no data" result.
    if not isinstance(result, dict):
        return {
            "error": "Error getting custom metadata context: no business metadata definitions could be retrieved"
        }
    return result
@staticmethod
def build_api_url(path: str, query_params: Optional[Dict[str, Any]] = None) -> str:
    """
    Build a full API URL from a path and optional query parameters.

    Args:
        path: Either an absolute URL (starting with http:// or https://), which is
            used verbatim, or a path that is appended to ATLAN_BASE_URL. A missing
            leading slash on a relative path is added automatically.
        query_params: Optional query parameters. Entries whose value is None are
            dropped; the rest are URL-encoded and appended.

    Returns:
        str: The resulting URL.

    Raises:
        ValueError: If a relative path is given but ATLAN_BASE_URL is empty.
    """
    current_settings = Settings()
    path = path or ""

    if path.startswith(("http://", "https://")):
        # Absolute URLs do not need the configured base URL.
        full_path = path
    else:
        # A settings instance is always truthy, so the configuration check has
        # to look at the value that is actually used to build the URL.
        base_url = (current_settings.ATLAN_BASE_URL or "").rstrip("/")
        if not base_url:
            raise ValueError(
                "Atlan base URL (ATLAN_BASE_URL) is not configured in settings."
            )
        separator = "" if not path or path.startswith("/") else "/"
        full_path = f"{base_url}{separator}{path}"

    active_query_params = {
        k: v for k, v in (query_params or {}).items() if v is not None
    }
    if not active_query_params:
        return full_path

    # Extend an existing query string instead of adding a second '?'.
    joiner = "&" if "?" in full_path else "?"
    return f"{full_path}{joiner}{urlencode(active_query_params)}"
@staticmethod
def make_request(url: str) -> Optional[Dict[str, Any]]:
    """
    Perform an authenticated GET request and return the decoded JSON body.

    Args:
        url: Fully qualified URL to request.

    Returns:
        Optional[Dict[str, Any]]: The parsed JSON response body.

    Raises:
        Exception: If the request cannot be sent, the response status is not 200,
            or the body is not valid JSON. The original error, when there is one,
            is chained as the cause.
    """
    current_settings = Settings()
    headers = {
        "Authorization": f"Bearer {current_settings.ATLAN_API_KEY}",
        "x-atlan-client-origin": "atlan-search-app",
    }
    # requests applies no timeout by default; without one a stalled connection
    # would block the caller indefinitely.
    timeout_seconds = 30

    try:
        response = requests.get(url, headers=headers, timeout=timeout_seconds)
    except requests.RequestException as e:
        raise Exception(f"Failed to make request to {url}: {e}") from e

    # Checked outside the try block so the message is not wrapped a second time.
    if response.status_code != 200:
        raise Exception(
            f"Failed to make request to {url}: {response.status_code} {response.text}"
        )

    try:
        return response.json()
    except ValueError as e:
        # JSON decoding errors raised by requests derive from ValueError.
        raise Exception(f"Failed to make request to {url}: {e}") from e
def _get_or_default(mapping: Dict[str, Any], key: str, default: Any) -> Any:
    """Return mapping[key], or default when the key is missing or its value is None."""
    value = mapping.get(key)
    return default if value is None else value


def process_business_metadata(
    bm_def: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Generates context prompt for a given Atlan business metadata definition.

    Args:
        bm_def: A dictionary representing the business metadata definition.
            Keys read: 'name', 'displayName', 'description', 'attributeDefs', 'guid'.
            Missing keys and keys whose value is None fall back to placeholders.

    Returns:
        A dictionary with three keys:
            - prompt: "<display name>|<description>|<attr>:<attr description>, ..."
              (the attribute part is "None" when there are no attributes)
            - metadata: dictionary with name, display_name, description and
              attributes (one entry per attribute with name, display_name,
              data_type, description and, when present, enumEnrichment)
            - id: the 'guid' of the definition (None when absent)
    """
    bm_def_name = _get_or_default(bm_def, "name", "N/A")
    bm_def_display_name = _get_or_default(bm_def, "displayName", "N/A")
    bm_def_description = _get_or_default(
        bm_def, "description", "No description available."
    )
    attribute_defs = bm_def.get("attributeDefs") or []

    prompt_parts: List[str] = []
    parsed_attributes: List[Dict[str, Any]] = []

    for attr in attribute_defs:
        # Prompt entry: display name (falling back to the internal name) and description.
        prompt_name = _get_or_default(
            attr, "displayName", _get_or_default(attr, "name", "Unnamed attribute")
        )
        prompt_description = _get_or_default(attr, "description", "No description")
        prompt_parts.append(f"{prompt_name}:{prompt_description}")

        # Metadata entry: a null description is treated as empty so the enum
        # suffix is never prefixed with the literal text "None".
        description = _get_or_default(attr, "description", "")
        enum_enrichment = attr.get("enumEnrichment")
        enum_values = enum_enrichment.get("values") if enum_enrichment else None
        if enum_values:
            quoted_values = ", ".join(f"'{value}'" for value in enum_values)
            description = (
                f"{description} This attribute can have enum values: {quoted_values}."
            ).strip()

        attribute_metadata: Dict[str, Any] = {
            "name": attr.get("name"),
            "display_name": attr.get("displayName"),
            "data_type": attr.get("typeName"),  # typeName is exposed as data_type
            "description": description,
        }
        if enum_enrichment:
            attribute_metadata["enumEnrichment"] = enum_enrichment
        parsed_attributes.append(attribute_metadata)

    attributes_str = ", ".join(prompt_parts) if prompt_parts else "None"

    metadata: Dict[str, Any] = {
        "name": bm_def_name,
        "display_name": bm_def_display_name,
        "description": bm_def_description,
        "attributes": parsed_attributes,
    }
    prompt = f"{bm_def_display_name}|{bm_def_description}|{attributes_str}"

    return {"prompt": prompt, "metadata": metadata, "id": bm_def.get("guid")}
def _build_enum_lookup(
    enum_response: Optional[Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
    """Index the enum definitions of a typedef response by enum name."""
    enum_lookup: Dict[str, Dict[str, Any]] = {}
    for enum_def in (enum_response or {}).get("enumDefs") or []:
        enum_name = enum_def.get("name", "")
        if not enum_name:
            continue
        element_defs = enum_def.get("elementDefs") or []
        enum_lookup[enum_name] = {
            "guid": enum_def.get("guid", ""),
            "description": enum_def.get("description", ""),
            "values": [element.get("value", "") for element in element_defs],
            "elementDefs": element_defs,
            "version": enum_def.get("version", 1),
            "createTime": enum_def.get("createTime", 0),
            "updateTime": enum_def.get("updateTime", 0),
        }
    return enum_lookup


def _attach_enum_enrichment(
    business_metadata_def: Dict[str, Any],
    enum_lookup: Dict[str, Dict[str, Any]],
) -> None:
    """Add an 'enumEnrichment' entry, in place, to every enum-typed attribute with a known enum."""
    for attribute in business_metadata_def.get("attributeDefs") or []:
        # 'options' may be absent or null; neither should abort the whole fetch.
        options = attribute.get("options") or {}
        if options.get("isEnum") != "true":
            continue
        enum_type = options.get("enumType", "")
        if not enum_type or enum_type not in enum_lookup:
            continue
        enum_def = enum_lookup[enum_type]
        attribute["enumEnrichment"] = {
            "status": "ENRICHED",
            "enumType": enum_type,
            "enumGuid": enum_def["guid"],
            "enumDescription": enum_def["description"],
            "enumVersion": enum_def["version"],
            "values": enum_def["values"],
            "elementDefs": enum_def["elementDefs"],
            "enrichedTimestamp": None,
        }


def get_custom_metadata_context() -> Dict[str, Any]:
    """
    Fetch all business metadata definitions and enrich enum attributes with their allowed values.

    Two typedef requests are made through Settings: one for ENUM definitions and
    one for BUSINESS_METADATA definitions. Each business metadata definition is
    enriched with enum details and then converted by process_business_metadata.

    Returns:
        Dict[str, Any]: Always a dictionary with:
            - context: Fixed description of what the results represent
            - business_metadata_results: List of processed definitions (empty on failure)
            - error: Present only on failure, with a short description of the problem
    """
    display_name: str = "Business Metadata"
    context: str = "This is the list of business metadata definitions used in the data catalog to add more information to an asset"

    def failure(reason: str) -> Dict[str, Any]:
        # Same shape as the success result so callers can always index by key.
        return {"context": context, "business_metadata_results": [], "error": reason}

    business_metadata_results: List[Dict[str, Any]] = []

    try:
        # Fetch enum definitions for enrichment
        enum_endpoint: str = Settings.get_atlan_typedef_api_endpoint(param="ENUM")
        enum_lookup = _build_enum_lookup(Settings.make_request(enum_endpoint))

        # Fetch business metadata definitions
        business_metadata_endpoint: str = Settings.get_atlan_typedef_api_endpoint(
            param="BUSINESS_METADATA"
        )
        business_metadata_response: Optional[Dict[str, Any]] = Settings.make_request(
            business_metadata_endpoint
        )
        if business_metadata_response is None:
            logger.error(
                f"Service: Failed to make request to {business_metadata_endpoint} for {display_name}. No data returned."
            )
            return failure(f"No data returned for {display_name} definitions")

        for business_metadata_def in (
            business_metadata_response.get("businessMetadataDefs") or []
        ):
            _attach_enum_enrichment(business_metadata_def, enum_lookup)
            business_metadata_results.append(
                process_business_metadata(business_metadata_def)
            )

    except Exception as e:
        logger.error(
            f"Service: Error fetching or processing {display_name}: {e}",
            exc_info=True,
        )
        return failure(f"Error fetching or processing {display_name}: {e}")

    logger.info(
        f"Fetched {len(business_metadata_results)} {display_name} definitions with enum enrichment."
    )
    return {
        "context": context,
        "business_metadata_results": business_metadata_results,
    }
@@ -187,6 +190,19 @@ def search_assets( ) search = search.min_somes(min_somes) + if custom_metadata_conditions: + logger.debug( + f"Applying custom metadata conditions: {custom_metadata_conditions}" + ) + for custom_metadata_filter_object in custom_metadata_conditions: + if isinstance(custom_metadata_filter_object, dict): + _, condition = next(iter(custom_metadata_filter_object.items())) + else: + condition = custom_metadata_filter_object + search = SearchUtils._process_custom_metadata_condition( + search, condition, "where" + ) + # Apply date range filters if date_range: logger.debug(f"Applying date range filters: {date_range}") diff --git a/modelcontextprotocol/utils/search.py b/modelcontextprotocol/utils/search.py index b69377d..b30d028 100644 --- a/modelcontextprotocol/utils/search.py +++ b/modelcontextprotocol/utils/search.py @@ -1,11 +1,28 @@ -from typing import Dict, Any import logging +from client import get_atlan_client +from typing import Dict, Any from pyatlan.model.assets import Asset +from pyatlan.model.fields.atlan_fields import CustomMetadataField +from pyatlan.model.fluent_search import FluentSearch logger = logging.getLogger(__name__) class SearchUtils: + + CUSTOM_METADATAFIELD_OPERATOR_MAP = { + "eq": lambda custom_metadata_field_class, value, ci: custom_metadata_field_class.eq(value, case_insensitive=ci), + "startswith": lambda custom_metadata_field_class, value, ci: custom_metadata_field_class.startswith(value, case_insensitive=ci), + "lt": lambda custom_metadata_field_class, value: custom_metadata_field_class.lt(value), + "lte": lambda custom_metadata_field_class, value: custom_metadata_field_class.lte(value), + "gt": lambda custom_metadata_field_class, value: custom_metadata_field_class.gt(value), + "gte": lambda custom_metadata_field_class, value: custom_metadata_field_class.gte(value), + "match": lambda custom_metadata_field_class, value: custom_metadata_field_class.match(value), + "has_any_value": lambda attr: attr.has_any_value(), + } + + 
@staticmethod
def _process_custom_metadata_condition(
    search: FluentSearch, condition: Dict[str, Any], search_method_name: str
):
    """
    Process a single custom metadata condition and apply it to the search using the specified method.

    Every entry of condition["property_filters"] becomes one query on the custom
    metadata attribute and is added to the search, so multiple filters accumulate.

    Args:
        search: The FluentSearch object
        condition: Dictionary containing display_name (display name of the business
            metadata) and property_filters (list of property/attribute filters). Each
            filter may contain property_name, property_value, operator (default "eq")
            and case_insensitive (default False).
        search_method_name: The search method to use ('where', 'where_not', 'where_some')

    Returns:
        FluentSearch: The updated search object. If the condition is malformed it is
        returned unchanged. If a filter fails, the error is logged and the search is
        returned with only the filters applied before the failure.

    Raises:
        AttributeError: If search has no method named search_method_name.
    """
    # Validate required fields
    required_fields = ["display_name", "property_filters"]
    if not isinstance(condition, dict) or not all(
        field in condition for field in required_fields
    ):
        logger.warning(
            f"Custom metadata condition missing required fields: {required_fields}"
        )
        return search

    # Fail fast on an unknown method name instead of swallowing it below.
    if not hasattr(search, search_method_name):
        raise AttributeError(
            f"'{type(search).__name__}' object has no attribute '{search_method_name}'"
        )

    set_name = condition["display_name"]

    try:
        # Per the original note: get_atlan_client initializes the AtlanClient and
        # registers it in thread-local storage, which CustomMetadataField relies on.
        client = get_atlan_client()

        # Process each property filter
        for property_filter in condition["property_filters"]:
            operator = property_filter.get("operator", "eq")
            property_name = property_filter.get("property_name")
            property_value = property_filter.get("property_value")
            case_insensitive = property_filter.get("case_insensitive", False)

            # Create the custom metadata field for this specific property
            custom_metadata_field = CustomMetadataField(
                client=client, set_name=set_name, attribute_name=property_name
            )

            if operator == "has_any_value":
                # Takes no value; the operator map entry accepts only the field,
                # so it cannot go through the value-based dispatch below.
                query_condition = custom_metadata_field.has_any_value()
            elif operator == "between":
                if (
                    isinstance(property_value, (list, tuple))
                    and len(property_value) == 2
                ):
                    query_condition = custom_metadata_field.between(
                        property_value[0], property_value[1]
                    )
                else:
                    raise ValueError(
                        f"Invalid value format for 'between' operator: {property_value}, expected [start, end]"
                    )
            elif operator == "within":
                if isinstance(property_value, list):
                    query_condition = custom_metadata_field.within(property_value)
                else:
                    raise ValueError(
                        f"Invalid value format for 'within' operator: {property_value}, expected list"
                    )
            elif operator in SearchUtils.CUSTOM_METADATAFIELD_OPERATOR_MAP:
                operator_method = SearchUtils.CUSTOM_METADATAFIELD_OPERATOR_MAP[
                    operator
                ]
                if (
                    operator
                    in SearchUtils.CUSTOM_METADATAFIELD_NO_CASE_INSENSITIVE_OPERATORS
                ):
                    query_condition = operator_method(
                        custom_metadata_field, property_value
                    )
                else:
                    # eq and startswith support case-insensitive matching
                    query_condition = operator_method(
                        custom_metadata_field, property_value, case_insensitive
                    )
            else:
                # Fallback to eq if operator not found
                logger.warning(
                    f"Operator '{operator}' not found, falling back to 'eq' operator for custom metadata field"
                )
                query_condition = custom_metadata_field.eq(
                    property_value, case_insensitive=case_insensitive
                )

            # Look the method up on the *current* search object each time: the
            # fluent methods return a new object, so a method bound once to the
            # initial search would discard every previously added filter.
            search = getattr(search, search_method_name)(query_condition)
            logger.debug(
                f"Applied custom metadata condition: {set_name}.{property_name} {operator} {property_value}"
            )

    except Exception as e:
        logger.error(f"Error processing custom metadata condition: {e}")
        logger.exception("Exception details:")

    return search