Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions modelcontextprotocol/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
UpdatableAttribute,
CertificateStatus,
UpdatableAsset,
get_asset_source_or_destination,
)
from pyatlan.model.lineage import LineageDirection
from utils.parameters import (
Expand Down Expand Up @@ -336,6 +337,68 @@ def get_assets_by_dsl_tool(dsl_query):
"""
return get_assets_by_dsl(dsl_query)

@mcp.tool()
def get_asset_source_or_destination_tool(
guid,
direction,
depth=1000000,
size=100,
ignore_types: List[str] = ['process', 'aimodel'],
):
"""
Get the root source or final destination assets for a given asset by traversing lineage.

This is not for getting the full lineage of an asset, but rather for getting the root source or final destination asset.
This function identifies root source or final destination assets in the lineage graph
either root source assets (those with no upstream dependencies) or final destination assets (those with no downstream
dependencies). Asset types can be filtered out using the ignore_types parameter.


Args:
guid (str): GUID of the starting asset
direction (str): Direction to traverse ("UPSTREAM" for sources, "DOWNSTREAM" for destinations)
depth (int, optional): Maximum depth to traverse. Should default to 1000000.
size (int, optional): Maximum number of results to return. Should default to 100.
ignore_types (List[str], optional): List of asset type keywords to ignore
(case-insensitive matching). Defaults to ['process', 'aimodel'].

Returns:
Dict[str, Any]: Dictionary containing:
- assets: List of terminal assets (sources or destinations) with processed attributes
- error: None if no error occurred, otherwise the error message

Examples:
# Find source assets (no upstream dependencies) for a table
sources = get_asset_source_or_destination_tool(
guid="table-guid-here",
direction="UPSTREAM",
depth=1000,
size=20
)

# Find destination assets (no downstream dependencies) for a dataset
destinations = get_asset_source_or_destination_tool(
guid="dataset-guid-here",
direction="DOWNSTREAM",
depth=500,
size=15,
ignore_types=["process", "aimodel", "aimodelversion"]
)
"""
try:
direction_enum = LineageDirection[direction.upper()]
except KeyError:
raise ValueError(
f"Invalid direction: {direction}. Must be either 'UPSTREAM' or 'DOWNSTREAM'"
)

return get_asset_source_or_destination(
guid=guid,
direction=direction_enum,
depth=int(depth),
size=int(size),
ignore_types=ignore_types,
)

@mcp.tool()
def traverse_lineage_tool(
Expand Down
3 changes: 2 additions & 1 deletion modelcontextprotocol/tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .search import search_assets
from .dsl import get_assets_by_dsl
from .lineage import traverse_lineage
from .lineage import traverse_lineage, get_asset_source_or_destination
from .assets import update_assets
from .glossary import (
create_glossary_category_assets,
Expand All @@ -20,6 +20,7 @@
"search_assets",
"get_assets_by_dsl",
"traverse_lineage",
"get_asset_source_or_destination",
"update_assets",
"create_glossary_category_assets",
"create_glossary_assets",
Expand Down
78 changes: 78 additions & 0 deletions modelcontextprotocol/tools/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,81 @@ def traverse_lineage(
except Exception as e:
logger.error(f"Error traversing lineage: {str(e)}")
return {"assets": [], "error": str(e)}

def get_asset_source_or_destination(
guid: str,
direction: LineageDirection,
depth: int = 1000000,
size: int = 100,
ignore_types: Optional[List[str]] = ['process', 'aimodel']
) -> Dict[str, Any]:
"""
Get the source or destination assets for a given asset by traversing lineage.

This function identifies terminal assets in the lineage graph - either source assets
(those with no upstream dependencies) or destination assets (those with no downstream
dependencies). Asset types can be filtered out using the ignore_types parameter.

Args:
guid (str): GUID of the starting asset
direction (LineageDirection): Direction to traverse (UPSTREAM for sources, DOWNSTREAM for destinations)
depth (int, optional): Maximum depth to traverse. Defaults to 1000000.
size (int, optional): Maximum number of results to return. Defaults to 100.
ignore_types (Optional[List[str]], optional): List of asset type keywords to ignore
(case-insensitive matching). Defaults to ['process', 'aimodel'].

Returns:
Dict[str, Any]: Dictionary containing:
- assets: List of terminal assets (sources or destinations) with processed attributes
- error: None if no error occurred, otherwise the error message

Raises:
ValueError: If direction is not UPSTREAM or DOWNSTREAM
Exception: If there's an error executing the lineage request
"""

logger.info(f"Starting lineage source check for {guid}, depth={depth}, size={size}")
logger.info(f"Ignore types: {ignore_types}")

try:
client = get_atlan_client()
request = (
FluentLineage(starting_guid=guid)
.direction(direction)
.immediate_neighbors(True)
.depth(depth)
.size(size)
.request
)
response = client.asset.get_lineage_list(request)
assets = []
for asset in response:
if direction == LineageDirection.UPSTREAM: # This is a source check
# Check if the asset has the 'immediate_upstream' attribute and if it's not empty
if not hasattr(asset, 'immediate_upstream') or not asset.immediate_upstream:
# Skip assets that contain any of the exclude keywords in their type_name
if hasattr(asset, 'type_name'):
asset_type = asset.type_name.lower()
if any(keyword in asset_type for keyword in ignore_types):
continue # Skip assets with excluded keywords
assets.append(asset.dict(by_alias=True, exclude_unset=True))
logger.info(f"Total assets with no immediate {direction} lineage: {len(assets)}")
elif direction == LineageDirection.DOWNSTREAM: # This is a destination check
# Check to make sure it's not the same asset
if asset.guid == guid:
continue
# Check if the asset has the 'immediate_downstream' attribute and if it's not empty
if not hasattr(asset, 'immediate_downstream') or not asset.immediate_downstream:
# Skip assets that contain any of the exclude keywords in their type_name
if hasattr(asset, 'type_name'):
asset_type = asset.type_name.lower()
if any(keyword in asset_type for keyword in ignore_types):
continue # Skip assets with excluded keywords
assets.append(asset.dict(by_alias=True, exclude_unset=True))
logger.info(f"Total assets with no immediate {direction} lineage: {len(assets)}")
else:
raise ValueError(f"Invalid direction: {direction}. Must be either 'UPSTREAM' or 'DOWNSTREAM'")
return {"assets": assets, "error": None}
except Exception as e:
logger.error(f"Error traversing lineage source or destination: {str(e)}")
return {"assets": [], "error": str(e)}
Loading