From 1d8b07e2c8bb85b30e06fb549d817313ace56174 Mon Sep 17 00:00:00 2001 From: Hk669 Date: Sat, 8 Nov 2025 00:31:30 +0530 Subject: [PATCH 1/5] feat: new tool for creating domains, products --- modelcontextprotocol/tools/__init__.py | 2 + modelcontextprotocol/tools/domain.py | 294 +++++++++++++++++++++++++ modelcontextprotocol/tools/models.py | 27 +++ 3 files changed, 323 insertions(+) create mode 100644 modelcontextprotocol/tools/domain.py diff --git a/modelcontextprotocol/tools/__init__.py b/modelcontextprotocol/tools/__init__.py index f9d18f6..01ba060 100644 --- a/modelcontextprotocol/tools/__init__.py +++ b/modelcontextprotocol/tools/__init__.py @@ -8,6 +8,7 @@ create_glossary_assets, create_glossary_term_assets, ) +from .domain import create_domain_assets from .models import ( CertificateStatus, UpdatableAttribute, @@ -27,6 +28,7 @@ "create_glossary_category_assets", "create_glossary_assets", "create_glossary_term_assets", + "create_domain_assets", "CertificateStatus", "UpdatableAttribute", "UpdatableAsset", diff --git a/modelcontextprotocol/tools/domain.py b/modelcontextprotocol/tools/domain.py new file mode 100644 index 0000000..5bd8efe --- /dev/null +++ b/modelcontextprotocol/tools/domain.py @@ -0,0 +1,294 @@ +from __future__ import annotations +import logging +from typing import Dict, Any, List, Union + +from pyatlan.model.assets import DataDomain, DataProduct, Asset +from pyatlan.model.search import IndexSearchRequest +from client import get_atlan_client +from .models import ( + CertificateStatus, + DataDomainSpec, + DataSubDomainSpec, + DataProductSpec, +) + +logger = logging.getLogger(__name__) + + +def save_assets(assets: List[Asset]) -> List[Dict[str, Any]]: + """ + Common bulk save and response processing for any asset type. + + Args: + assets (List[Asset]): List of Asset objects to save. + + Returns: + List[Dict[str, Any]]: List of dictionaries with details for each created asset. + + Raises: + Exception: If there's an error saving the assets. + """ + logger.info("Starting bulk save operation") + client = get_atlan_client() + try: + response = client.asset.save(assets) + except Exception as e: + logger.error(f"Error saving assets: {e}") + raise e + results: List[Dict[str, Any]] = [] + created_assets = response.mutated_entities.CREATE + + logger.info(f"Save operation completed, processing {len(created_assets)} results") + + results = [ + { + "guid": created_asset.guid, + "name": created_asset.name, + "qualified_name": created_asset.qualified_name, + } + for created_asset in created_assets + ] + + logger.info(f"Bulk save completed successfully for {len(results)} assets") + return results + + +def create_data_domain_assets( + domains: Union[Dict[str, Any], List[Dict[str, Any]]], +) -> List[Dict[str, Any]]: + """ + Create one or multiple Data Domain assets in Atlan. + + Args: + domains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single domain + specification (dict) or a list of domain specifications. Each specification + can be a dictionary containing: + - name (str): Name of the domain (required) + - user_description (str, optional): Detailed description of the domain + - certificate_status (str, optional): Certification status + ("VERIFIED", "DRAFT", or "DEPRECATED") + + Returns: + List[Dict[str, Any]]: List of dictionaries, each with details for a created domain: + - guid: The GUID of the created domain + - name: The name of the domain + - qualified_name: The qualified name of the created domain + + Raises: + Exception: If there's an error creating the domain assets. + """ + data = domains if isinstance(domains, list) else [domains] + logger.info(f"Creating {len(data)} data domain asset(s)") + logger.debug(f"Domain specifications: {data}") + + specs = [DataDomainSpec(**item) for item in data] + + assets: List[DataDomain] = [] + for spec in specs: + logger.debug(f"Creating DataDomain for: {spec.name}") + domain = DataDomain.creator(name=spec.name) + domain.user_description = spec.user_description + + if spec.certificate_status is not None: + cs = ( + CertificateStatus(spec.certificate_status) + if isinstance(spec.certificate_status, str) + else spec.certificate_status + ) + domain.certificate_status = cs.value + logger.debug(f"Set certificate status for {spec.name}: {cs.value}") + + assets.append(domain) + + return save_assets(assets) + + +def create_data_subdomain_assets( + subdomains: Union[Dict[str, Any], List[Dict[str, Any]]], +) -> List[Dict[str, Any]]: + """ + Create one or multiple Sub Domain (Data Domain with parent) assets in Atlan. + + Args: + subdomains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single subdomain + specification (dict) or a list of subdomain specifications. Each specification + can be a dictionary containing: + - name (str): Name of the subdomain (required) + - parent_domain_qualified_name (str): Qualified name of the parent domain (required) + - user_description (str, optional): Detailed description of the subdomain + - certificate_status (str, optional): Certification status + ("VERIFIED", "DRAFT", or "DEPRECATED") + + Returns: + List[Dict[str, Any]]: List of dictionaries, each with details for a created subdomain: + - guid: The GUID of the created subdomain + - name: The name of the subdomain + - qualified_name: The qualified name of the created subdomain + + Raises: + Exception: If there's an error creating the subdomain assets. + """ + data = subdomains if isinstance(subdomains, list) else [subdomains] + logger.info(f"Creating {len(data)} data subdomain asset(s)") + logger.debug(f"Subdomain specifications: {data}") + + specs = [DataSubDomainSpec(**item) for item in data] + + assets: List[DataDomain] = [] + for spec in specs: + logger.debug( + f"Creating DataDomain (subdomain) for: {spec.name} under {spec.parent_domain_qualified_name}" + ) + subdomain = DataDomain.creator( + name=spec.name, + parent_domain_qualified_name=spec.parent_domain_qualified_name, + ) + subdomain.user_description = spec.user_description + + if spec.certificate_status is not None: + cs = ( + CertificateStatus(spec.certificate_status) + if isinstance(spec.certificate_status, str) + else spec.certificate_status + ) + subdomain.certificate_status = cs.value + logger.debug(f"Set certificate status for {spec.name}: {cs.value}") + + assets.append(subdomain) + + return save_assets(assets) + + +def create_data_product_assets( + products: Union[Dict[str, Any], List[Dict[str, Any]]], +) -> List[Dict[str, Any]]: + """ + Create one or multiple Data Product assets in Atlan. + + Args: + products (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single product + specification (dict) or a list of product specifications. Each specification + can be a dictionary containing: + - name (str): Name of the product (required) + - domain_qualified_name (str): Qualified name of the domain this product belongs to (required) + - user_description (str, optional): Detailed description of the product + - certificate_status (str, optional): Certification status + ("VERIFIED", "DRAFT", or "DEPRECATED") + - asset_selection (dict, optional): Asset selection query as a dictionary. + This should be a FluentSearch request dictionary that defines which assets + to link to the product. + + Returns: + List[Dict[str, Any]]: List of dictionaries, each with details for a created product: + - guid: The GUID of the created product + - name: The name of the product + - qualified_name: The qualified name of the created product + + Raises: + Exception: If there's an error creating the product assets. + """ + data = products if isinstance(products, list) else [products] + logger.info(f"Creating {len(data)} data product asset(s)") + logger.debug(f"Product specifications: {data}") + + specs = [DataProductSpec(**item) for item in data] + + assets: List[DataProduct] = [] + for spec in specs: + logger.debug( + f"Creating DataProduct for: {spec.name} under {spec.domain_qualified_name}" + ) + + # Handle asset selection if provided + asset_selection = None + if spec.asset_selection is not None: + try: + # Convert dict to IndexSearchRequest if needed + asset_selection = IndexSearchRequest(**spec.asset_selection) + logger.debug(f"Set asset selection for {spec.name}") + except Exception as e: + logger.warning( + f"Invalid asset_selection format for {spec.name}: {e}. Creating product without asset selection." + ) + + product = DataProduct.creator( + name=spec.name, + domain_qualified_name=spec.domain_qualified_name, + asset_selection=asset_selection, + ) + product.user_description = spec.user_description + + if spec.certificate_status is not None: + cs = ( + CertificateStatus(spec.certificate_status) + if isinstance(spec.certificate_status, str) + else spec.certificate_status + ) + product.certificate_status = cs.value + logger.debug(f"Set certificate status for {spec.name}: {cs.value}") + + assets.append(product) + + return save_assets(assets) + + +def create_domain_assets( + items: Union[Dict[str, Any], List[Dict[str, Any]]], +) -> List[Dict[str, Any]]: + """ + Create Data Domains, Sub Domains, or Data Products based on the specification. + + This is a unified function that determines the type based on the presence of + specific fields in the specification: + - If 'parent_domain_qualified_name' is present -> Sub Domain + - If 'domain_qualified_name' is present (and no parent) -> Data Product + - Otherwise -> Data Domain + + Args: + items (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single item + specification (dict) or a list of item specifications. Each specification + should contain fields appropriate for the type being created. + + Returns: + List[Dict[str, Any]]: List of dictionaries, each with details for a created asset: + - guid: The GUID of the created asset + - name: The name of the asset + - qualified_name: The qualified name of the created asset + + Raises: + Exception: If there's an error creating the assets. + """ + data = items if isinstance(items, list) else [items] + logger.info(f"Creating {len(data)} domain-related asset(s)") + + # Separate items by type + domains = [] + subdomains = [] + products = [] + + for item in data: + if "parent_domain_qualified_name" in item: + subdomains.append(item) + elif "domain_qualified_name" in item: + products.append(item) + else: + domains.append(item) + + results = [] + + # Create domains + if domains: + logger.info(f"Creating {len(domains)} data domain(s)") + results.extend(create_data_domain_assets(domains)) + + # Create subdomains + if subdomains: + logger.info(f"Creating {len(subdomains)} data subdomain(s)") + results.extend(create_data_subdomain_assets(subdomains)) + + # Create products + if products: + logger.info(f"Creating {len(products)} data product(s)") + results.extend(create_data_product_assets(products)) + + return results diff --git a/modelcontextprotocol/tools/models.py b/modelcontextprotocol/tools/models.py index 814de7f..ec63314 100644 --- a/modelcontextprotocol/tools/models.py +++ b/modelcontextprotocol/tools/models.py @@ -74,3 +74,30 @@ class GlossaryTerm(BaseModel): user_description: Optional[str] = None certificate_status: Optional[CertificateStatus] = None category_guids: Optional[List[str]] = None + + +class DataDomainSpec(BaseModel): + """Payload model for creating a Data Domain asset.""" + + name: str + user_description: Optional[str] = None + certificate_status: Optional[CertificateStatus] = None + + +class DataSubDomainSpec(BaseModel): + """Payload model for creating a Sub Domain (Data Domain with parent) asset.""" + + name: str + parent_domain_qualified_name: str + user_description: Optional[str] = None + certificate_status: Optional[CertificateStatus] = None + + +class DataProductSpec(BaseModel): + """Payload model for creating a Data Product asset.""" + + name: str + domain_qualified_name: str + user_description: Optional[str] = None + certificate_status: Optional[CertificateStatus] = None + asset_selection: Optional[dict] = None From e8eee7ac44f44345fa8b498644bb82edff383c85 Mon Sep 17 00:00:00 2001 From: Hk669 Date: Sat, 8 Nov 2025 00:31:42 +0530 Subject: [PATCH 2/5] integrate into server --- modelcontextprotocol/server.py | 110 +++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 3290859..26cbb61 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -12,6 +12,7 @@ create_glossary_category_assets, create_glossary_assets, create_glossary_term_assets, + create_domain_assets, UpdatableAttribute, CertificateStatus, UpdatableAsset, @@ -905,6 +906,115 @@ def create_glossary_categories(categories) -> List[Dict[str, Any]]: return create_glossary_category_assets(categories) +@mcp.tool() +def create_domains(items) -> List[Dict[str, Any]]: + """ + Create Data Domains, Sub Domains, or Data Products in Atlan. + + This unified tool automatically determines the type based on the presence of + specific fields in the specification: + - If 'parent_domain_qualified_name' is present -> Sub Domain + - If 'domain_qualified_name' is present (and no parent) -> Data Product + - Otherwise -> Data Domain + + IMPORTANT BUSINESS RULES & CONSTRAINTS: + - Before creating a domain/subdomain/product, you may want to search for existing + domains to avoid duplicates or to get the qualified_name for parent relationships + - Domain names must be unique at the top level + - Subdomain names must be unique within the same parent domain + - Product names must be unique within the same domain + + Args: + items (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single item + specification (dict) or a list of item specifications. + + For Data Domain: + - name (str): Name of the domain (required) + - user_description (str, optional): Detailed description + - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED" + + For Sub Domain: + - name (str): Name of the subdomain (required) + - parent_domain_qualified_name (str): Qualified name of parent domain (required) + - user_description (str, optional): Detailed description + - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED" + + For Data Product: + - name (str): Name of the product (required) + - domain_qualified_name (str): Qualified name of the domain (required) + - user_description (str, optional): Detailed description + - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED" + - asset_selection (dict, optional): Asset selection query dictionary. + This should be a FluentSearch request that defines which assets to link. + Example structure: {"where": {...}, "select": [...]} + + Returns: + List[Dict[str, Any]]: List of dictionaries, each with details for a created asset: + - guid: The GUID of the created asset + - name: The name of the asset + - qualified_name: The qualified name of the created asset + + Examples: + # Create a single Data Domain + create_domains({ + "name": "Marketing", + "user_description": "Marketing data domain", + "certificate_status": "VERIFIED" + }) + + # Create a Sub Domain under an existing domain + create_domains({ + "name": "Social Marketing", + "parent_domain_qualified_name": "default/domain/marketing", + "user_description": "Social media marketing subdomain", + "certificate_status": "DRAFT" + }) + + # Create a Data Product with asset selection + create_domains({ + "name": "Marketing Influence", + "domain_qualified_name": "default/domain/marketing", + "user_description": "Product for marketing influence analysis", + "asset_selection": { + "where": { + "bool": { + "must": [ + {"term": {"__typeName.keyword": "Table"}}, + {"term": {"certificateStatus": "VERIFIED"}}, + {"term": {"__atlanTags": "Marketing"}} + ] + } + } + } + }) + + # Create multiple items of different types in one call + create_domains([ + { + "name": "Sales", + "user_description": "Sales data domain" + }, + { + "name": "E-commerce Sales", + "parent_domain_qualified_name": "default/domain/sales", + "user_description": "E-commerce sales subdomain" + }, + { + "name": "Sales Analytics", + "domain_qualified_name": "default/domain/sales", + "user_description": "Sales analytics product" + } + ]) + """ + # Parse parameters to handle JSON strings using shared utility + try: + items = parse_json_parameter(items) + except json.JSONDecodeError as e: + return {"error": f"Invalid JSON format for items parameter: {str(e)}"} + + return create_domain_assets(items) + + def main(): """Main entry point for the Atlan MCP Server.""" From e3eb02fb3d3187915886278097404a0d2fa0f177 Mon Sep 17 00:00:00 2001 From: Hk669 Date: Fri, 28 Nov 2025 00:10:48 +0530 Subject: [PATCH 3/5] fix: get domains and data products working --- modelcontextprotocol/server.py | 23 +-- modelcontextprotocol/tools/domain.py | 201 ++++++------------------- modelcontextprotocol/tools/glossary.py | 75 ++------- modelcontextprotocol/tools/models.py | 16 +- modelcontextprotocol/utils/__init__.py | 2 + modelcontextprotocol/utils/assets.py | 52 +++++++ 6 files changed, 128 insertions(+), 241 deletions(-) create mode 100644 modelcontextprotocol/utils/assets.py diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 26cbb61..df344f0 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -942,11 +942,10 @@ def create_domains(items) -> List[Dict[str, Any]]: For Data Product: - name (str): Name of the product (required) - domain_qualified_name (str): Qualified name of the domain (required) + - asset_guids (List[str]): List of asset GUIDs to link to this product (required). + At least one asset GUID must be provided. Use search_assets_tool to find asset GUIDs. - user_description (str, optional): Detailed description - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED" - - asset_selection (dict, optional): Asset selection query dictionary. - This should be a FluentSearch request that defines which assets to link. - Example structure: {"where": {...}, "select": [...]} Returns: List[Dict[str, Any]]: List of dictionaries, each with details for a created asset: @@ -970,22 +969,13 @@ def create_domains(items) -> List[Dict[str, Any]]: "certificate_status": "DRAFT" }) - # Create a Data Product with asset selection + # Create a Data Product with linked assets (asset_guids required) + # First, search for assets to get their GUIDs using search_assets_tool create_domains({ "name": "Marketing Influence", "domain_qualified_name": "default/domain/marketing", "user_description": "Product for marketing influence analysis", - "asset_selection": { - "where": { - "bool": { - "must": [ - {"term": {"__typeName.keyword": "Table"}}, - {"term": {"certificateStatus": "VERIFIED"}}, - {"term": {"__atlanTags": "Marketing"}} - ] - } - } - } + "asset_guids": ["asset-guid-1", "asset-guid-2"] # GUIDs from search_assets_tool }) # Create multiple items of different types in one call @@ -1002,7 +992,8 @@ def create_domains(items) -> List[Dict[str, Any]]: { "name": "Sales Analytics", "domain_qualified_name": "default/domain/sales", - "user_description": "Sales analytics product" + "user_description": "Sales analytics product", + "asset_guids": ["table-guid-1", "table-guid-2"] # Required for data products } ]) """ diff --git a/modelcontextprotocol/tools/domain.py b/modelcontextprotocol/tools/domain.py index 5bd8efe..a6e3423 100644 --- a/modelcontextprotocol/tools/domain.py +++ b/modelcontextprotocol/tools/domain.py @@ -2,68 +2,28 @@ import logging from typing import Dict, Any, List, Union -from pyatlan.model.assets import DataDomain, DataProduct, Asset -from pyatlan.model.search import IndexSearchRequest -from client import get_atlan_client -from .models import ( - CertificateStatus, - DataDomainSpec, - DataSubDomainSpec, - DataProductSpec, -) +from pyatlan.model.assets import Asset, DataDomain, DataProduct +from pyatlan.model.fluent_search import CompoundQuery, FluentSearch -logger = logging.getLogger(__name__) - - -def save_assets(assets: List[Asset]) -> List[Dict[str, Any]]: - """ - Common bulk save and response processing for any asset type. - - Args: - assets (List[Asset]): List of Asset objects to save. +from utils import save_assets +from .models import DataDomainSpec, DataProductSpec - Returns: - List[Dict[str, Any]]: List of dictionaries with details for each created asset. - - Raises: - Exception: If there's an error saving the assets. - """ - logger.info("Starting bulk save operation") - client = get_atlan_client() - try: - response = client.asset.save(assets) - except Exception as e: - logger.error(f"Error saving assets: {e}") - raise e - results: List[Dict[str, Any]] = [] - created_assets = response.mutated_entities.CREATE - - logger.info(f"Save operation completed, processing {len(created_assets)} results") - - results = [ - { - "guid": created_asset.guid, - "name": created_asset.name, - "qualified_name": created_asset.qualified_name, - } - for created_asset in created_assets - ] - - logger.info(f"Bulk save completed successfully for {len(results)} assets") - return results +logger = logging.getLogger(__name__) def create_data_domain_assets( domains: Union[Dict[str, Any], List[Dict[str, Any]]], ) -> List[Dict[str, Any]]: """ - Create one or multiple Data Domain assets in Atlan. + Create one or multiple Data Domain or Sub Domain assets in Atlan. Args: domains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single domain specification (dict) or a list of domain specifications. Each specification can be a dictionary containing: - name (str): Name of the domain (required) + - parent_domain_qualified_name (str, optional): Qualified name of the parent + domain. If provided, creates a Sub Domain under that parent. - user_description (str, optional): Detailed description of the domain - certificate_status (str, optional): Certification status ("VERIFIED", "DRAFT", or "DEPRECATED") @@ -85,76 +45,27 @@ def create_data_domain_assets( assets: List[DataDomain] = [] for spec in specs: - logger.debug(f"Creating DataDomain for: {spec.name}") - domain = DataDomain.creator(name=spec.name) - domain.user_description = spec.user_description - - if spec.certificate_status is not None: - cs = ( - CertificateStatus(spec.certificate_status) - if isinstance(spec.certificate_status, str) - else spec.certificate_status + if spec.parent_domain_qualified_name: + logger.debug( + f"Creating Sub Domain: {spec.name} under {spec.parent_domain_qualified_name}" ) - domain.certificate_status = cs.value - logger.debug(f"Set certificate status for {spec.name}: {cs.value}") - - assets.append(domain) - - return save_assets(assets) - - -def create_data_subdomain_assets( - subdomains: Union[Dict[str, Any], List[Dict[str, Any]]], -) -> List[Dict[str, Any]]: - """ - Create one or multiple Sub Domain (Data Domain with parent) assets in Atlan. - - Args: - subdomains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single subdomain - specification (dict) or a list of subdomain specifications. Each specification - can be a dictionary containing: - - name (str): Name of the subdomain (required) - - parent_domain_qualified_name (str): Qualified name of the parent domain (required) - - user_description (str, optional): Detailed description of the subdomain - - certificate_status (str, optional): Certification status - ("VERIFIED", "DRAFT", or "DEPRECATED") - - Returns: - List[Dict[str, Any]]: List of dictionaries, each with details for a created subdomain: - - guid: The GUID of the created subdomain - - name: The name of the subdomain - - qualified_name: The qualified name of the created subdomain - - Raises: - Exception: If there's an error creating the subdomain assets. - """ - data = subdomains if isinstance(subdomains, list) else [subdomains] - logger.info(f"Creating {len(data)} data subdomain asset(s)") - logger.debug(f"Subdomain specifications: {data}") - - specs = [DataSubDomainSpec(**item) for item in data] + domain = DataDomain.creator( + name=spec.name, + parent_domain_qualified_name=spec.parent_domain_qualified_name, + ) + else: + logger.debug(f"Creating DataDomain: {spec.name}") + domain = DataDomain.creator(name=spec.name) - assets: List[DataDomain] = [] - for spec in specs: - logger.debug( - f"Creating DataDomain (subdomain) for: {spec.name} under {spec.parent_domain_qualified_name}" - ) - subdomain = DataDomain.creator( - name=spec.name, - parent_domain_qualified_name=spec.parent_domain_qualified_name, - ) - subdomain.user_description = spec.user_description + domain.user_description = spec.user_description - if spec.certificate_status is not None: - cs = ( - CertificateStatus(spec.certificate_status) - if isinstance(spec.certificate_status, str) - else spec.certificate_status + if spec.certificate_status: + domain.certificate_status = spec.certificate_status.value + logger.debug( + f"Set certificate status for {spec.name}: {spec.certificate_status.value}" ) - subdomain.certificate_status = cs.value - logger.debug(f"Set certificate status for {spec.name}: {cs.value}") - assets.append(subdomain) + assets.append(domain) return save_assets(assets) @@ -171,12 +82,10 @@ def create_data_product_assets( can be a dictionary containing: - name (str): Name of the product (required) - domain_qualified_name (str): Qualified name of the domain this product belongs to (required) + - asset_guids (List[str]): List of asset GUIDs to link to this product (required, at least one) - user_description (str, optional): Detailed description of the product - certificate_status (str, optional): Certification status ("VERIFIED", "DRAFT", or "DEPRECATED") - - asset_selection (dict, optional): Asset selection query as a dictionary. - This should be a FluentSearch request dictionary that defines which assets - to link to the product. Returns: List[Dict[str, Any]]: List of dictionaries, each with details for a created product: @@ -186,6 +95,7 @@ def create_data_product_assets( Raises: Exception: If there's an error creating the product assets. + ValueError: If no asset_guids are provided. """ data = products if isinstance(products, list) else [products] logger.info(f"Creating {len(data)} data product asset(s)") @@ -195,21 +105,24 @@ def create_data_product_assets( assets: List[DataProduct] = [] for spec in specs: + # Validate that asset_guids is provided and not empty + if not spec.asset_guids: + raise ValueError( + f"Data product '{spec.name}' requires at least one asset GUID. " + "Please provide asset_guids to link assets to this product." + ) + logger.debug( - f"Creating DataProduct for: {spec.name} under {spec.domain_qualified_name}" + f"Creating DataProduct: {spec.name} under {spec.domain_qualified_name}" ) + logger.debug(f"Linking {len(spec.asset_guids)} asset(s) to product") - # Handle asset selection if provided - asset_selection = None - if spec.asset_selection is not None: - try: - # Convert dict to IndexSearchRequest if needed - asset_selection = IndexSearchRequest(**spec.asset_selection) - logger.debug(f"Set asset selection for {spec.name}") - except Exception as e: - logger.warning( - f"Invalid asset_selection format for {spec.name}: {e}. Creating product without asset selection." - ) + # Build FluentSearch to select assets by their GUIDs + asset_selection = ( + FluentSearch() + .where(CompoundQuery.active_assets()) + .where(Asset.GUID.within(spec.asset_guids)) + ).to_request() product = DataProduct.creator( name=spec.name, @@ -218,14 +131,11 @@ def create_data_product_assets( ) product.user_description = spec.user_description - if spec.certificate_status is not None: - cs = ( - CertificateStatus(spec.certificate_status) - if isinstance(spec.certificate_status, str) - else spec.certificate_status + if spec.certificate_status: + product.certificate_status = spec.certificate_status.value + logger.debug( + f"Set certificate status for {spec.name}: {spec.certificate_status.value}" ) - product.certificate_status = cs.value - logger.debug(f"Set certificate status for {spec.name}: {cs.value}") assets.append(product) @@ -240,9 +150,8 @@ def create_domain_assets( This is a unified function that determines the type based on the presence of specific fields in the specification: - - If 'parent_domain_qualified_name' is present -> Sub Domain - - If 'domain_qualified_name' is present (and no parent) -> Data Product - - Otherwise -> Data Domain + - If 'domain_qualified_name' is present -> Data Product + - Otherwise -> Data Domain (or Sub Domain if 'parent_domain_qualified_name' is present) Args: items (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single item @@ -261,32 +170,22 @@ def create_domain_assets( data = items if isinstance(items, list) else [items] logger.info(f"Creating {len(data)} domain-related asset(s)") - # Separate items by type + # Separate items by type: products vs domains (including subdomains) domains = [] - subdomains = [] products = [] for item in data: - if "parent_domain_qualified_name" in item: - subdomains.append(item) - elif "domain_qualified_name" in item: + if "domain_qualified_name" in item: products.append(item) else: domains.append(item) results = [] - # Create domains if domains: - logger.info(f"Creating {len(domains)} data domain(s)") + logger.info(f"Creating {len(domains)} data domain/subdomain(s)") results.extend(create_data_domain_assets(domains)) - # Create subdomains - if subdomains: - logger.info(f"Creating {len(subdomains)} data subdomain(s)") - results.extend(create_data_subdomain_assets(subdomains)) - - # Create products if products: logger.info(f"Creating {len(products)} data product(s)") results.extend(create_data_product_assets(products)) diff --git a/modelcontextprotocol/tools/glossary.py b/modelcontextprotocol/tools/glossary.py index ccf8d96..23a6558 100644 --- a/modelcontextprotocol/tools/glossary.py +++ b/modelcontextprotocol/tools/glossary.py @@ -6,12 +6,10 @@ AtlasGlossary, AtlasGlossaryCategory, AtlasGlossaryTerm, - Asset, ) -from utils.parameters import parse_list_parameter -from client import get_atlan_client + +from utils import parse_list_parameter, save_assets from .models import ( - CertificateStatus, Glossary, GlossaryCategory, GlossaryTerm, @@ -20,44 +18,6 @@ logger = logging.getLogger(__name__) -def save_assets(assets: List[Asset]) -> List[Dict[str, Any]]: - """ - Common bulk save and response processing for any asset type. - - Args: - assets (List[Asset]): List of Asset objects to save. - - Returns: - List[Dict[str, Any]]: List of dictionaries with details for each created asset. - - Raises: - Exception: If there's an error saving the assets. - """ - logger.info("Starting bulk save operation") - client = get_atlan_client() - try: - response = client.asset.save(assets) - except Exception as e: - logger.error(f"Error saving assets: {e}") - raise e - results: List[Dict[str, Any]] = [] - created_assets = response.mutated_entities.CREATE - - logger.info(f"Save operation completed, processing {len(created_assets)} results") - - results = [ - { - "guid": created_asset.guid, - "name": created_asset.name, - "qualified_name": created_asset.qualified_name, - } - for created_asset in created_assets - ] - - logger.info(f"Bulk save completed successfully for {len(results)} assets") - return results - - def create_glossary_assets( glossaries: Union[Dict[str, Any], List[Dict[str, Any]]], ) -> List[Dict[str, Any]]: @@ -94,14 +54,11 @@ def create_glossary_assets( logger.debug(f"Creating AtlasGlossary for: {spec.name}") glossary = AtlasGlossary.creator(name=spec.name) glossary.user_description = spec.user_description - if spec.certificate_status is not None: - cs = ( - CertificateStatus(spec.certificate_status) - if isinstance(spec.certificate_status, str) - else spec.certificate_status + if spec.certificate_status: + glossary.certificate_status = spec.certificate_status.value + logger.debug( + f"Set certificate status for {spec.name}: {spec.certificate_status.value}" ) - glossary.certificate_status = cs.value - logger.debug(f"Set certificate status for {spec.name}: {cs.value}") assets.append(glossary) return save_assets(assets) @@ -155,14 +112,11 @@ def create_glossary_category_assets( ), ) category.user_description = spec.user_description - if spec.certificate_status is not None: - cs = ( - CertificateStatus(spec.certificate_status) - if isinstance(spec.certificate_status, str) - else spec.certificate_status + if spec.certificate_status: + category.certificate_status = spec.certificate_status.value + logger.debug( + f"Set certificate status for {spec.name}: {spec.certificate_status.value}" ) - category.certificate_status = cs.value - logger.debug(f"Set certificate status for {spec.name}: {cs.value}") assets.append(category) @@ -213,13 +167,8 @@ def create_glossary_term_assets( categories=[AtlasGlossaryCategory.ref_by_guid(g) for g in guids] or None, ) term.user_description = spec.user_description - if spec.certificate_status is not None: - cs = ( - CertificateStatus(spec.certificate_status) - if isinstance(spec.certificate_status, str) - else spec.certificate_status - ) - term.certificate_status = cs.value + if spec.certificate_status: + term.certificate_status = spec.certificate_status.value assets.append(term) return save_assets(assets) diff --git a/modelcontextprotocol/tools/models.py b/modelcontextprotocol/tools/models.py index ec63314..c390609 100644 --- a/modelcontextprotocol/tools/models.py +++ b/modelcontextprotocol/tools/models.py @@ -77,18 +77,12 @@ class GlossaryTerm(BaseModel): class DataDomainSpec(BaseModel): - """Payload model for creating a Data Domain asset.""" + """Payload model for creating a Data Domain or Sub Domain asset.""" name: str - user_description: Optional[str] = None - certificate_status: Optional[CertificateStatus] = None - - -class DataSubDomainSpec(BaseModel): - """Payload model for creating a Sub Domain (Data Domain with parent) asset.""" - - name: str - parent_domain_qualified_name: str + parent_domain_qualified_name: Optional[str] = ( + None # if passed, will be created as a sub domain + ) user_description: Optional[str] = None certificate_status: Optional[CertificateStatus] = None @@ -98,6 +92,6 @@ class DataProductSpec(BaseModel): name: str domain_qualified_name: str + asset_guids: List[str] # Required: at least one asset GUID for data products user_description: Optional[str] = None certificate_status: Optional[CertificateStatus] = None - asset_selection: Optional[dict] = None diff --git a/modelcontextprotocol/utils/__init__.py b/modelcontextprotocol/utils/__init__.py index 6de6b6d..bd896d2 100644 --- a/modelcontextprotocol/utils/__init__.py +++ b/modelcontextprotocol/utils/__init__.py @@ -4,6 +4,7 @@ This package provides common utilities used across the server components. """ +from .assets import save_assets from .constants import DEFAULT_SEARCH_ATTRIBUTES from .search import SearchUtils from .parameters import ( @@ -16,4 +17,5 @@ "SearchUtils", "parse_json_parameter", "parse_list_parameter", + "save_assets", ] diff --git a/modelcontextprotocol/utils/assets.py b/modelcontextprotocol/utils/assets.py new file mode 100644 index 0000000..a4ef47d --- /dev/null +++ b/modelcontextprotocol/utils/assets.py @@ -0,0 +1,52 @@ +""" +Asset utilities for the Atlan MCP server. + +This module provides reusable functions for asset operations +that are commonly used across different MCP tools. +""" + +import logging +from typing import Any, Dict, List + +from pyatlan.model.assets import Asset + +from client import get_atlan_client + +logger = logging.getLogger(__name__) + + +def save_assets(assets: List[Asset]) -> List[Dict[str, Any]]: + """ + Common bulk save and response processing for any asset type. + + Args: + assets (List[Asset]): List of Asset objects to save. + + Returns: + List[Dict[str, Any]]: List of dictionaries with details for each created asset. + + Raises: + Exception: If there's an error saving the assets. + """ + logger.info("Starting bulk save operation") + client = get_atlan_client() + try: + response = client.asset.save(assets) + except Exception as e: + logger.error(f"Error saving assets: {e}") + raise + + created_assets = response.mutated_entities.CREATE + logger.info(f"Save operation completed, processing {len(created_assets)} results") + + results = [ + { + "guid": asset.guid, + "name": asset.name, + "qualified_name": asset.qualified_name, + } + for asset in created_assets + ] + + logger.info(f"Bulk save completed successfully for {len(results)} assets") + return results From aa221b25e6310793c9e98cf191015e8ff02a5098 Mon Sep 17 00:00:00 2001 From: Hk669 Date: Mon, 1 Dec 2025 22:52:52 +0530 Subject: [PATCH 4/5] feat: new tools --- modelcontextprotocol/server.py | 107 ++++++++++++++++++++++----------- 1 file changed, 71 insertions(+), 36 deletions(-) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index df344f0..3b27dbf 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -12,7 +12,8 @@ create_glossary_category_assets, create_glossary_assets, create_glossary_term_assets, - create_domain_assets, + create_data_domain_assets, + create_data_product_assets, UpdatableAttribute, CertificateStatus, UpdatableAsset, @@ -907,26 +908,19 @@ def create_glossary_categories(categories) -> List[Dict[str, Any]]: @mcp.tool() -def create_domains(items) -> List[Dict[str, Any]]: +def create_domains(domains) -> List[Dict[str, Any]]: """ - Create Data Domains, Sub Domains, or Data Products in Atlan. - - This unified tool automatically determines the type based on the presence of - specific fields in the specification: - - If 'parent_domain_qualified_name' is present -> Sub Domain - - If 'domain_qualified_name' is present (and no parent) -> Data Product - - Otherwise -> Data Domain + Create Data Domains or Sub Domains in Atlan. IMPORTANT BUSINESS RULES & CONSTRAINTS: - - Before creating a domain/subdomain/product, you may want to search for existing + - Before creating a domain/subdomain, you may want to search for existing domains to avoid duplicates or to get the qualified_name for parent relationships - Domain names must be unique at the top level - Subdomain names must be unique within the same parent domain - - Product names must be unique within the same domain Args: - items (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single item - specification (dict) or a list of item specifications. + domains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single domain + specification (dict) or a list of domain specifications. For Data Domain: - name (str): Name of the domain (required) @@ -939,14 +933,6 @@ def create_domains(items) -> List[Dict[str, Any]]: - user_description (str, optional): Detailed description - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED" - For Data Product: - - name (str): Name of the product (required) - - domain_qualified_name (str): Qualified name of the domain (required) - - asset_guids (List[str]): List of asset GUIDs to link to this product (required). - At least one asset GUID must be provided. Use search_assets_tool to find asset GUIDs. - - user_description (str, optional): Detailed description - - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED" - Returns: List[Dict[str, Any]]: List of dictionaries, each with details for a created asset: - guid: The GUID of the created asset @@ -969,16 +955,7 @@ def create_domains(items) -> List[Dict[str, Any]]: "certificate_status": "DRAFT" }) - # Create a Data Product with linked assets (asset_guids required) - # First, search for assets to get their GUIDs using search_assets_tool - create_domains({ - "name": "Marketing Influence", - "domain_qualified_name": "default/domain/marketing", - "user_description": "Product for marketing influence analysis", - "asset_guids": ["asset-guid-1", "asset-guid-2"] # GUIDs from search_assets_tool - }) - - # Create multiple items of different types in one call + # Create multiple domains in one call create_domains([ { "name": "Sales", @@ -988,22 +965,80 @@ def create_domains(items) -> List[Dict[str, Any]]: "name": "E-commerce Sales", "parent_domain_qualified_name": "default/domain/sales", "user_description": "E-commerce sales subdomain" - }, + } + ]) + """ + # Parse parameters to handle JSON strings using shared utility + try: + domains = parse_json_parameter(domains) + except json.JSONDecodeError as e: + return {"error": f"Invalid JSON format for domains parameter: {str(e)}"} + + return create_data_domain_assets(domains) + + +@mcp.tool() +def create_data_products(products) -> List[Dict[str, Any]]: + """ + Create Data Products in Atlan. + + IMPORTANT BUSINESS RULES & CONSTRAINTS: + - Before creating a product, you may want to search for existing domains + to get the qualified_name for the domain relationship + - Product names must be unique within the same domain + - At least one asset GUID must be provided for each product + + Args: + products (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single product + specification (dict) or a list of product specifications. + + For Data Product: + - name (str): Name of the product (required) + - domain_qualified_name (str): Qualified name of the domain (required) + - asset_guids (List[str]): List of asset GUIDs to link to this product (required). + At least one asset GUID must be provided. Use search_assets_tool to find asset GUIDs. + - user_description (str, optional): Detailed description + - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED" + + Returns: + List[Dict[str, Any]]: List of dictionaries, each with details for a created asset: + - guid: The GUID of the created asset + - name: The name of the asset + - qualified_name: The qualified name of the created asset + + Examples: + # Create a Data Product with linked assets (asset_guids required) + # First, search for assets to get their GUIDs using search_assets_tool + create_data_products({ + "name": "Marketing Influence", + "domain_qualified_name": "default/domain/marketing", + "user_description": "Product for marketing influence analysis", + "asset_guids": ["asset-guid-1", "asset-guid-2"] # GUIDs from search_assets_tool + }) + + # Create multiple products in one call + create_data_products([ { "name": "Sales Analytics", "domain_qualified_name": "default/domain/sales", "user_description": "Sales analytics product", - "asset_guids": ["table-guid-1", "table-guid-2"] # Required for data products + "asset_guids": ["table-guid-1", "table-guid-2"] + }, + { + "name": "Customer Insights", + "domain_qualified_name": "default/domain/marketing", + "user_description": "Customer insights product", + "asset_guids": ["view-guid-1"] } ]) """ # Parse parameters to handle JSON strings using shared utility try: - items = parse_json_parameter(items) + products = parse_json_parameter(products) except json.JSONDecodeError as e: - return {"error": f"Invalid JSON format for items parameter: {str(e)}"} + return {"error": f"Invalid JSON format for products parameter: {str(e)}"} - return create_domain_assets(items) + return create_data_product_assets(products) def main(): From e0183599dcfb1a7e1a1c4f0d1a167f221227fbc1 Mon Sep 17 00:00:00 2001 From: Hk669 Date: Mon, 1 Dec 2025 22:53:13 +0530 Subject: [PATCH 5/5] fix: address comments --- modelcontextprotocol/tools/__init__.py | 5 +- modelcontextprotocol/tools/domain.py | 100 +++++++------------------ modelcontextprotocol/tools/models.py | 13 +++- modelcontextprotocol/utils/assets.py | 45 ++++++++--- 4 files changed, 73 insertions(+), 90 deletions(-) diff --git a/modelcontextprotocol/tools/__init__.py b/modelcontextprotocol/tools/__init__.py index 01ba060..82f789f 100644 --- a/modelcontextprotocol/tools/__init__.py +++ b/modelcontextprotocol/tools/__init__.py @@ -8,7 +8,7 @@ create_glossary_assets, create_glossary_term_assets, ) -from .domain import create_domain_assets +from .domain import create_data_domain_assets, create_data_product_assets from .models import ( CertificateStatus, UpdatableAttribute, @@ -28,7 +28,8 @@ "create_glossary_category_assets", "create_glossary_assets", "create_glossary_term_assets", - "create_domain_assets", + "create_data_domain_assets", + "create_data_product_assets", "CertificateStatus", "UpdatableAttribute", "UpdatableAsset", diff --git a/modelcontextprotocol/tools/domain.py b/modelcontextprotocol/tools/domain.py index a6e3423..955e298 100644 --- a/modelcontextprotocol/tools/domain.py +++ b/modelcontextprotocol/tools/domain.py @@ -1,6 +1,7 @@ from __future__ import annotations + import logging -from typing import Dict, Any, List, Union +from typing import Any, Dict, List, Union from pyatlan.model.assets import Asset, DataDomain, DataProduct from pyatlan.model.fluent_search import CompoundQuery, FluentSearch @@ -45,22 +46,24 @@ def create_data_domain_assets( assets: List[DataDomain] = [] for spec in specs: - if spec.parent_domain_qualified_name: - logger.debug( - f"Creating Sub Domain: {spec.name} under {spec.parent_domain_qualified_name}" - ) - domain = DataDomain.creator( - name=spec.name, - parent_domain_qualified_name=spec.parent_domain_qualified_name, + logger.debug( + f"Creating DataDomain: {spec.name}" + + ( + f" under {spec.parent_domain_qualified_name}" + if spec.parent_domain_qualified_name + else "" ) - else: - logger.debug(f"Creating DataDomain: {spec.name}") - domain = DataDomain.creator(name=spec.name) - + ) + domain = DataDomain.creator( + name=spec.name, + parent_domain_qualified_name=spec.parent_domain_qualified_name, + ) domain.user_description = spec.user_description + domain.certificate_status = ( + spec.certificate_status.value if spec.certificate_status else None + ) if spec.certificate_status: - domain.certificate_status = spec.certificate_status.value logger.debug( f"Set certificate status for {spec.name}: {spec.certificate_status.value}" ) @@ -81,8 +84,10 @@ def create_data_product_assets( specification (dict) or a list of product specifications. Each specification can be a dictionary containing: - name (str): Name of the product (required) - - domain_qualified_name (str): Qualified name of the domain this product belongs to (required) - - asset_guids (List[str]): List of asset GUIDs to link to this product (required, at least one) + - domain_qualified_name (str): Qualified name of the domain this product + belongs to (required) + - asset_guids (List[str]): List of asset GUIDs to link to this product + (required, at least one) - user_description (str, optional): Detailed description of the product - certificate_status (str, optional): Certification status ("VERIFIED", "DRAFT", or "DEPRECATED") @@ -95,23 +100,17 @@ def create_data_product_assets( Raises: Exception: If there's an error creating the product assets. - ValueError: If no asset_guids are provided. + ValueError: If no asset_guids are provided (validated in DataProductSpec model). """ data = products if isinstance(products, list) else [products] logger.info(f"Creating {len(data)} data product asset(s)") logger.debug(f"Product specifications: {data}") + # Validation for asset_guids is now handled by DataProductSpec model specs = [DataProductSpec(**item) for item in data] assets: List[DataProduct] = [] for spec in specs: - # Validate that asset_guids is provided and not empty - if not spec.asset_guids: - raise ValueError( - f"Data product '{spec.name}' requires at least one asset GUID. " - "Please provide asset_guids to link assets to this product." - ) - logger.debug( f"Creating DataProduct: {spec.name} under {spec.domain_qualified_name}" ) @@ -130,9 +129,11 @@ def create_data_product_assets( asset_selection=asset_selection, ) product.user_description = spec.user_description + product.certificate_status = ( + spec.certificate_status.value if spec.certificate_status else None + ) if spec.certificate_status: - product.certificate_status = spec.certificate_status.value logger.debug( f"Set certificate status for {spec.name}: {spec.certificate_status.value}" ) @@ -140,54 +141,3 @@ def create_data_product_assets( assets.append(product) return save_assets(assets) - - -def create_domain_assets( - items: Union[Dict[str, Any], List[Dict[str, Any]]], -) -> List[Dict[str, Any]]: - """ - Create Data Domains, Sub Domains, or Data Products based on the specification. - - This is a unified function that determines the type based on the presence of - specific fields in the specification: - - If 'domain_qualified_name' is present -> Data Product - - Otherwise -> Data Domain (or Sub Domain if 'parent_domain_qualified_name' is present) - - Args: - items (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single item - specification (dict) or a list of item specifications. Each specification - should contain fields appropriate for the type being created. - - Returns: - List[Dict[str, Any]]: List of dictionaries, each with details for a created asset: - - guid: The GUID of the created asset - - name: The name of the asset - - qualified_name: The qualified name of the created asset - - Raises: - Exception: If there's an error creating the assets. - """ - data = items if isinstance(items, list) else [items] - logger.info(f"Creating {len(data)} domain-related asset(s)") - - # Separate items by type: products vs domains (including subdomains) - domains = [] - products = [] - - for item in data: - if "domain_qualified_name" in item: - products.append(item) - else: - domains.append(item) - - results = [] - - if domains: - logger.info(f"Creating {len(domains)} data domain/subdomain(s)") - results.extend(create_data_domain_assets(domains)) - - if products: - logger.info(f"Creating {len(products)} data product(s)") - results.extend(create_data_product_assets(products)) - - return results diff --git a/modelcontextprotocol/tools/models.py b/modelcontextprotocol/tools/models.py index c390609..0e9b75a 100644 --- a/modelcontextprotocol/tools/models.py +++ b/modelcontextprotocol/tools/models.py @@ -1,7 +1,7 @@ from enum import Enum from typing import Optional, List -from pydantic import BaseModel +from pydantic import BaseModel, field_validator class CertificateStatus(str, Enum): @@ -95,3 +95,14 @@ class DataProductSpec(BaseModel): asset_guids: List[str] # Required: at least one asset GUID for data products user_description: Optional[str] = None certificate_status: Optional[CertificateStatus] = None + + @field_validator("asset_guids") + @classmethod + def validate_asset_guids(cls, v: List[str]) -> List[str]: + """Validate that asset_guids is not empty.""" + if not v: + raise ValueError( + "Data products require at least one asset GUID. " + "Please provide asset_guids to link assets to this product." + ) + return v diff --git a/modelcontextprotocol/utils/assets.py b/modelcontextprotocol/utils/assets.py index a4ef47d..756cdda 100644 --- a/modelcontextprotocol/utils/assets.py +++ b/modelcontextprotocol/utils/assets.py @@ -23,7 +23,8 @@ def save_assets(assets: List[Asset]) -> List[Dict[str, Any]]: assets (List[Asset]): List of Asset objects to save. Returns: - List[Dict[str, Any]]: List of dictionaries with details for each created asset. + List[Dict[str, Any]]: List of dictionaries with details for each created + or updated asset. Raises: Exception: If there's an error saving the assets. @@ -36,17 +37,37 @@ def save_assets(assets: List[Asset]) -> List[Dict[str, Any]]: logger.error(f"Error saving assets: {e}") raise - created_assets = response.mutated_entities.CREATE - logger.info(f"Save operation completed, processing {len(created_assets)} results") - - results = [ - { - "guid": asset.guid, - "name": asset.name, - "qualified_name": asset.qualified_name, - } - for asset in created_assets - ] + created_assets = response.mutated_entities.CREATE or [] + updated_assets = response.mutated_entities.UPDATE or [] + + logger.info( + f"Save operation completed: {len(created_assets)} created, " + f"{len(updated_assets)} updated" + ) + + results = [] + + # Process created assets + for asset in created_assets: + results.append( + { + "guid": asset.guid, + "name": asset.name, + "qualified_name": asset.qualified_name, + "operation": "CREATE", + } + ) + + # Process updated assets + for asset in updated_assets: + results.append( + { + "guid": asset.guid, + "name": asset.name, + "qualified_name": asset.qualified_name, + "operation": "UPDATE", + } + ) logger.info(f"Bulk save completed successfully for {len(results)} assets") return results