Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions modelcontextprotocol/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
create_glossary_category_assets,
create_glossary_assets,
create_glossary_term_assets,
create_data_domain_assets,
create_data_product_assets,
create_dq_rules,
UpdatableAttribute,
CertificateStatus,
Expand Down Expand Up @@ -906,6 +908,140 @@ def create_glossary_categories(categories) -> List[Dict[str, Any]]:
return create_glossary_category_assets(categories)


@mcp.tool()
def create_domains(domains) -> List[Dict[str, Any]]:
"""
Create Data Domains or Sub Domains in Atlan.

IMPORTANT BUSINESS RULES & CONSTRAINTS:
- Before creating a domain/subdomain, you may want to search for existing
domains to avoid duplicates or to get the qualified_name for parent relationships
- Domain names must be unique at the top level
- Subdomain names must be unique within the same parent domain

Args:
domains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single domain
specification (dict) or a list of domain specifications.

For Data Domain:
- name (str): Name of the domain (required)
- user_description (str, optional): Detailed description
- certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED"

For Sub Domain:
- name (str): Name of the subdomain (required)
- parent_domain_qualified_name (str): Qualified name of parent domain (required)
- user_description (str, optional): Detailed description
- certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED"

Returns:
List[Dict[str, Any]]: List of dictionaries, each with details for a created asset:
- guid: The GUID of the created asset
- name: The name of the asset
- qualified_name: The qualified name of the created asset

Examples:
# Create a single Data Domain
create_domains({
"name": "Marketing",
"user_description": "Marketing data domain",
"certificate_status": "VERIFIED"
})

# Create a Sub Domain under an existing domain
create_domains({
"name": "Social Marketing",
"parent_domain_qualified_name": "default/domain/marketing",
"user_description": "Social media marketing subdomain",
"certificate_status": "DRAFT"
})

# Create multiple domains in one call
create_domains([
{
"name": "Sales",
"user_description": "Sales data domain"
},
{
"name": "E-commerce Sales",
"parent_domain_qualified_name": "default/domain/sales",
"user_description": "E-commerce sales subdomain"
}
])
"""
# Parse parameters to handle JSON strings using shared utility
try:
domains = parse_json_parameter(domains)
except json.JSONDecodeError as e:
return {"error": f"Invalid JSON format for domains parameter: {str(e)}"}

return create_data_domain_assets(domains)


@mcp.tool()
def create_data_products(products) -> List[Dict[str, Any]]:
"""
Create Data Products in Atlan.

IMPORTANT BUSINESS RULES & CONSTRAINTS:
- Before creating a product, you may want to search for existing domains
to get the qualified_name for the domain relationship
- Product names must be unique within the same domain
- At least one asset GUID must be provided for each product

Args:
products (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single product
specification (dict) or a list of product specifications.

For Data Product:
- name (str): Name of the product (required)
- domain_qualified_name (str): Qualified name of the domain (required)
- asset_guids (List[str]): List of asset GUIDs to link to this product (required).
At least one asset GUID must be provided. Use search_assets_tool to find asset GUIDs.
- user_description (str, optional): Detailed description
- certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED"

Returns:
List[Dict[str, Any]]: List of dictionaries, each with details for a created asset:
- guid: The GUID of the created asset
- name: The name of the asset
- qualified_name: The qualified name of the created asset

Examples:
# Create a Data Product with linked assets (asset_guids required)
# First, search for assets to get their GUIDs using search_assets_tool
create_data_products({
"name": "Marketing Influence",
"domain_qualified_name": "default/domain/marketing",
"user_description": "Product for marketing influence analysis",
"asset_guids": ["asset-guid-1", "asset-guid-2"] # GUIDs from search_assets_tool
})

# Create multiple products in one call
create_data_products([
{
"name": "Sales Analytics",
"domain_qualified_name": "default/domain/sales",
"user_description": "Sales analytics product",
"asset_guids": ["table-guid-1", "table-guid-2"]
},
{
"name": "Customer Insights",
"domain_qualified_name": "default/domain/marketing",
"user_description": "Customer insights product",
"asset_guids": ["view-guid-1"]
}
])
"""
# Parse parameters to handle JSON strings using shared utility
try:
products = parse_json_parameter(products)
except json.JSONDecodeError as e:
return {"error": f"Invalid JSON format for products parameter: {str(e)}"}

return create_data_product_assets(products)


@mcp.tool()
def create_dq_rules_tool(rules):
"""
Expand Down
3 changes: 3 additions & 0 deletions modelcontextprotocol/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
create_glossary_assets,
create_glossary_term_assets,
)
from .domain import create_data_domain_assets, create_data_product_assets
from .models import (
CertificateStatus,
UpdatableAttribute,
Expand All @@ -30,6 +31,8 @@
"create_glossary_category_assets",
"create_glossary_assets",
"create_glossary_term_assets",
"create_data_domain_assets",
"create_data_product_assets",
"CertificateStatus",
"UpdatableAttribute",
"UpdatableAsset",
Expand Down
143 changes: 143 additions & 0 deletions modelcontextprotocol/tools/domain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from __future__ import annotations

import logging
from typing import Any, Dict, List, Union

from pyatlan.model.assets import Asset, DataDomain, DataProduct
from pyatlan.model.fluent_search import CompoundQuery, FluentSearch

from utils import save_assets
from .models import DataDomainSpec, DataProductSpec

logger = logging.getLogger(__name__)


def create_data_domain_assets(
domains: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
"""
Create one or multiple Data Domain or Sub Domain assets in Atlan.

Args:
domains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single domain
specification (dict) or a list of domain specifications. Each specification
can be a dictionary containing:
- name (str): Name of the domain (required)
- parent_domain_qualified_name (str, optional): Qualified name of the parent
domain. If provided, creates a Sub Domain under that parent.
- user_description (str, optional): Detailed description of the domain
- certificate_status (str, optional): Certification status
("VERIFIED", "DRAFT", or "DEPRECATED")

Returns:
List[Dict[str, Any]]: List of dictionaries, each with details for a created domain:
- guid: The GUID of the created domain
- name: The name of the domain
- qualified_name: The qualified name of the created domain

Raises:
Exception: If there's an error creating the domain assets.
"""
data = domains if isinstance(domains, list) else [domains]
logger.info(f"Creating {len(data)} data domain asset(s)")
logger.debug(f"Domain specifications: {data}")

specs = [DataDomainSpec(**item) for item in data]

assets: List[DataDomain] = []
for spec in specs:
logger.debug(
f"Creating DataDomain: {spec.name}"
+ (
f" under {spec.parent_domain_qualified_name}"
if spec.parent_domain_qualified_name
else ""
)
)
domain = DataDomain.creator(
name=spec.name,
parent_domain_qualified_name=spec.parent_domain_qualified_name,
)
domain.user_description = spec.user_description
domain.certificate_status = (
spec.certificate_status.value if spec.certificate_status else None
)

if spec.certificate_status:
logger.debug(
f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
)

assets.append(domain)

return save_assets(assets)


def create_data_product_assets(
products: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
"""
Create one or multiple Data Product assets in Atlan.

Args:
products (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single product
specification (dict) or a list of product specifications. Each specification
can be a dictionary containing:
- name (str): Name of the product (required)
- domain_qualified_name (str): Qualified name of the domain this product
belongs to (required)
- asset_guids (List[str]): List of asset GUIDs to link to this product
(required, at least one)
- user_description (str, optional): Detailed description of the product
- certificate_status (str, optional): Certification status
("VERIFIED", "DRAFT", or "DEPRECATED")

Returns:
List[Dict[str, Any]]: List of dictionaries, each with details for a created product:
- guid: The GUID of the created product
- name: The name of the product
- qualified_name: The qualified name of the created product

Raises:
Exception: If there's an error creating the product assets.
ValueError: If no asset_guids are provided (validated in DataProductSpec model).
"""
data = products if isinstance(products, list) else [products]
logger.info(f"Creating {len(data)} data product asset(s)")
logger.debug(f"Product specifications: {data}")

# Validation for asset_guids is now handled by DataProductSpec model
specs = [DataProductSpec(**item) for item in data]

assets: List[DataProduct] = []
for spec in specs:
logger.debug(
f"Creating DataProduct: {spec.name} under {spec.domain_qualified_name}"
)
logger.debug(f"Linking {len(spec.asset_guids)} asset(s) to product")

# Build FluentSearch to select assets by their GUIDs
asset_selection = (
FluentSearch()
.where(CompoundQuery.active_assets())
.where(Asset.GUID.within(spec.asset_guids))
).to_request()

product = DataProduct.creator(
name=spec.name,
domain_qualified_name=spec.domain_qualified_name,
asset_selection=asset_selection,
)
product.user_description = spec.user_description
product.certificate_status = (
spec.certificate_status.value if spec.certificate_status else None
)

if spec.certificate_status:
logger.debug(
f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
)

assets.append(product)

return save_assets(assets)
Loading