Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
327 changes: 327 additions & 0 deletions modelcontextprotocol/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
create_glossary_category_assets,
create_glossary_assets,
create_glossary_term_assets,
get_workflow_package_names,
get_workflows_by_type,
get_workflow_by_id,
get_workflow_runs,
get_workflow_runs_by_status_and_time_range,
UpdatableAttribute,
CertificateStatus,
UpdatableAsset,
Expand Down Expand Up @@ -905,6 +910,328 @@ def create_glossary_categories(categories) -> List[Dict[str, Any]]:
return create_glossary_category_assets(categories)


@mcp.tool()
def get_workflow_package_names_tool() -> List[str]:
"""
Get the values of all available workflow packages.

This tool returns a list of all workflow package names that can be used
with other workflow tools.

Returns:
List[str]: List of workflow package names (e.g., ['atlan-snowflake', 'atlan-bigquery', ...])

Examples:
# Get all workflow package names
packages = get_workflow_package_names_tool()
"""
return get_workflow_package_names()


@mcp.tool()
def get_workflows_by_type_tool(workflow_package_name: str, max_results: int = 10) -> Dict[str, Any]:
"""
Retrieve workflows (WorkflowTemplate) by workflow package name.

Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing:
- "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition)
- "Workflow" (kind="Workflow") = workflow_run (executed instance/run)

IMPORTANT: This tool returns only basic metadata. It does NOT return full workflow
instructions, steps, or transformations. To get complete workflow details including all steps and
transformations, use get_workflow_by_id_tool instead.

Note: This tool only returns workflows (WorkflowTemplate), not workflow_runs.
To get workflow_run information, use functions that start with get_workflow_run.

When to use this tool:
- Use when you need to list multiple workflows by package type
- Use when you only need metadata (package info, certification status) without full workflow details
- Do NOT use if you need the complete workflow specification with all steps and transformations
- Do NOT use if you need workflow_run (execution) information

Args:
workflow_package_name (str): The workflow package name (e.g., 'atlan-snowflake', 'atlan-bigquery', 'SNOWFLAKE').
Can be the full package name like 'atlan-snowflake' or the enum name like 'SNOWFLAKE'.
max_results (int, optional): Maximum number of workflows to return. Defaults to 10.

Returns:
Dict[str, Any]: Dictionary containing:
- workflows: List of workflow (WorkflowTemplate) dictionaries. Each workflow contains:
- template_name: Name of the template
- package_name, package_version: Package information
- source_system, source_category, workflow_type: Source information
- certified, verified: Certification status
- creator_email, creator_username: Creator information
- modifier_email, modifier_username: Last modifier information
- creation_timestamp: When template was created
- package_author, package_description, package_repository: Package metadata
- total: Total count of workflows found

Examples:
# Get Snowflake workflows
result = get_workflows_by_type_tool("atlan-snowflake")

# Get BigQuery workflows with custom limit
result = get_workflows_by_type_tool("atlan-bigquery", max_results=50)

# Get workflows using enum name
result = get_workflows_by_type_tool("SNOWFLAKE")
"""
return get_workflows_by_type(workflow_package_name=workflow_package_name, max_results=max_results)


@mcp.tool()
def get_workflow_by_id_tool(id: str) -> Dict[str, Any]:
"""
Retrieve a specific workflow (WorkflowTemplate) by its ID.

Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing:
- "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition)
- "Workflow" (kind="Workflow") = workflow_run (executed instance/run)

IMPORTANT: This is the ONLY tool that returns full workflow instructions with all steps and transformations.
All other workflow tools (get_workflows_by_type_tool, get_workflow_runs_tool, etc.) return only metadata
and execution status, but NOT the complete workflow definition, steps, or transformation details.

This tool uses include_dag=True internally, which means it returns the complete workflow specification
including the workflow DAG (directed acyclic graph), all steps, transformations, and execution details.

Note: This tool only returns workflows (WorkflowTemplate), not workflow_runs.
To get workflow_run information, use functions that start with get_workflow_run.

When to use this tool:
- Use when you need the COMPLETE workflow definition with all steps and transformations
- Use when you need to understand the full workflow execution flow
- Use when you need workflow_spec and workflow_steps data
- Do NOT use if you only need basic metadata (use other workflow tools instead)
- Do NOT use if you need workflow_run (execution) information

Args:
id (str): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976').

Returns:
Dict[str, Any]: Dictionary containing:
- workflow: The workflow (WorkflowTemplate) dictionary with comprehensive metadata, or None if not found.
The workflow dictionary contains:
- template_name: Name of the template
- package_name, package_version: Package information
- source_system, source_category, workflow_type: Source information
- certified, verified: Certification status
- creator_email, creator_username: Creator information
- modifier_email, modifier_username: Last modifier information
- creation_timestamp: When template was created
- package_author, package_description, package_repository: Package metadata
- workflow_steps: Workflow steps (when include_dag=True)
- workflow_spec: Full workflow specification (when include_dag=True)
- error: None if no error occurred, otherwise the error message

Examples:
# Get a specific workflow by ID to see full workflow instructions
result = get_workflow_by_id_tool("atlan-snowflake-miner-1714638976")

# Access full workflow steps and transformations
workflow = result.get("workflow")
# The workflow object contains complete workflow definition with workflow_steps and workflow_spec
"""
return get_workflow_by_id(id=id)


@mcp.tool()
def get_workflow_runs_tool(
workflow_name: str,
workflow_phase: str,
from_: int = 0,
size: int = 100,
) -> Dict[str, Any]:
"""
Retrieve all workflow_runs for a specific workflow and phase.

Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing:
- "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition)
- "Workflow" (kind="Workflow") = workflow_run (executed instance/run)

This tool returns workflow_run instances (kind="Workflow") that match the specified workflow name and phase.
Each workflow_run contains execution details, timing, resource usage, and status information.

IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow
instructions, steps, or transformations. To get complete workflow details including all steps and
transformations, use get_workflow_by_id_tool instead.

When to use this tool:
- Use when you need to find all workflow_runs of a specific workflow by execution phase
- Use when you need execution metadata (status, timing, resource usage) for multiple workflow_runs
- Use when you need to filter workflow_runs by phase (Succeeded, Failed, Running, etc.)
- Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead)

Args:
workflow_name (str): Name of the workflow (template) as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976').
This refers to the workflow name, not individual workflow_run IDs.
workflow_phase (str): Phase of the workflow_run. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'.
Case-insensitive matching is supported.
from_ (int, optional): Starting index of the search results for pagination. Defaults to 0.
size (int, optional): Maximum number of search results to return. Defaults to 100.

Returns:
Dict[str, Any]: Dictionary containing:
- runs: List of workflow_run dictionaries. Each workflow_run contains:
Run Metadata:
- run_id: Unique identifier for this workflow_run
- run_phase: Execution phase (Succeeded, Running, Failed, etc.)
- run_started_at: When the workflow_run started (ISO timestamp)
- run_finished_at: When the workflow_run finished (ISO timestamp, None if still running)
- run_estimated_duration: Estimated execution duration
- run_progress: Progress indicator (e.g., "1/3")
- run_cpu_usage: CPU resource usage duration
- run_memory_usage: Memory resource usage duration

Workflow Metadata:
- workflow_id: Reference to the workflow (template) used
- workflow_package_name: Package identifier
- workflow_cron_schedule: Cron schedule if scheduled
- workflow_cron_timezone: Timezone for cron schedule
- workflow_creator_id, workflow_creator_email, workflow_creator_username: Creator information
- workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information
- workflow_creation_timestamp: When the workflow was created
- workflow_archiving_status: Archiving status
- total: Total count of workflow_runs matching the criteria
- error: None if no error occurred, otherwise the error message

Examples:
# Get succeeded workflow_runs
result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Succeeded")

# Get running workflow_runs
result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Running")

# Get failed workflow_runs with pagination
result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Failed", from_=0, size=50)

# Analyze workflow_run durations
runs = result.get("runs", [])
for run in runs:
if run.get("run_finished_at") and run.get("run_started_at"):
print(f"Workflow run {run['run_id']} took {run.get('run_estimated_duration')}")
"""
return get_workflow_runs(
workflow_name=workflow_name,
workflow_phase=workflow_phase,
from_=from_,
size=size,
)


@mcp.tool()
def get_workflow_runs_by_status_and_time_range_tool(
status: List[str],
started_at: str | None = None,
finished_at: str | None = None,
from_: int = 0,
size: int = 100,
) -> Dict[str, Any]:
"""
Retrieve workflow_runs based on their status and time range.

Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing:
- "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition)
- "Workflow" (kind="Workflow") = workflow_run (executed instance/run)

This tool allows you to search for workflow_run instances (kind="Workflow") by filtering on their execution status
(e.g., Succeeded, Failed, Running) and optionally by time ranges. This is useful for
monitoring, debugging, and analyzing workflow execution patterns.

IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow
instructions, steps, or transformations. To get complete workflow details including all steps and
transformations, use get_workflow_by_id_tool instead.

When to use this tool:
- Use when you need to find workflow_runs across multiple workflows filtered by status and time
- Use when monitoring workflow execution patterns or analyzing failures
- Use when you need execution metadata for workflow_runs within a specific time window
- Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead)

Args:
status (List[str]): List of workflow_run phases to filter by. Common values:
- 'Succeeded': Successfully completed workflow_runs
- 'Failed': Workflow_runs that failed
- 'Running': Currently executing workflow_runs
- 'Error': Workflow_runs that encountered errors
- 'Pending': Workflow_runs waiting to start
Case-insensitive matching is supported. Example: ['Succeeded', 'Failed']
started_at (str, optional): Lower bound on 'status.startedAt' timestamp. Accepts:
- Relative time format: 'now-2h', 'now-24h', 'now-7d', 'now-30d'
- ISO 8601 format: '2024-01-01T00:00:00Z'
Filters workflow_runs that started at or after this time.
finished_at (str, optional): Lower bound on 'status.finishedAt' timestamp. Accepts:
- Relative time format: 'now-1h', 'now-12h'
- ISO 8601 format: '2024-01-01T00:00:00Z'
Filters workflow_runs that finished at or after this time.
from_ (int, optional): Starting index of the search results for pagination. Defaults to 0.
size (int, optional): Maximum number of search results to return. Defaults to 100.

Returns:
Dict[str, Any]: Dictionary containing:
- runs: List of workflow_run dictionaries matching the criteria. Each workflow_run contains:
Run Metadata:
- run_id: Unique identifier for this workflow_run
- run_phase: Execution phase (Succeeded, Running, Failed, etc.)
- run_started_at: When the workflow_run started (ISO timestamp)
- run_finished_at: When the workflow_run finished (ISO timestamp, None if still running)
- run_estimated_duration: Estimated execution duration
- run_progress: Progress indicator (e.g., "1/3")
- run_cpu_usage: CPU resource usage duration
- run_memory_usage: Memory resource usage duration

Workflow Metadata:
- workflow_id: Reference to the workflow (template) used
- workflow_package_name: Package identifier
- workflow_cron_schedule: Cron schedule if scheduled
- workflow_cron_timezone: Timezone for cron schedule
- workflow_creator_id, workflow_creator_email, workflow_creator_username: Creator information
- workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information
- workflow_creation_timestamp: When the workflow was created
- workflow_archiving_status: Archiving status
- total: Total count of workflow_runs matching the criteria (may be larger than the returned list if paginated)
- error: None if no error occurred, otherwise the error message

Examples:
# Get succeeded workflow_runs from the last 2 hours
result = get_workflow_runs_by_status_and_time_range_tool(
status=["Succeeded"],
started_at="now-2h"
)

# Get failed workflow_runs from the last 24 hours
result = get_workflow_runs_by_status_and_time_range_tool(
status=["Failed"],
started_at="now-24h"
)

# Get multiple statuses with both time filters
result = get_workflow_runs_by_status_and_time_range_tool(
status=["Succeeded", "Failed"],
started_at="now-7d",
finished_at="now-1h"
)

# Get running workflow_runs
result = get_workflow_runs_by_status_and_time_range_tool(
status=["Running"]
)

# Analyze failure rates
failed_runs = [r for r in result.get("runs", []) if r.get("run_phase") == "Failed"]
print(f"Found {len(failed_runs)} failed workflow_runs in the time range")
"""
return get_workflow_runs_by_status_and_time_range(
status=status,
started_at=started_at,
finished_at=finished_at,
from_=from_,
size=size,
)


def main():
"""Main entry point for the Atlan MCP Server."""

Expand Down
12 changes: 12 additions & 0 deletions modelcontextprotocol/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
GlossaryCategory,
GlossaryTerm,
)
from .workflows import (
get_workflow_package_names,
get_workflows_by_type,
get_workflow_by_id,
get_workflow_runs,
get_workflow_runs_by_status_and_time_range,
)

__all__ = [
"search_assets",
Expand All @@ -27,6 +34,11 @@
"create_glossary_category_assets",
"create_glossary_assets",
"create_glossary_term_assets",
"get_workflow_package_names",
"get_workflows_by_type",
"get_workflow_by_id",
"get_workflow_runs",
"get_workflow_runs_by_status_and_time_range",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we merge few tools like "get_workflow_runs" and "get_workflow_runs_by_status_and_time_range", just thinking if we can reduce number of tools here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make those changes! Stay tuned!

"CertificateStatus",
"UpdatableAttribute",
"UpdatableAsset",
Expand Down
Loading
Loading