|
1 | 1 | import logging |
2 | | -from typing import List, Union, Dict, Any |
| 2 | +from typing import List, Union, Dict, Any, Optional |
3 | 3 | from client import get_atlan_client |
4 | 4 | from .models import UpdatableAsset, UpdatableAttribute, CertificateStatus |
5 | 5 | from pyatlan.model.assets import Readme |
6 | 6 | from pyatlan.model.fluent_search import CompoundQuery, FluentSearch |
| 7 | +from pyatlan.model.fields.atlan_fields import AtlanField |
| 8 | +from utils.asset_history import ( |
| 9 | + validate_asset_history_params, |
| 10 | + create_audit_search_request, |
| 11 | + process_audit_result, |
| 12 | + create_sort_item, |
| 13 | + convert_attributes_to_camel_case, |
| 14 | +) |
7 | 15 |
|
8 | 16 | # Initialize logging |
9 | 17 | logger = logging.getLogger(__name__) |
@@ -128,3 +136,96 @@ def update_assets( |
128 | 136 | error_msg = f"Error updating assets: {str(e)}" |
129 | 137 | logger.error(error_msg) |
130 | 138 | return {"updated_count": 0, "errors": [error_msg]} |
| 139 | + |
| 140 | + |
| 141 | +def get_asset_history( |
| 142 | + guid: Optional[str] = None, |
| 143 | + qualified_name: Optional[str] = None, |
| 144 | + type_name: Optional[str] = None, |
| 145 | + size: int = 10, |
| 146 | + sort_order: str = "DESC", |
| 147 | + include_attributes: Optional[List[Union[str, AtlanField]]] = None, |
| 148 | +) -> Dict[str, Any]: |
| 149 | + """ |
| 150 | + Get the audit history of an asset by GUID or qualified name. |
| 151 | +
|
| 152 | + Args: |
| 153 | + guid (Optional[str]): GUID of the asset to get history for. |
| 154 | + Either guid or qualified_name must be provided. |
| 155 | + qualified_name (Optional[str]): Qualified name of the asset to get history for. |
| 156 | + Either guid or qualified_name must be provided. |
| 157 | + type_name (Optional[str]): Type name of the asset (required when using qualified_name). |
| 158 | + Examples: "Table", "Column", "DbtModel", "AtlasGlossary" |
| 159 | + size (int): Number of history entries to return. Defaults to 10. |
| 160 | + sort_order (str): Sort order for results. "ASC" for oldest first, "DESC" for newest first. |
| 161 | + Defaults to "DESC". |
| 162 | + include_attributes (List[Union[str, AtlanField]], optional): List of additional attributes to include in results. |
| 163 | + Can be string attribute names or AtlanField objects. These will be added to the default set. |
| 164 | +
|
| 165 | + Returns: |
| 166 | + Dict[str, Any]: Dictionary containing: |
| 167 | + - entityAudits: List of audit entries |
| 168 | + - count: Number of audit entries returned |
| 169 | + - totalCount: Total number of audit entries available |
| 170 | + - errors: List of any errors encountered |
| 171 | +
|
| 172 | + Raises: |
| 173 | + Exception: If there's an error retrieving the asset history |
| 174 | + """ |
| 175 | + try: |
| 176 | + # Validate input parameters |
| 177 | + validation_error = validate_asset_history_params( |
| 178 | + guid, qualified_name, type_name, sort_order |
| 179 | + ) |
| 180 | + if validation_error: |
| 181 | + logger.error(validation_error) |
| 182 | + return { |
| 183 | + "errors": [validation_error], |
| 184 | + "entityAudits": [], |
| 185 | + "count": 0, |
| 186 | + "totalCount": 0, |
| 187 | + } |
| 188 | + |
| 189 | + logger.info( |
| 190 | + f"Retrieving asset history with parameters: guid={guid}, qualified_name={qualified_name}, size={size}" |
| 191 | + ) |
| 192 | + |
| 193 | + # Get Atlan client |
| 194 | + client = get_atlan_client() |
| 195 | + |
| 196 | + # Create sort item |
| 197 | + sort_item = create_sort_item(sort_order) |
| 198 | + |
| 199 | + # Convert include_attributes from snake_case to camelCase if needed |
| 200 | + if include_attributes: |
| 201 | + include_attributes = convert_attributes_to_camel_case(include_attributes) |
| 202 | + |
| 203 | + # Create and execute audit search request |
| 204 | + request = create_audit_search_request( |
| 205 | + guid, qualified_name, type_name, size, sort_item, include_attributes |
| 206 | + ) |
| 207 | + response = client.audit.search(criteria=request, bulk=False) |
| 208 | + |
| 209 | + # Process audit results - use current_page() to respect size parameter |
| 210 | + entity_audits = [ |
| 211 | + process_audit_result(result, include_attributes) |
| 212 | + for result in response.current_page() |
| 213 | + ] |
| 214 | + |
| 215 | + result_data = { |
| 216 | + "entityAudits": entity_audits, |
| 217 | + "count": len(entity_audits), |
| 218 | + "totalCount": response.total_count, |
| 219 | + "errors": [], |
| 220 | + } |
| 221 | + |
| 222 | + logger.info( |
| 223 | + f"Successfully retrieved {len(entity_audits)} audit entries for asset" |
| 224 | + ) |
| 225 | + return result_data |
| 226 | + |
| 227 | + except Exception as e: |
| 228 | + error_msg = f"Error retrieving asset history: {str(e)}" |
| 229 | + logger.error(error_msg) |
| 230 | + logger.exception("Exception details:") |
| 231 | + return {"errors": [error_msg], "entityAudits": [], "count": 0, "totalCount": 0} |
0 commit comments