14 changes: 14 additions & 0 deletions application_sdk/activities/__init__.py
@@ -15,6 +15,7 @@

import os
from abc import ABC
from datetime import datetime, timedelta
from typing import Any, Dict, Generic, Optional, TypeVar

from pydantic import BaseModel
@@ -62,6 +63,7 @@ class ActivitiesState(BaseModel, Generic[HandlerType]):
model_config = {"arbitrary_types_allowed": True}
handler: Optional[HandlerType] = None
workflow_args: Optional[Dict[str, Any]] = None
last_updated_timestamp: Optional[datetime] = None


ActivitiesStateType = TypeVar("ActivitiesStateType", bound=ActivitiesState)
@@ -113,12 +115,15 @@ async def _set_state(self, workflow_args: Dict[str, Any]) -> None:
Note:
The workflow ID is automatically retrieved from the current activity context.
If no state exists for the current workflow, a new one will be created.
This method also updates the last_updated_timestamp to enable time-based
state refresh functionality.
"""
workflow_id = get_workflow_id()
if not self._state.get(workflow_id):
self._state[workflow_id] = ActivitiesState()

self._state[workflow_id].workflow_args = workflow_args
self._state[workflow_id].last_updated_timestamp = datetime.now()

async def _get_state(self, workflow_args: Dict[str, Any]) -> ActivitiesStateType:
"""Retrieve the state for the current workflow.
@@ -142,6 +147,15 @@ async def _get_state(self, workflow_args: Dict[str, Any]) -> ActivitiesStateType
workflow_id = get_workflow_id()
if workflow_id not in self._state:
await self._set_state(workflow_args)

else:
current_timestamp = datetime.now()
# Re-run _set_state if more than 15 minutes have elapsed since the last update
last_updated = self._state[workflow_id].last_updated_timestamp
if last_updated and current_timestamp - last_updated > timedelta(
minutes=15
):
await self._set_state(workflow_args)
return self._state[workflow_id]
except OrchestratorError as e:
logger.error(
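For orientation, here is a minimal, self-contained sketch of the time-based refresh pattern the hunk above introduces; the TimestampedStateStore class, REFRESH_INTERVAL constant, and method names are illustrative only and are not part of application_sdk.

```python
from datetime import datetime, timedelta
from typing import Any, Dict

REFRESH_INTERVAL = timedelta(minutes=15)


class TimestampedStateStore:
    """Keeps one state dict per key and rebuilds it once it goes stale."""

    def __init__(self) -> None:
        self._state: Dict[str, Dict[str, Any]] = {}
        self._last_updated: Dict[str, datetime] = {}

    def set(self, key: str, value: Dict[str, Any]) -> None:
        self._state[key] = value
        self._last_updated[key] = datetime.now()

    def get(self, key: str, fresh_value: Dict[str, Any]) -> Dict[str, Any]:
        last = self._last_updated.get(key)
        # Rebuild when the entry is missing or older than the refresh interval,
        # mirroring the last_updated_timestamp check added to _get_state above.
        if key not in self._state or (
            last and datetime.now() - last > REFRESH_INTERVAL
        ):
            self.set(key, fresh_value)
        return self._state[key]
```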
42 changes: 40 additions & 2 deletions application_sdk/activities/metadata_extraction/sql.py
@@ -1,4 +1,5 @@
import os
from datetime import datetime
from typing import (
TYPE_CHECKING,
Any,
@@ -68,6 +69,7 @@ class BaseSQLMetadataExtractionActivitiesState(ActivitiesState):
sql_client: Optional[BaseSQLClient] = None
handler: Optional[BaseSQLHandler] = None
transformer: Optional[TransformerInterface] = None
last_updated_timestamp: Optional[datetime] = None


class BaseSQLMetadataExtractionActivities(ActivitiesInterface):
@@ -157,13 +159,30 @@ async def _set_state(self, workflow_args: Dict[str, Any]):

Args:
workflow_args (Dict[str, Any]): Arguments passed to the workflow.

Note:
This method creates and configures the new SQL client before closing
the old one to ensure state is never left with a closed client if
initialization fails. The timestamp is only updated after the new
client is successfully created and assigned.
"""
workflow_id = get_workflow_id()
if not self._state.get(workflow_id):
self._state[workflow_id] = BaseSQLMetadataExtractionActivitiesState()

await super()._set_state(workflow_args)
existing_state = self._state[workflow_id]

# Update workflow_args early, but preserve old timestamp until new client is ready
# This ensures that if initialization fails, the state can still be refreshed
existing_state.workflow_args = workflow_args

# Store reference to old client for cleanup after new client is ready
old_sql_client = None
if existing_state and existing_state.sql_client is not None:
old_sql_client = existing_state.sql_client

# Create and configure new client BEFORE closing old one
# This ensures state is never left with a closed client if initialization fails
sql_client = self.sql_client_class()

# Load credentials BEFORE creating handler to avoid race condition
@@ -173,10 +192,29 @@ async def _set_state(self, workflow_args: Dict[str, Any]):
)
await sql_client.load(credentials)

# Assign sql_client and handler to state AFTER credentials are loaded
# Only after new client is successfully created and configured,
# close old client and assign new one to state
if old_sql_client is not None:
try:
await old_sql_client.close()
logger.debug(
f"Closed existing SQL client for workflow {workflow_id} during state refresh"
)
except Exception as e:
logger.warning(
f"Failed to close existing SQL client for workflow {workflow_id}: {e}",
exc_info=True,
)
# Continue even if close fails - new client is already ready

# Assign sql_client and handler to state AFTER new client is ready
self._state[workflow_id].sql_client = sql_client
handler = self.handler_class(sql_client)
self._state[workflow_id].handler = handler
# Update timestamp only after successful client creation and assignment
# This ensures that if initialization fails, the old timestamp remains
# and the state can be refreshed again immediately
self._state[workflow_id].last_updated_timestamp = datetime.now()
Bug: Partial state initialization prevents automatic state refresh

The refactored _set_state method creates the state object and sets workflow_args (lines 170-177) before initializing the SQL client. If client initialization fails (lines 186-193), last_updated_timestamp is never set (line 217 not reached). On subsequent calls to _get_state, the base class check if last_updated and ... evaluates to False when last_updated is None, so it returns the broken state without attempting re-initialization. The old code called super()._set_state() first, which set last_updated_timestamp before client creation could fail, allowing time-based refresh to eventually recover. The fix could be to set last_updated_timestamp earlier, or add a check for incomplete state (e.g., sql_client is None).
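A possible shape for the second suggestion: treat a state whose SQL client was never assigned as incomplete, so _get_state re-runs _set_state instead of returning the half-initialized object. This is an untested sketch, not part of this PR; the override placement on BaseSQLMetadataExtractionActivities and the extra timedelta import in sql.py are assumptions.

```python
# Sketch of an override on BaseSQLMetadataExtractionActivities
# (assumes "from datetime import datetime, timedelta" in sql.py).
async def _get_state(
    self, workflow_args: Dict[str, Any]
) -> BaseSQLMetadataExtractionActivitiesState:
    workflow_id = get_workflow_id()
    state = self._state.get(workflow_id)
    # Re-initialize when the state is missing, was only partially built
    # (no SQL client assigned), or is older than the 15-minute window.
    stale = (
        state is None
        or state.sql_client is None
        or (
            state.last_updated_timestamp is not None
            and datetime.now() - state.last_updated_timestamp
            > timedelta(minutes=15)
        )
    )
    if stale:
        await self._set_state(workflow_args)
    return self._state[workflow_id]
```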

# Create transformer with required parameters from ApplicationConstants
transformer_params = {
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -21,7 +21,7 @@ classifiers = [
keywords = ["atlan", "sdk", "platform", "app", "development"]
dependencies = [
"aiohttp>=3.10.0",
"opentelemetry-exporter-otlp>=1.27.0",
"opentelemetry-exporter-otlp==1.38.0",
"psutil>=7.0.0",
"fastapi[standard]==0.120.2",
"pyatlan>=8.0.2",