diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 3290859..7cf7a08 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -16,6 +16,8 @@ CertificateStatus, UpdatableAsset, TermOperations, + AnnouncementData, + AnnouncementType, ) from pyatlan.model.lineage import LineageDirection from utils.parameters import ( @@ -480,11 +482,13 @@ def update_assets_tool( Can be a single UpdatableAsset or a list of UpdatableAsset objects. For asset of type_name=AtlasGlossaryTerm or type_name=AtlasGlossaryCategory, each asset dictionary MUST include a "glossary_guid" key which is the GUID of the glossary that the term belongs to. attribute_name (str): Name of the attribute to update. - Supports "user_description", "certificate_status", "readme", and "term". + Supports "user_description", "certificate_status", "readme", "term", and "announcement". attribute_values (List[Union[str, Dict[str, Any]]]): List of values to set for the attribute. For certificateStatus, only "VERIFIED", "DRAFT", or "DEPRECATED" are allowed. For readme, the value must be a valid Markdown string. For term, the value must be a dict with "operation" and "term_guids" keys. + For announcement, the value must be a dict with "announcement_title", "announcement_type" ("information", "warning", or "issue"), and optional "announcement_message". + Note: Only one announcement can exist per asset. Updating an announcement will replace any existing announcement. Returns: Dict[str, Any]: Dictionary containing: @@ -602,6 +606,73 @@ def update_assets_tool( "term_guids": ["term-guid-to-remove"] }] ) + + # Add an informational announcement to a single asset + # Note: Only one announcement can exist per asset. This will replace any existing announcement. + result = update_assets_tool( + assets={ + "guid": "asset-guid-here", + "name": "Customer Data Table", + "type_name": "Table", + "qualified_name": "default/snowflake/123456/abc/CUSTOMER_DATA" + }, + attribute_name="announcement", + attribute_values=[{ + "announcement_title": "Scheduled Maintenance", + "announcement_type": "information", + "announcement_message": "This table will be unavailable for maintenance on 2024-01-15 from 2 AM to 4 AM EST." + }] + ) + + # Add announcements to multiple assets (each asset gets its own announcement) + # Note: Each asset can only have one announcement. Updating will replace any existing announcement. + result = update_assets_tool( + assets=[ + { + "guid": "asset-guid-1", + "name": "Table 1", + "type_name": "Table", + "qualified_name": "default/snowflake/123456/abc/TABLE_1" + }, + { + "guid": "asset-guid-2", + "name": "Table 2", + "type_name": "Table", + "qualified_name": "default/snowflake/123456/abc/TABLE_2" + } + ], + attribute_name="announcement", + attribute_values=[ + { + "announcement_title": "Data Quality Issue", + "announcement_type": "warning", + "announcement_message": "This table contains incomplete data. Please verify before use." + }, + { + "announcement_title": "Schema Change Pending", + "announcement_type": "warning", + "announcement_message": "Schema changes will be applied next week." + } + ] + ) + + # Add an issue announcement to a glossary term + # Note: Only one announcement can exist per asset. This will replace any existing announcement. + result = update_assets_tool( + assets={ + "guid": "term-guid-here", + "name": "Customer", + "type_name": "AtlasGlossaryTerm", + "qualified_name": "term-qualified-name", + "glossary_guid": "glossary-guid-here" + }, + attribute_name="announcement", + attribute_values=[{ + "announcement_title": "Term Deprecation Notice", + "announcement_type": "issue", + "announcement_message": "This term is being deprecated. Please use 'Client' instead." + }] + ) """ try: # Parse JSON parameters @@ -623,6 +694,28 @@ def update_assets_tool( "updated_count": 0, } parsed_attribute_values = term_operations + # Handle announcement operations - convert dict to AnnouncementData object + elif attr_enum == UpdatableAttribute.ANNOUNCEMENT: + announcement_data_list = [] + for value in parsed_attribute_values: + if isinstance(value, dict): + # Convert announcement_type string to enum + if "announcement_type" in value: + announcement_type_str = value["announcement_type"].lower() + try: + value["announcement_type"] = AnnouncementType(announcement_type_str) + except ValueError: + return { + "error": f"Invalid announcement_type: {announcement_type_str}. Must be 'information', 'warning', or 'issue'", + "updated_count": 0, + } + announcement_data_list.append(AnnouncementData(**value)) + else: + return { + "error": "Announcement attribute values must be dictionaries with 'announcement_title', 'announcement_type', and optional 'announcement_message' keys", + "updated_count": 0, + } + parsed_attribute_values = announcement_data_list # For certificate status, convert values to enum elif attr_enum == UpdatableAttribute.CERTIFICATE_STATUS: parsed_attribute_values = [ diff --git a/modelcontextprotocol/tools/__init__.py b/modelcontextprotocol/tools/__init__.py index f9d18f6..2cb0523 100644 --- a/modelcontextprotocol/tools/__init__.py +++ b/modelcontextprotocol/tools/__init__.py @@ -13,6 +13,8 @@ UpdatableAttribute, UpdatableAsset, TermOperations, + AnnouncementData, + AnnouncementType, Glossary, GlossaryCategory, GlossaryTerm, @@ -31,6 +33,8 @@ "UpdatableAttribute", "UpdatableAsset", "TermOperations", + "AnnouncementData", + "AnnouncementType", "Glossary", "GlossaryCategory", "GlossaryTerm", diff --git a/modelcontextprotocol/tools/assets.py b/modelcontextprotocol/tools/assets.py index b90f2c9..0fca3d3 100644 --- a/modelcontextprotocol/tools/assets.py +++ b/modelcontextprotocol/tools/assets.py @@ -7,9 +7,13 @@ CertificateStatus, TermOperation, TermOperations, + AnnouncementData, + AnnouncementType, ) from pyatlan.model.assets import Readme, AtlasGlossaryTerm, AtlasGlossaryCategory from pyatlan.model.fluent_search import CompoundQuery, FluentSearch +from pyatlan.model.core import Announcement +from pyatlan.model.enums import AnnouncementType as PyAtlanAnnouncementType # Initialize logging logger = logging.getLogger(__name__) @@ -18,7 +22,7 @@ def update_assets( updatable_assets: Union[UpdatableAsset, List[UpdatableAsset]], attribute_name: UpdatableAttribute, - attribute_values: List[Union[str, CertificateStatus, TermOperations]], + attribute_values: List[Union[str, CertificateStatus, TermOperations, AnnouncementData]], ) -> Dict[str, Any]: """ Update one or multiple assets with different values for attributes or term operations. @@ -28,11 +32,13 @@ def update_assets( Can be a single UpdatableAsset or a list of UpdatableAssets. For asset of type_name=AtlasGlossaryTerm or type_name=AtlasGlossaryCategory, each asset dictionary MUST include a "glossary_guid" key which is the GUID of the glossary that the term belongs to. attribute_name (UpdatableAttribute): Name of the attribute to update. - Supports userDescription, certificateStatus, readme, and term. - attribute_values (List[Union[str, CertificateStatus, TermOperations]]): List of values to set for the attribute. + Supports userDescription, certificateStatus, readme, term, and announcement. + attribute_values (List[Union[str, CertificateStatus, TermOperations, AnnouncementData]]): List of values to set for the attribute. For certificateStatus, only VERIFIED, DRAFT, or DEPRECATED are allowed. For readme, the value must be a valid Markdown string. For term, the value must be a TermOperations object with operation and term_guids. + For announcement, the value must be an AnnouncementData object with announcement_title, announcement_type, and optional announcement_message. + Note: Only one announcement can exist per asset. Updating an announcement will replace any existing announcement. Returns: Dict[str, Any]: Dictionary containing: @@ -168,6 +174,69 @@ def update_assets( error_msg = f"Error updating terms on asset {updatable_asset.qualified_name}: {str(e)}" logger.error(error_msg) result["errors"].append(error_msg) + elif attribute_name == UpdatableAttribute.ANNOUNCEMENT: + # Special handling for announcement updates + announcement_value = attribute_values[index] + if not isinstance(announcement_value, AnnouncementData): + error_msg = f"Announcement value must be an AnnouncementData object for asset {updatable_asset.qualified_name}" + logger.error(error_msg) + result["errors"].append(error_msg) + continue + + # Convert AnnouncementType enum to PyAtlan AnnouncementType + announcement_type_map = { + AnnouncementType.INFORMATION: PyAtlanAnnouncementType.INFORMATION, + AnnouncementType.WARNING: PyAtlanAnnouncementType.WARNING, + AnnouncementType.ISSUE: PyAtlanAnnouncementType.ISSUE, + } + pyatlan_announcement_type = announcement_type_map.get( + announcement_value.announcement_type + ) + if not pyatlan_announcement_type: + error_msg = f"Invalid announcement type: {announcement_value.announcement_type}" + logger.error(error_msg) + result["errors"].append(error_msg) + continue + + # Create PyAtlan Announcement object + pyatlan_announcement = Announcement( + announcement_title=announcement_value.announcement_title, + announcement_type=pyatlan_announcement_type, + announcement_message=announcement_value.announcement_message, + ) + + try: + # Use the client's update_announcement method + glossary_guid = ( + updatable_asset.glossary_guid + if ( + updatable_asset.type_name == AtlasGlossaryTerm.__name__ + or updatable_asset.type_name == AtlasGlossaryCategory.__name__ + ) + else None + ) + updated_asset = client.asset.update_announcement( + asset_type=asset_cls, + qualified_name=updatable_asset.qualified_name, + name=updatable_asset.name, + announcement=pyatlan_announcement, + glossary_guid=glossary_guid, + ) + + if updated_asset: + result["updated_count"] += 1 + logger.info( + f"Successfully updated announcement on asset: {updatable_asset.qualified_name}" + ) + else: + error_msg = f"Failed to update announcement on asset {updatable_asset.qualified_name}" + logger.error(error_msg) + result["errors"].append(error_msg) + + except Exception as e: + error_msg = f"Error updating announcement on asset {updatable_asset.qualified_name}: {str(e)}" + logger.error(error_msg) + result["errors"].append(error_msg) else: # Regular attribute update flow setattr(asset, attribute_name.value, attribute_values[index]) diff --git a/modelcontextprotocol/tools/models.py b/modelcontextprotocol/tools/models.py index 814de7f..34dfb2f 100644 --- a/modelcontextprotocol/tools/models.py +++ b/modelcontextprotocol/tools/models.py @@ -19,6 +19,7 @@ class UpdatableAttribute(str, Enum): CERTIFICATE_STATUS = "certificate_status" README = "readme" TERM = "term" + ANNOUNCEMENT = "announcement" class TermOperation(str, Enum): @@ -36,6 +37,22 @@ class TermOperations(BaseModel): term_guids: List[str] +class AnnouncementType(str, Enum): + """Enum for announcement types.""" + + INFORMATION = "information" + WARNING = "warning" + ISSUE = "issue" + + +class AnnouncementData(BaseModel): + """Model for announcement data on assets.""" + + announcement_title: str + announcement_type: AnnouncementType + announcement_message: Optional[str] = None + + class UpdatableAsset(BaseModel): """Class representing an asset that can be updated.""" diff --git a/modelcontextprotocol/utils/constants.py b/modelcontextprotocol/utils/constants.py index 534f623..315cd0e 100644 --- a/modelcontextprotocol/utils/constants.py +++ b/modelcontextprotocol/utils/constants.py @@ -15,4 +15,9 @@ "readme", "owner_groups", "asset_tags", + "announcement_title", + "announcement_message", + "announcement_type", + "announcement_updated_at", + "announcement_updated_by", ]