-
Notifications
You must be signed in to change notification settings - Fork 11
fix: time based state refresh for each activity #796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1d75391
76ab9fd
e3ec139
e4c2e07
54470ba
f3ebaed
2c08a4b
d367dfe
d10af7e
5d1e49f
c91d6e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| import os | ||
| from datetime import datetime | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
|
|
@@ -68,6 +69,7 @@ class BaseSQLMetadataExtractionActivitiesState(ActivitiesState): | |
| sql_client: Optional[BaseSQLClient] = None | ||
| handler: Optional[BaseSQLHandler] = None | ||
| transformer: Optional[TransformerInterface] = None | ||
| last_updated_timestamp: Optional[datetime] = None | ||
|
|
||
|
|
||
| class BaseSQLMetadataExtractionActivities(ActivitiesInterface): | ||
|
|
@@ -157,13 +159,30 @@ async def _set_state(self, workflow_args: Dict[str, Any]): | |
|
|
||
| Args: | ||
| workflow_args (Dict[str, Any]): Arguments passed to the workflow. | ||
|
|
||
| Note: | ||
| This method creates and configures the new SQL client before closing | ||
| the old one to ensure state is never left with a closed client if | ||
| initialization fails. The timestamp is only updated after the new | ||
| client is successfully created and assigned. | ||
| """ | ||
| workflow_id = get_workflow_id() | ||
| if not self._state.get(workflow_id): | ||
| self._state[workflow_id] = BaseSQLMetadataExtractionActivitiesState() | ||
|
|
||
| await super()._set_state(workflow_args) | ||
| existing_state = self._state[workflow_id] | ||
|
|
||
| # Update workflow_args early, but preserve old timestamp until new client is ready | ||
| # This ensures that if initialization fails, the state can still be refreshed | ||
| existing_state.workflow_args = workflow_args | ||
|
|
||
| # Store reference to old client for cleanup after new client is ready | ||
| old_sql_client = None | ||
| if existing_state and existing_state.sql_client is not None: | ||
| old_sql_client = existing_state.sql_client | ||
|
|
||
| # Create and configure new client BEFORE closing old one | ||
| # This ensures state is never left with a closed client if initialization fails | ||
| sql_client = self.sql_client_class() | ||
|
|
||
| # Load credentials BEFORE creating handler to avoid race condition | ||
|
|
@@ -173,10 +192,29 @@ async def _set_state(self, workflow_args: Dict[str, Any]): | |
| ) | ||
| await sql_client.load(credentials) | ||
|
|
||
| # Assign sql_client and handler to state AFTER credentials are loaded | ||
| # Only after new client is successfully created and configured, | ||
| # close old client and assign new one to state | ||
| if old_sql_client is not None: | ||
| try: | ||
| await old_sql_client.close() | ||
| logger.debug( | ||
| f"Closed existing SQL client for workflow {workflow_id} during state refresh" | ||
| ) | ||
| except Exception as e: | ||
| logger.warning( | ||
| f"Failed to close existing SQL client for workflow {workflow_id}: {e}", | ||
| exc_info=True, | ||
| ) | ||
| # Continue even if close fails - new client is already ready | ||
|
|
||
| # Assign sql_client and handler to state AFTER new client is ready | ||
| self._state[workflow_id].sql_client = sql_client | ||
| handler = self.handler_class(sql_client) | ||
| self._state[workflow_id].handler = handler | ||
| # Update timestamp only after successful client creation and assignment | ||
| # This ensures that if initialization fails, the old timestamp remains | ||
| # and the state can be refreshed again immediately | ||
| self._state[workflow_id].last_updated_timestamp = datetime.now() | ||
abhishekagrawal-atlan marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: Partial state initialization prevents automatic state refresh. The refactored […] Additional Locations (1) |
||
|
|
||
| # Create transformer with required parameters from ApplicationConstants | ||
| transformer_params = { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.