Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ ATLAN_WORKFLOW_MAX_TIMEOUT_HOURS=1
ATLAN_MAX_CONCURRENT_ACTIVITIES=5
ATLAN_APP_DASHBOARD_HOST=localhost
ATLAN_APP_DASHBOARD_PORT=3000
AUTOMATION_ENGINE_API_HOST=localhost
AUTOMATION_ENGINE_API_PORT=8080
ATLAN_APP_HTTP_HOST=0.0.0.0
ATLAN_APP_HTTP_PORT=8000
ATLAN_WORKFLOW_METRICS_PORT=8234
Expand Down
16 changes: 16 additions & 0 deletions application_sdk/application/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple, Type

from application_sdk.activities import ActivitiesInterface
from application_sdk.clients.base import BaseClient
from application_sdk.clients.utils import get_workflow_client
from application_sdk.constants import ENABLE_MCP
from application_sdk.decorators.automation_activity import ACTIVITY_SPECS, flush_activity_registrations
from application_sdk.events.models import EventRegistration
from application_sdk.handlers.base import BaseHandler
from application_sdk.observability.logger_adaptor import get_logger
Expand Down Expand Up @@ -152,6 +154,20 @@ async def setup_workflow(
workflow_and_activities_classes=workflow_and_activities_classes
)

# Register activities with the automation engine via its HTTP API. The work
# runs in a background task so that workflow setup is not blocked by it.
async def _register_activities_with_delay():
    """Register the collected activities after a short startup delay.

    Sleeps for one second to give the automation engine's server time to
    come up, then runs the blocking ``flush_activity_registrations`` call in
    a worker thread. Registration is best-effort: a failure is logged as a
    warning instead of being left as an unretrieved task exception.
    """
    await asyncio.sleep(1)
    try:
        await asyncio.to_thread(
            flush_activity_registrations,
            app_name=self.application_name,
            activity_specs=ACTIVITY_SPECS,
        )
    except Exception as e:
        # Task cancellation (CancelledError) is not an Exception subclass on
        # Python 3.8+, so it still propagates normally.
        get_logger(__name__).warning(
            f"Activity registration with automation engine failed: {e}"
        )

# Keep a reference to the task: the event loop only holds weak references to
# tasks, so an unreferenced task may be garbage collected before it finishes.
self._activity_registration_task = asyncio.create_task(
    _register_activities_with_delay()
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Background task lacks exception handler

The background task created with asyncio.create_task has no exception handler. If flush_activity_registrations raises an exception (which it does at lines 205-207 in automation_activity.py), it becomes an unhandled exception that will only surface when the event loop terminates. This contradicts the "non-blocking" intent stated in the comment and PR description. The task should wrap the call in try-except to log failures without crashing.

Fix in Cursor Fix in Web

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Fire-and-forget task risks early garbage collection

Creating a task without storing a reference to it risks the task being garbage collected before completion in some Python implementations. While CPython typically keeps running tasks alive, storing the task reference ensures the registration completes reliably across all implementations, similar to how _token_refresh_task is stored in the temporal client.

Fix in Cursor Fix in Web

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Background task exceptions silently ignored

The background task created with asyncio.create_task has no exception handler and no saved reference, so exceptions raised by flush_activity_registrations will be silently lost. The function catches and re-raises exceptions on lines 205-207 of automation_activity.py, but inside the task these become unhandled exceptions that surface only as asyncio warnings, which makes debugging difficult.

Fix in Cursor Fix in Web


async def start_workflow(self, workflow_args, workflow_class) -> Any:
"""
Start a new workflow execution.
Expand Down
8 changes: 8 additions & 0 deletions application_sdk/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@
SQL_SERVER_MIN_VERSION = os.getenv("ATLAN_SQL_SERVER_MIN_VERSION")
#: Path to the SQL queries directory
SQL_QUERIES_PATH = os.getenv("ATLAN_SQL_QUERIES_PATH", "app/sql")
#: Hostname of the automation engine API server (defaults to ``localhost``)
AUTOMATION_ENGINE_API_HOST = os.getenv("AUTOMATION_ENGINE_API_HOST", "localhost")
#: Port of the automation engine API server, kept as a string (defaults to ``8080``)
AUTOMATION_ENGINE_API_PORT = os.getenv("AUTOMATION_ENGINE_API_PORT", "8080")
#: Base HTTP URL of the automation engine API server, built from the host and port above
AUTOMATION_ENGINE_API_URL = "http://{host}:{port}".format(
    host=AUTOMATION_ENGINE_API_HOST,
    port=AUTOMATION_ENGINE_API_PORT,
)

# Output Path Constants
#: Output path format for workflows.
Expand Down
207 changes: 207 additions & 0 deletions application_sdk/decorators/automation_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
"""
Activity registration decorator for automatic registration with the automation engine.
"""

import inspect
import json
from typing import Any, Callable, Dict, List, Union, get_args, get_origin

from pydantic import BaseModel
import requests

from application_sdk.constants import AUTOMATION_ENGINE_API_URL
from application_sdk.observability.logger_adaptor import get_logger

logger = get_logger(__name__)


# Module-level registry of activities collected by the ``automation_activity``
# decorator. Each entry is a dict with the keys "name", "description", "func"
# (the decorated callable), "input_schema" and "output_schema" (JSON-encoded
# schema strings, or None when the schema is empty).
# NOTE: being module-level, this single list is shared by everything that
# imports this module in the same process.
ACTIVITY_SPECS: List[dict[str, Any]] = []
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Activity registry shared across multiple apps

The global ACTIVITY_SPECS list is shared across all application instances in the same Python process. When multiple apps register activities, they'll all be added to the same registry, causing each app to register not only its own activities but also activities from other apps, leading to incorrect registrations.

Fix in Cursor Fix in Web



def _type_to_json_schema(annotation: Any) -> Dict[str, Any]:
"""Convert Python type annotation to JSON Schema using reflection."""
if inspect.isclass(annotation) and issubclass(annotation, BaseModel):
schema = annotation.model_json_schema(mode="serialization")
if "$defs" in schema:
defs = schema.pop("$defs")
schema = {**schema, **defs}
return schema

origin = get_origin(annotation)
if origin is Union:
args = get_args(annotation)
if type(None) in args:
non_none_types = [arg for arg in args if arg is not type(None)]
if non_none_types:
schema = _type_to_json_schema(non_none_types[0])
schema["nullable"] = True
return schema
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Non-Optional Union types return empty schema

When processing Union types that don't include None (like Union[str, int]), the code checks if type(None) in args at line 33, which evaluates to False, causing the function to fall through without returning a schema. This results in an empty schema {} being returned instead of a proper union schema. Non-Optional union types are valid Python type hints but are completely ignored by this implementation.

Fix in Cursor Fix in Web


if origin is dict or annotation is dict:
args = get_args(annotation)
if args:
return {
"type": "object",
"additionalProperties": _type_to_json_schema(args[1]),
}
return {"type": "object"}

if origin is list or annotation is list:
args = get_args(annotation)
if args:
return {"type": "array", "items": _type_to_json_schema(args[0])}
return {"type": "array"}

type_mapping = {
str: {"type": "string"},
int: {"type": "integer"},
float: {"type": "number"},
bool: {"type": "boolean"},
Any: {},
}

if annotation in type_mapping:
return type_mapping[annotation]

return {}


def _generate_input_schema(func: Any) -> Dict[str, Any]:
"""Generate JSON Schema for function inputs using reflection."""
sig = inspect.signature(func)
properties = {}
required: list[str] = []

for param_name, param in sig.parameters.items():
if param_name == "self":
continue

param_schema = (
_type_to_json_schema(param.annotation)
if param.annotation != inspect.Parameter.empty
else {}
)

if param.default != inspect.Parameter.empty:
if isinstance(param.default, (str, int, float, bool, type(None))):
param_schema["default"] = param.default
else:
required.append(param_name)

properties[param_name] = param_schema

schema = {"type": "object", "properties": properties}
if required:
schema["required"] = required

return schema


def _generate_output_schema(func: Any) -> Dict[str, Any]:
"""Generate JSON Schema for function outputs using reflection."""
return_annotation = inspect.signature(func).return_annotation
if return_annotation == inspect.Signature.empty:
return {}
return _type_to_json_schema(return_annotation)


def automation_activity(
    name: str,
    description: str,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Create a decorator that records an activity for later registration.

    At decoration time the input and output schemas of the decorated function
    are generated and a spec is appended to ``ACTIVITY_SPECS``. The function
    itself is returned unchanged.

    Args:
        name: Name under which the activity is recorded.
        description: Description stored alongside the activity.

    Returns:
        A decorator that records the function and returns it unmodified.
    """

    def _collect(target: Callable[..., Any]) -> Callable[..., Any]:
        inputs: dict[str, Any] = _generate_input_schema(target)
        outputs: dict[str, Any] = _generate_output_schema(target)

        logger.info(f"Collected automation activity: {name}")

        # Schemas are stored JSON-encoded; an empty schema is stored as None.
        spec: dict[str, Any] = {
            "name": name,
            "description": description,
            "func": target,
            "input_schema": json.dumps(inputs) if inputs else None,
            "output_schema": json.dumps(outputs) if outputs else None,
        }
        ACTIVITY_SPECS.append(spec)
        return target

    return _collect


def flush_activity_registrations(
    app_name: str,
    activity_specs: List[dict[str, Any]],
) -> None:
    """Register the given activities with the automation engine over HTTP.

    The call is skipped (with a log message) when there is nothing to
    register, when no automation engine URL is configured, or when the
    engine's health check fails. Otherwise the specs are POSTed to the
    engine's ``/api/tools`` endpoint.

    Args:
        app_name: Application name; also used to build the qualified name
            ``default/apps/<app_name>`` sent with the request.
        activity_specs: Activity specs to register. Each must provide the
            keys ``name``, ``description``, ``input_schema`` and
            ``output_schema``.

    Raises:
        Exception: If the registration request fails or its response cannot
            be processed. A failed health check does not raise.
    """
    if not activity_specs:
        logger.info("No activities to register")
        return

    if not AUTOMATION_ENGINE_API_URL:
        logger.warning(
            "Automation engine API URL not configured. Skipping activity registration."
        )
        return

    # Health check first: an unreachable engine is treated as "not deployed"
    # and registration is skipped rather than reported as an error.
    try:
        health_check_url: str = f"{AUTOMATION_ENGINE_API_URL}/api/health"
        health_response: requests.Response = requests.get(health_check_url, timeout=5.0)
        health_response.raise_for_status()
        logger.info("Automation engine health check passed")
    except Exception as e:
        logger.warning(
            f"Automation engine health check failed: {e}. "
            "Skipping activity registration. "
            "Check if the automation engine is deployed and accessible."
        )
        return

    # Count the specs actually passed in, not the module-level registry.
    logger.info(
        f"Registering {len(activity_specs)} activities with automation engine"
    )

    # Generate app qualified name
    app_qualified_name: str = f"default/apps/{app_name}"

    # Build tools payload without function objects (not JSON serializable)
    tools = [
        {
            "name": item["name"],
            "description": item["description"],
            "input_schema": item["input_schema"],
            "output_schema": item["output_schema"],
        }
        for item in activity_specs
    ]

    payload = {
        "app_qualified_name": app_qualified_name,
        "app_name": app_name,
        "tools": tools,
    }

    try:
        response: requests.Response = requests.post(
            f"{AUTOMATION_ENGINE_API_URL}/api/tools",
            json=payload,
            timeout=30.0,
        )
        response.raise_for_status()
        result = response.json()

        if result.get("status") == "success":
            logger.info(
                f"Successfully registered {len(tools)} activities with automation engine"
            )
        else:
            logger.warning(
                f"Failed to register activities with automation engine: {result.get('message')}"
            )
    except Exception as e:
        raise Exception(
            f"Failed to register activities with automation engine: {e}"
        ) from e
Loading