diff --git a/agentex/database/migrations/alembic/versions/2025_11_04_1923_add_unhealthy_status_a5d67f2d7356.py b/agentex/database/migrations/alembic/versions/2025_11_04_1923_add_unhealthy_status_a5d67f2d7356.py new file mode 100644 index 00000000..c5a708c6 --- /dev/null +++ b/agentex/database/migrations/alembic/versions/2025_11_04_1923_add_unhealthy_status_a5d67f2d7356.py @@ -0,0 +1,32 @@ +"""add unhealthy status + +Revision ID: a5d67f2d7356 +Revises: 329fbafa4ff9 +Create Date: 2025-11-04 19:23:22.904744 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'a5d67f2d7356' +down_revision: Union[str, None] = '329fbafa4ff9' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.execute(""" + ALTER TYPE agentstatus ADD VALUE IF NOT EXISTS 'UNHEALTHY'; + """) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + pass + # ### end Alembic commands ### diff --git a/agentex/database/migrations/alembic/versions/2025_11_04_2340_add_agent_input_type_24429f13b8bd.py b/agentex/database/migrations/alembic/versions/2025_11_04_2340_add_agent_input_type_24429f13b8bd.py new file mode 100644 index 00000000..a7b18706 --- /dev/null +++ b/agentex/database/migrations/alembic/versions/2025_11_04_2340_add_agent_input_type_24429f13b8bd.py @@ -0,0 +1,39 @@ +"""add agent input type + +Revision ID: 24429f13b8bd +Revises: a5d67f2d7356 +Create Date: 2025-11-04 23:40:10.340272 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision: str = '24429f13b8bd' +down_revision: Union[str, None] = 'a5d67f2d7356' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + # Alembic doesn't create enum types on table updates, so we need to do it manually + op.execute(""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'agentinputtype') THEN + CREATE TYPE agentinputtype AS ENUM ('TEXT', 'JSON'); + END IF; + END $$; + """) + op.add_column('agents', sa.Column('agent_input_type', sa.Enum('TEXT', 'JSON', name='agentinputtype', create_type=False), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('agents', 'agent_input_type') + # ### end Alembic commands ### diff --git a/agentex/database/migrations/migration_history.txt b/agentex/database/migrations/migration_history.txt index b6a9c8ac..529c9222 100644 --- a/agentex/database/migrations/migration_history.txt +++ b/agentex/database/migrations/migration_history.txt @@ -1,4 +1,6 @@ -d7addd4229e8 -> 329fbafa4ff9 (head), change_default_acp_to_async +a5d67f2d7356 -> 24429f13b8bd (head), add agent input type +329fbafa4ff9 -> a5d67f2d7356, add unhealthy status +d7addd4229e8 -> 329fbafa4ff9, change_default_acp_to_async 09368a02d6cc -> d7addd4229e8, soft delete status 739800d3e1ce -> 09368a02d6cc, deployment history dbac39ab82c3 -> 739800d3e1ce, registration metadata diff --git a/agentex/docker-compose.yml b/agentex/docker-compose.yml index 4635ac69..d0af4d2f 100644 --- a/agentex/docker-compose.yml +++ b/agentex/docker-compose.yml @@ -149,6 +149,8 @@ services: - MONGODB_URI=mongodb://agentex-mongodb:27017 - MONGODB_DATABASE_NAME=agentex - WATCHFILES_FORCE_POLLING=true + - ENABLE_HEALTH_CHECK_WORKFLOW=true + - AGENTEX_SERVER_TASK_QUEUE=agentex-server ports: - "5003:5003" volumes: @@ -187,6 
+189,45 @@ services: retries: 5 start_period: 30s + agentex-temporal-worker: + container_name: agentex-temporal-worker + build: + context: .. + dockerfile: agentex/Dockerfile + target: dev + environment: + - ENVIRONMENT=development + - DATABASE_URL=postgresql://postgres:postgres@agentex-postgres:5432/agentex + - TEMPORAL_ADDRESS=agentex-temporal:7233 + - TEMPORAL_HOST=agentex-temporal + - REDIS_URL=redis://agentex-redis:6379 + - MONGODB_URI=mongodb://agentex-mongodb:27017 + - MONGODB_DATABASE_NAME=agentex + - AGENTEX_SERVER_TASK_QUEUE=agentex-server + volumes: + - .:/app:cached + depends_on: + agentex-temporal: + condition: service_healthy + agentex-redis: + condition: service_healthy + agentex-postgres: + condition: service_healthy + agentex-mongodb: + condition: service_healthy + networks: + - agentex-network + command: | + bash -c " + echo 'Starting Temporal Worker...' && + export TEMPORAL_ADDRESS=agentex-temporal:7233 && + export TEMPORAL_HOST=agentex-temporal && + export MONGODB_URI=mongodb://agentex-mongodb:27017 && + python src/temporal/run_worker.py + " + user: root + restart: unless-stopped + volumes: agentex-temporal-postgres-data: agentex-postgres-data: diff --git a/agentex/src/adapters/orm.py b/agentex/src/adapters/orm.py index 3eb45e9e..e83f09bf 100644 --- a/agentex/src/adapters/orm.py +++ b/agentex/src/adapters/orm.py @@ -17,7 +17,7 @@ from sqlalchemy.orm import relationship from src.domain.entities.agent_api_keys import AgentAPIKeyType -from src.domain.entities.agents import AgentStatus +from src.domain.entities.agents import AgentInputType, AgentStatus from src.domain.entities.tasks import TaskStatus from src.utils.ids import orm_id @@ -41,6 +41,7 @@ class AgentORM(BaseORM): ) registration_metadata = Column(JSONB, nullable=True) registered_at = Column(DateTime(timezone=True), nullable=True) + agent_input_type = Column(SQLAlchemyEnum(AgentInputType), nullable=True) # Many-to-Many relationship with tasks tasks = relationship("TaskORM", 
secondary="task_agents", back_populates="agents") diff --git a/agentex/src/adapters/temporal/__init__.py b/agentex/src/adapters/temporal/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agentex/src/adapters/temporal/adapter_temporal.py b/agentex/src/adapters/temporal/adapter_temporal.py new file mode 100644 index 00000000..6c372cb2 --- /dev/null +++ b/agentex/src/adapters/temporal/adapter_temporal.py @@ -0,0 +1,353 @@ +from datetime import timedelta +from typing import Annotated, Any + +from fastapi import Depends +from temporalio.client import ( + Client, + WorkflowExecution, + WorkflowHandle, +) +from temporalio.common import RetryPolicy, WorkflowIDReusePolicy +from temporalio.exceptions import ( + WorkflowAlreadyStartedError, +) + +from src.adapters.temporal.exceptions import ( + TemporalCancelError, + TemporalConnectionError, + TemporalError, + TemporalInvalidArgumentError, + TemporalQueryError, + TemporalSignalError, + TemporalTerminateError, + TemporalWorkflowAlreadyExistsError, + TemporalWorkflowError, + TemporalWorkflowNotFoundError, +) +from src.adapters.temporal.port import TemporalGateway +from src.utils.logging import make_logger + +logger = make_logger(__name__) + + +class TemporalAdapter(TemporalGateway): + """ + Implementation of the TemporalGateway interface using the Temporal Python SDK. + Provides a clean abstraction over Temporal client operations with proper error handling. + """ + + def __init__(self, temporal_client: Client | None = None): + """ + Initialize the Temporal adapter with a client instance. 
+ + Args: + temporal_client: The Temporal client (may be None if not configured) + """ + self.client = temporal_client + + async def start_workflow( + self, + workflow: str | type, + workflow_id: str, + args: list[Any] | None = None, + task_queue: str | None = None, + execution_timeout: timedelta | None = None, + retry_policy: RetryPolicy | None = None, + id_reuse_policy: WorkflowIDReusePolicy | None = None, + start_delay: timedelta | None = None, + ) -> WorkflowHandle: + """ + Start a new workflow execution. + """ + if not self.client: + raise TemporalConnectionError("Temporal client is not connected") + + try: + # Build workflow options + options: dict[str, Any] = { + "id": workflow_id, + } + + if task_queue: + options["task_queue"] = task_queue + if execution_timeout: + options["execution_timeout"] = execution_timeout + if retry_policy: + options["retry_policy"] = retry_policy + if id_reuse_policy: + options["id_reuse_policy"] = id_reuse_policy + if start_delay: + options["start_delay"] = start_delay + + # Start the workflow + handle = await self.client.start_workflow( + workflow, + *args if args else [], + **options, + ) + + logger.info(f"Started workflow {workflow_id} successfully") + return handle + + except WorkflowAlreadyStartedError as e: + logger.error(f"Workflow {workflow_id} already exists: {e}") + raise TemporalWorkflowAlreadyExistsError( + message=f"Workflow with ID '{workflow_id}' already exists", + detail=str(e), + ) from e + except ValueError as e: + logger.error(f"Invalid arguments for workflow {workflow_id}: {e}") + raise TemporalInvalidArgumentError( + message=f"Invalid arguments provided for workflow '{workflow_id}'", + detail=str(e), + ) from e + except Exception as e: + logger.error(f"Failed to start workflow {workflow_id}: {e}") + raise TemporalWorkflowError( + message=f"Failed to start workflow '{workflow_id}'", + detail=str(e), + ) from e + + async def get_workflow_handle( + self, + workflow_id: str, + run_id: str | None = None, + 
first_execution_run_id: str | None = None, + ) -> WorkflowHandle: + """ + Get a handle to an existing workflow. + """ + if not self.client: + raise TemporalConnectionError("Temporal client is not connected") + + try: + handle = self.client.get_workflow_handle( + workflow_id, + run_id=run_id, + first_execution_run_id=first_execution_run_id, + ) + return handle + except Exception as e: + logger.error(f"Failed to get workflow handle for {workflow_id}: {e}") + raise TemporalWorkflowNotFoundError( + message=f"Workflow '{workflow_id}' not found", + detail=str(e), + ) from e + + async def signal_workflow( + self, + workflow_id: str, + signal: str, + arg: Any = None, + run_id: str | None = None, + ) -> None: + """ + Send a signal to a running workflow. + """ + if not self.client: + raise TemporalConnectionError("Temporal client is not connected") + + try: + handle = self.client.get_workflow_handle(workflow_id, run_id=run_id) + await handle.signal(signal, arg) + logger.info(f"Sent signal '{signal}' to workflow {workflow_id}") + except Exception as e: + if "not found" in str(e).lower(): + logger.error(f"Workflow {workflow_id} not found: {e}") + raise TemporalWorkflowNotFoundError( + message=f"Workflow '{workflow_id}' not found", + detail=str(e), + ) from e + logger.error(f"Failed to signal workflow {workflow_id}: {e}") + raise TemporalSignalError( + message=f"Failed to send signal '{signal}' to workflow '{workflow_id}'", + detail=str(e), + ) from e + + async def query_workflow( + self, + workflow_id: str, + query: str, + arg: Any = None, + run_id: str | None = None, + ) -> Any: + """ + Query a workflow for its current state. 
+ """ + if not self.client: + raise TemporalConnectionError("Temporal client is not connected") + + try: + handle = self.client.get_workflow_handle(workflow_id, run_id=run_id) + result = await handle.query(query, arg) + logger.info(f"Queried workflow {workflow_id} with query '{query}'") + return result + except Exception as e: + if "not found" in str(e).lower(): + logger.error(f"Workflow {workflow_id} not found: {e}") + raise TemporalWorkflowNotFoundError( + message=f"Workflow '{workflow_id}' not found", + detail=str(e), + ) from e + logger.error(f"Failed to query workflow {workflow_id}: {e}") + raise TemporalQueryError( + message=f"Failed to execute query '{query}' on workflow '{workflow_id}'", + detail=str(e), + ) from e + + async def cancel_workflow( + self, + workflow_id: str, + run_id: str | None = None, + ) -> None: + """ + Cancel a running workflow. + """ + if not self.client: + raise TemporalConnectionError("Temporal client is not connected") + + try: + handle = self.client.get_workflow_handle(workflow_id, run_id=run_id) + await handle.cancel() + logger.info(f"Cancelled workflow {workflow_id}") + except Exception as e: + if "not found" in str(e).lower(): + logger.error(f"Workflow {workflow_id} not found: {e}") + raise TemporalWorkflowNotFoundError( + message=f"Workflow '{workflow_id}' not found", + detail=str(e), + ) from e + logger.error(f"Failed to cancel workflow {workflow_id}: {e}") + raise TemporalCancelError( + message=f"Failed to cancel workflow '{workflow_id}'", + detail=str(e), + ) from e + + async def terminate_workflow( + self, + workflow_id: str, + reason: str | None = None, + run_id: str | None = None, + ) -> None: + """ + Terminate a running workflow immediately. 
+ """ + if not self.client: + raise TemporalConnectionError("Temporal client is not connected") + + try: + handle = self.client.get_workflow_handle(workflow_id, run_id=run_id) + await handle.terminate(reason=reason) + logger.info(f"Terminated workflow {workflow_id} with reason: {reason}") + except Exception as e: + if "not found" in str(e).lower(): + logger.error(f"Workflow {workflow_id} not found: {e}") + raise TemporalWorkflowNotFoundError( + message=f"Workflow '{workflow_id}' not found", + detail=str(e), + ) from e + logger.error(f"Failed to terminate workflow {workflow_id}: {e}") + raise TemporalTerminateError( + message=f"Failed to terminate workflow '{workflow_id}'", + detail=str(e), + ) from e + + async def describe_workflow( + self, + workflow_id: str, + run_id: str | None = None, + ) -> WorkflowExecution: + """ + Get detailed information about a workflow execution. + """ + if not self.client: + raise TemporalConnectionError("Temporal client is not connected") + + try: + handle = self.client.get_workflow_handle(workflow_id, run_id=run_id) + description = await handle.describe() + logger.info(f"Retrieved description for workflow {workflow_id}") + return description + except Exception as e: + if "not found" in str(e).lower(): + logger.error(f"Workflow {workflow_id} not found: {e}") + raise TemporalWorkflowNotFoundError( + message=f"Workflow '{workflow_id}' not found", + detail=str(e), + ) from e + logger.error(f"Failed to describe workflow {workflow_id}: {e}") + raise TemporalWorkflowError( + message=f"Failed to describe workflow '{workflow_id}'", + detail=str(e), + ) from e + + async def list_workflows( + self, + query: str | None = None, + page_size: int = 100, + ) -> list[WorkflowExecution]: + """ + List workflow executions matching the query. 
+ """ + if not self.client: + raise TemporalConnectionError("Temporal client is not connected") + + try: + workflows = [] + async for workflow in self.client.list_workflows( + query=query, + page_size=page_size, + ): + workflows.append(workflow) + if len(workflows) >= page_size: + break + + logger.info(f"Listed {len(workflows)} workflows") + return workflows + except Exception as e: + logger.error(f"Failed to list workflows: {e}") + raise TemporalError( + message="Failed to list workflows", + detail=str(e), + ) from e + + async def get_client(self) -> Client: + """ + Get the underlying Temporal client instance. + """ + if not self.client: + raise TemporalConnectionError("Temporal client is not connected") + return self.client + + async def is_connected(self) -> bool: + """ + Check if the Temporal client is connected and healthy. + """ + if not self.client: + return False + + try: + # Try to list workflows with a very small limit to test connectivity + async for _ in self.client.list_workflows(page_size=1): + break + return True + except Exception as e: + logger.warning(f"Temporal connectivity check failed: {e}") + return False + + +# Dependency injection annotation for FastAPI +async def get_temporal_adapter() -> TemporalAdapter: + """ + Factory function for dependency injection. + Gets the temporal client from global dependencies. + """ + from src.config.dependencies import GlobalDependencies + + global_deps = GlobalDependencies() + await global_deps.load() + client = global_deps.temporal_client + return TemporalAdapter(temporal_client=client) + + +DTemporalAdapter = Annotated[TemporalAdapter, Depends(get_temporal_adapter)] diff --git a/agentex/src/adapters/temporal/client_factory.py b/agentex/src/adapters/temporal/client_factory.py new file mode 100644 index 00000000..f3e64259 --- /dev/null +++ b/agentex/src/adapters/temporal/client_factory.py @@ -0,0 +1,204 @@ +""" +Temporal client factory for creating and configuring Temporal clients. 
+""" + +import dataclasses +import datetime +from typing import Any + +from temporalio.client import Client +from temporalio.converter import ( + AdvancedJSONEncoder, + CompositePayloadConverter, + DataConverter, + DefaultPayloadConverter, + JSONPlainPayloadConverter, + JSONTypeConverter, + _JSONTypeConverterUnhandled, +) +from temporalio.runtime import OpenTelemetryConfig, Runtime, TelemetryConfig + +from src.adapters.temporal.exceptions import TemporalConnectionError +from src.config.environment_variables import EnvironmentVariables +from src.utils.logging import make_logger + +logger = make_logger(__name__) + + +class DateTimeJSONEncoder(AdvancedJSONEncoder): + """Custom JSON encoder that handles datetime objects.""" + + def default(self, o: Any) -> Any: + if isinstance(o, datetime.datetime): + return o.isoformat() + return super().default(o) + + +class DateTimeJSONTypeConverter(JSONTypeConverter): + """Custom JSON type converter for datetime objects.""" + + def to_typed_value( + self, hint: type, value: Any + ) -> Any | None | _JSONTypeConverterUnhandled: + if hint == datetime.datetime: + return datetime.datetime.fromisoformat(value) + return JSONTypeConverter.Unhandled + + +class DateTimePayloadConverter(CompositePayloadConverter): + """Custom payload converter that handles datetime serialization.""" + + def __init__(self) -> None: + json_converter = JSONPlainPayloadConverter( + encoder=DateTimeJSONEncoder, + custom_type_converters=[DateTimeJSONTypeConverter()], + ) + super().__init__( + *[ + c if not isinstance(c, JSONPlainPayloadConverter) else json_converter + for c in DefaultPayloadConverter.default_encoding_payload_converters + ] + ) + + +# Custom data converter with datetime support +custom_data_converter = dataclasses.replace( + DataConverter.default, + payload_converter_class=DateTimePayloadConverter, +) + + +class TemporalClientFactory: + """ + Factory class for creating and configuring Temporal clients. 
+ Provides a clean interface for client creation with proper error handling. + """ + + @staticmethod + async def create_client( + temporal_address: str, + temporal_namespace: str | None = None, + metrics_url: str | None = None, + data_converter: DataConverter | None = None, + ) -> Client: + """ + Create a Temporal client with the specified configuration. + + Args: + temporal_address: The Temporal server address + temporal_namespace: Optional namespace to connect to + metrics_url: Optional OpenTelemetry metrics endpoint + data_converter: Optional custom data converter + + Returns: + Configured Temporal client + + Raises: + TemporalConnectionError: If client creation fails + """ + if not temporal_address or temporal_address in [ + "false", + "False", + "null", + "None", + "", + "undefined", + ]: + raise TemporalConnectionError( + "Temporal address is not configured or is invalid" + ) + + try: + # Use custom data converter if not provided + if data_converter is None: + data_converter = custom_data_converter + + # Build connection options + connect_options = { + "target_host": temporal_address, + "data_converter": data_converter, + } + + if temporal_namespace: + connect_options["namespace"] = temporal_namespace + + # Add telemetry if metrics URL is provided + if metrics_url: + logger.info( + f"Configuring Temporal client with metrics URL: {metrics_url}" + ) + runtime = Runtime( + telemetry=TelemetryConfig( + metrics=OpenTelemetryConfig(url=metrics_url) + ) + ) + connect_options["runtime"] = runtime + + # Create the client + client = await Client.connect(**connect_options) + logger.info( + f"Successfully created Temporal client for address: {temporal_address}" + ) + return client + + except Exception as e: + logger.error(f"Failed to create Temporal client: {e}") + raise TemporalConnectionError( + message=f"Failed to connect to Temporal at {temporal_address}", + detail=str(e), + ) from e + + @staticmethod + async def create_client_from_env( + environment_variables: 
EnvironmentVariables | None = None, + metrics_url: str | None = None, + ) -> Client: + """ + Create a Temporal client using environment variables. + + Args: + environment_variables: Environment variables instance (will refresh if None) + metrics_url: Optional OpenTelemetry metrics endpoint + + Returns: + Configured Temporal client + + Raises: + TemporalConnectionError: If client creation fails + """ + if environment_variables is None: + environment_variables = EnvironmentVariables.refresh() + + return await TemporalClientFactory.create_client( + temporal_address=environment_variables.TEMPORAL_ADDRESS, + temporal_namespace=environment_variables.TEMPORAL_NAMESPACE, + metrics_url=metrics_url, + ) + + @staticmethod + def is_temporal_configured( + environment_variables: EnvironmentVariables | None = None, + ) -> bool: + """ + Check if Temporal is properly configured in environment variables. + + Args: + environment_variables: Environment variables instance (will refresh if None) + + Returns: + True if Temporal is configured, False otherwise + """ + if environment_variables is None: + environment_variables = EnvironmentVariables.refresh() + + temporal_address = environment_variables.TEMPORAL_ADDRESS + return temporal_address not in [ + "false", + "False", + "null", + "None", + "", + "undefined", + False, + None, + ] diff --git a/agentex/src/adapters/temporal/exceptions.py b/agentex/src/adapters/temporal/exceptions.py new file mode 100644 index 00000000..4618c62e --- /dev/null +++ b/agentex/src/adapters/temporal/exceptions.py @@ -0,0 +1,114 @@ +from src.domain.exceptions import ClientError, ServiceError + + +class TemporalError(ServiceError): + """Base exception for all Temporal-related errors.""" + + code = 500 + + +class TemporalConnectionError(ServiceError): + """ + Exception raised when the Temporal service connection fails. + This includes network issues, authentication failures, or service unavailability. 
+ """ + + code = 503 # Service Unavailable + + +class TemporalWorkflowError(ServiceError): + """ + Exception raised when a workflow operation fails. + This is a general error for workflow-related issues. + """ + + code = 500 + + +class TemporalWorkflowNotFoundError(ClientError): + """ + Exception raised when attempting to access a workflow that doesn't exist. + """ + + code = 404 + + +class TemporalWorkflowAlreadyExistsError(ClientError): + """ + Exception raised when attempting to create a workflow with an ID that already exists. + """ + + code = 409 # Conflict + + +class TemporalSignalError(ServiceError): + """ + Exception raised when sending a signal to a workflow fails. + """ + + code = 500 + + +class TemporalQueryError(ServiceError): + """ + Exception raised when querying a workflow fails. + """ + + code = 500 + + +class TemporalCancelError(ServiceError): + """ + Exception raised when cancelling a workflow fails. + """ + + code = 500 + + +class TemporalTerminateError(ServiceError): + """ + Exception raised when terminating a workflow fails. + """ + + code = 500 + + +class TemporalScheduleError(ServiceError): + """ + Exception raised when a schedule operation fails. + This is a general error for schedule-related issues. + """ + + code = 500 + + +class TemporalScheduleNotFoundError(ClientError): + """ + Exception raised when attempting to access a schedule that doesn't exist. + """ + + code = 404 + + +class TemporalScheduleAlreadyExistsError(ClientError): + """ + Exception raised when attempting to create a schedule with an ID that already exists. + """ + + code = 409 # Conflict + + +class TemporalTimeoutError(ServiceError): + """ + Exception raised when a Temporal operation times out. + """ + + code = 504 # Gateway Timeout + + +class TemporalInvalidArgumentError(ClientError): + """ + Exception raised when invalid arguments are provided to a Temporal operation. 
+ """ + + code = 400 # Bad Request diff --git a/agentex/src/adapters/temporal/port.py b/agentex/src/adapters/temporal/port.py new file mode 100644 index 00000000..1a9e5fe3 --- /dev/null +++ b/agentex/src/adapters/temporal/port.py @@ -0,0 +1,225 @@ +from abc import ABC, abstractmethod +from datetime import timedelta +from typing import Any + +from temporalio.client import ( + Client, + WorkflowExecution, + WorkflowHandle, +) +from temporalio.common import RetryPolicy, WorkflowIDReusePolicy + + +class TemporalGateway(ABC): + """ + Interface for Temporal workflow orchestration operations. + Provides abstraction over Temporal client operations for workflow and schedule management. + """ + + @abstractmethod + async def start_workflow( + self, + workflow: str | type, + workflow_id: str, + args: list[Any] | None = None, + task_queue: str | None = None, + execution_timeout: timedelta | None = None, + retry_policy: RetryPolicy | None = None, + id_reuse_policy: WorkflowIDReusePolicy | None = None, + ) -> WorkflowHandle: + """ + Start a new workflow execution. + + Args: + workflow: The workflow class or name to execute + workflow_id: Unique identifier for the workflow execution + args: Arguments to pass to the workflow + task_queue: Task queue to use for the workflow + execution_timeout: Maximum time for workflow execution + retry_policy: Retry policy for the workflow + id_reuse_policy: Policy for reusing workflow IDs + + Returns: + Handle to the started workflow + + Raises: + TemporalWorkflowError: If workflow fails to start + """ + pass + + @abstractmethod + async def get_workflow_handle( + self, + workflow_id: str, + run_id: str | None = None, + first_execution_run_id: str | None = None, + ) -> WorkflowHandle: + """ + Get a handle to an existing workflow. 
+ + Args: + workflow_id: The workflow ID + run_id: Optional specific run ID + first_execution_run_id: Optional first execution run ID + + Returns: + Handle to the workflow + + Raises: + TemporalWorkflowNotFoundError: If workflow doesn't exist + """ + pass + + @abstractmethod + async def signal_workflow( + self, + workflow_id: str, + signal: str, + arg: Any = None, + run_id: str | None = None, + ) -> None: + """ + Send a signal to a running workflow. + + Args: + workflow_id: The workflow ID to signal + signal: The signal name + arg: Optional argument to send with the signal + run_id: Optional specific run ID + + Raises: + TemporalWorkflowNotFoundError: If workflow doesn't exist + TemporalSignalError: If signal fails + """ + pass + + @abstractmethod + async def query_workflow( + self, + workflow_id: str, + query: str, + arg: Any = None, + run_id: str | None = None, + ) -> Any: + """ + Query a workflow for its current state. + + Args: + workflow_id: The workflow ID to query + query: The query name + arg: Optional argument for the query + run_id: Optional specific run ID + + Returns: + The query result + + Raises: + TemporalWorkflowNotFoundError: If workflow doesn't exist + TemporalQueryError: If query fails + """ + pass + + @abstractmethod + async def cancel_workflow( + self, + workflow_id: str, + run_id: str | None = None, + ) -> None: + """ + Cancel a running workflow. + + Args: + workflow_id: The workflow ID to cancel + run_id: Optional specific run ID + + Raises: + TemporalWorkflowNotFoundError: If workflow doesn't exist + TemporalCancelError: If cancellation fails + """ + pass + + @abstractmethod + async def terminate_workflow( + self, + workflow_id: str, + reason: str | None = None, + run_id: str | None = None, + ) -> None: + """ + Terminate a running workflow immediately. 
+ + Args: + workflow_id: The workflow ID to terminate + reason: Optional reason for termination + run_id: Optional specific run ID + + Raises: + TemporalWorkflowNotFoundError: If workflow doesn't exist + TemporalTerminateError: If termination fails + """ + pass + + @abstractmethod + async def describe_workflow( + self, + workflow_id: str, + run_id: str | None = None, + ) -> WorkflowExecution: + """ + Get detailed information about a workflow execution. + + Args: + workflow_id: The workflow ID + run_id: Optional specific run ID + + Returns: + Workflow execution details + + Raises: + TemporalWorkflowNotFoundError: If workflow doesn't exist + """ + pass + + @abstractmethod + async def list_workflows( + self, + query: str | None = None, + page_size: int = 100, + ) -> list[WorkflowExecution]: + """ + List workflow executions matching the query. + + Args: + query: Optional query string to filter workflows + page_size: Number of results per page + + Returns: + List of workflow executions + + Raises: + TemporalError: If listing fails + """ + pass + + @abstractmethod + async def get_client(self) -> Client: + """ + Get the underlying Temporal client instance. + + Returns: + The Temporal client + + Raises: + TemporalConnectionError: If client is not connected + """ + pass + + @abstractmethod + async def is_connected(self) -> bool: + """ + Check if the Temporal client is connected and healthy. 
+ + Returns: + True if connected, False otherwise + """ + pass diff --git a/agentex/src/api/routes/agents.py b/agentex/src/api/routes/agents.py index 9a26e4c3..19524b2c 100644 --- a/agentex/src/api/routes/agents.py +++ b/agentex/src/api/routes/agents.py @@ -184,6 +184,7 @@ async def register_agent( agent_id=request.agent_id, acp_type=request.acp_type, registration_metadata=request.registration_metadata, + agent_input_type=request.agent_input_type, ) await authorization_service.grant( AgentexResource.agent(agent_entity.id), diff --git a/agentex/src/api/schemas/agents.py b/agentex/src/api/schemas/agents.py index 73f250e4..da1e57c5 100644 --- a/agentex/src/api/schemas/agents.py +++ b/agentex/src/api/schemas/agents.py @@ -12,6 +12,7 @@ class AgentStatus(str, Enum): FAILED = "Failed" UNKNOWN = "Unknown" DELETED = "Deleted" + UNHEALTHY = "Unhealthy" class ACPType(str, Enum): @@ -28,6 +29,11 @@ class AgentRPCMethod(str, Enum): TASK_CANCEL = "task/cancel" +class AgentInputType(str, Enum): + TEXT = "text" + JSON = "json" + + class Agent(BaseModel): id: str = Field(..., description="The unique identifier of the agent.") name: str = Field(..., description="The unique name of the agent.") @@ -57,6 +63,9 @@ class Agent(BaseModel): default=None, description="The timestamp when the agent was last registered", ) + agent_input_type: AgentInputType | None = Field( + default=None, description="The type of input the agent expects." + ) class Config: orm_mode = True @@ -80,6 +89,9 @@ class RegisterAgentRequest(BaseModel): default=None, description="The metadata for the agent's registration.", ) + agent_input_type: AgentInputType | None = Field( + default=None, description="The type of input the agent expects." 
+ ) class RegisterAgentResponse(Agent): diff --git a/agentex/src/config/dependencies.py b/agentex/src/config/dependencies.py index 69601e0b..9ddf92a6 100644 --- a/agentex/src/config/dependencies.py +++ b/agentex/src/config/dependencies.py @@ -20,7 +20,6 @@ from src.config.environment_variables import Environment, EnvironmentVariables from src.utils.database import async_db_engine_creator from src.utils.logging import make_logger -from src.utils.temporal_client import get_temporal_client logger = make_logger(__name__) @@ -51,23 +50,17 @@ def __init__(self): self._loaded = False async def create_temporal_client(self): - if self.environment_variables.TEMPORAL_ADDRESS in [ - "false", - "False", - "null", - "None", - "", - "undefined", - False, - None, - ]: + # Import locally to avoid circular dependency + from src.adapters.temporal.client_factory import TemporalClientFactory + + if not TemporalClientFactory.is_temporal_configured(self.environment_variables): return None else: logger.info( f"Creating temporal client with address: {self.environment_variables.TEMPORAL_ADDRESS}" ) - return await get_temporal_client( - self.environment_variables.TEMPORAL_ADDRESS + return await TemporalClientFactory.create_client_from_env( + environment_variables=self.environment_variables ) async def load(self): @@ -263,6 +256,10 @@ def resolve(): return Annotated[str, Depends(resolve)] +def httpx_client() -> httpx.AsyncClient: + return GlobalDependencies().httpx_client + + def database_async_read_write_engine() -> AsyncEngine: return GlobalDependencies().database_async_read_write_engine diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index a46d08f0..887b6499 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -18,6 +18,7 @@ class EnvVarKeys(str, Enum): OPENAI_API_KEY = "OPENAI_API_KEY" DATABASE_URL = "DATABASE_URL" TEMPORAL_ADDRESS = "TEMPORAL_ADDRESS" + TEMPORAL_NAMESPACE = 
"TEMPORAL_NAMESPACE" REDIS_URL = "REDIS_URL" AGENTEX_BASE_URL = "AGENTEX_BASE_URL" TEMPORAL_WORKER_ACTIVITY_THREAD_POOL_SIZE = ( @@ -48,6 +49,8 @@ class EnvVarKeys(str, Enum): HTTPX_POOL_TIMEOUT = "HTTPX_POOL_TIMEOUT" HTTPX_STREAMING_READ_TIMEOUT = "HTTPX_STREAMING_READ_TIMEOUT" SSE_KEEPALIVE_PING_INTERVAL = "SSE_KEEPALIVE_PING_INTERVAL" + AGENTEX_SERVER_TASK_QUEUE = "AGENTEX_SERVER_TASK_QUEUE" + ENABLE_HEALTH_CHECK_WORKFLOW = "ENABLE_HEALTH_CHECK_WORKFLOW" class Environment(str, Enum): @@ -64,6 +67,7 @@ class EnvironmentVariables(BaseModel): OPENAI_API_KEY: str | None DATABASE_URL: str | None TEMPORAL_ADDRESS: str | None + TEMPORAL_NAMESPACE: str | None REDIS_URL: str | None AGENTEX_BASE_URL: str | None TEMPORAL_WORKER_ACTIVITY_THREAD_POOL_SIZE: int = 4 # Default 4 for local dev @@ -90,6 +94,8 @@ class EnvironmentVariables(BaseModel): 300.0 # HTTPX streaming read timeout in seconds (5 minutes) ) SSE_KEEPALIVE_PING_INTERVAL: int = 15 # SSE keepalive ping interval in seconds + AGENTEX_SERVER_TASK_QUEUE: str | None = None + ENABLE_HEALTH_CHECK_WORKFLOW: bool = False @classmethod def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: @@ -104,6 +110,7 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: OPENAI_API_KEY=os.environ.get(EnvVarKeys.OPENAI_API_KEY), DATABASE_URL=os.environ.get(EnvVarKeys.DATABASE_URL), TEMPORAL_ADDRESS=os.environ.get(EnvVarKeys.TEMPORAL_ADDRESS), + TEMPORAL_NAMESPACE=os.environ.get(EnvVarKeys.TEMPORAL_NAMESPACE), REDIS_URL=os.environ.get(EnvVarKeys.REDIS_URL), AGENTEX_BASE_URL=os.environ.get(EnvVarKeys.AGENTEX_BASE_URL), BUILD_REGISTRY_URL=os.environ.get(EnvVarKeys.BUILD_REGISTRY_URL), @@ -154,6 +161,13 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: SSE_KEEPALIVE_PING_INTERVAL=int( os.environ.get(EnvVarKeys.SSE_KEEPALIVE_PING_INTERVAL, "15") ), + AGENTEX_SERVER_TASK_QUEUE=os.environ.get( + EnvVarKeys.AGENTEX_SERVER_TASK_QUEUE + ), + 
ENABLE_HEALTH_CHECK_WORKFLOW=( + os.environ.get(EnvVarKeys.ENABLE_HEALTH_CHECK_WORKFLOW, "false") + == "true" + ), ) refreshed_environment_variables = environment_variables return refreshed_environment_variables diff --git a/agentex/src/domain/entities/agents.py b/agentex/src/domain/entities/agents.py index 9eb70203..9e913ad2 100644 --- a/agentex/src/domain/entities/agents.py +++ b/agentex/src/domain/entities/agents.py @@ -12,6 +12,7 @@ class AgentStatus(str, Enum): FAILED = "Failed" UNKNOWN = "Unknown" DELETED = "Deleted" + UNHEALTHY = "Unhealthy" class ACPType(str, Enum): @@ -21,6 +22,11 @@ class ACPType(str, Enum): AGENTIC = "agentic" # deprecated: use ASYNC instead +class AgentInputType(str, Enum): + TEXT = "text" + JSON = "json" + + class AgentEntity(BaseModel): id: str = Field(..., description="The unique identifier of the agent.") docker_image: str | None = Field( @@ -56,3 +62,6 @@ class AgentEntity(BaseModel): registered_at: datetime | None = Field( None, description="The timestamp when the agent was last registered" ) + agent_input_type: AgentInputType | None = Field( + None, description="The type of input the agent expects." 
+ ) diff --git a/agentex/src/domain/repositories/task_repository.py b/agentex/src/domain/repositories/task_repository.py index a228501e..40c8781e 100644 --- a/agentex/src/domain/repositories/task_repository.py +++ b/agentex/src/domain/repositories/task_repository.py @@ -40,7 +40,7 @@ async def list_with_join( agent_id: str | None = None, agent_name: str | None = None, order_by: str | None = None, - order_direction: Literal["asc", "desc"] = "desc", + order_direction: Literal["asc", "desc"] = "asc", limit: int | None = None, page_number: int | None = None, relationships: list[TaskRelationships] | None = None, diff --git a/agentex/src/domain/use_cases/agents_use_case.py b/agentex/src/domain/use_cases/agents_use_case.py index 061511b8..86741b4b 100644 --- a/agentex/src/domain/use_cases/agents_use_case.py +++ b/agentex/src/domain/use_cases/agents_use_case.py @@ -4,11 +4,17 @@ from fastapi import Depends from src.adapters.crud_store.exceptions import DuplicateItemError, ItemDoesNotExist -from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus +from src.adapters.temporal.adapter_temporal import DTemporalAdapter +from src.adapters.temporal.exceptions import ( + TemporalWorkflowAlreadyExistsError, +) +from src.config.environment_variables import EnvironmentVariables +from src.domain.entities.agents import ACPType, AgentEntity, AgentInputType, AgentStatus from src.domain.repositories.agent_repository import DAgentRepository from src.domain.repositories.deployment_history_repository import ( DDeploymentHistoryRepository, ) +from src.temporal.workflows.healthcheck_workflow import HealthCheckWorkflow from src.utils.ids import orm_id from src.utils.logging import make_logger @@ -20,9 +26,11 @@ def __init__( self, agent_repository: DAgentRepository, deployment_history_repository: DDeploymentHistoryRepository, + temporal_adapter: DTemporalAdapter, ): self.agent_repo = agent_repository self.deployment_history_repo = deployment_history_repository + 
self.temporal_adapter = temporal_adapter async def register_agent( self, @@ -32,6 +40,7 @@ async def register_agent( agent_id: str | None = None, acp_type: ACPType = ACPType.ASYNC, registration_metadata: dict[str, Any] | None = None, + agent_input_type: AgentInputType | None = None, ) -> AgentEntity: # If an agent_id is passed, then the agent expects that it is already in the db if agent_id: @@ -42,6 +51,8 @@ async def register_agent( agent.status = AgentStatus.READY agent.status_reason = "Agent registered successfully." agent.acp_type = acp_type + if agent_input_type: + agent.agent_input_type = agent_input_type if registration_metadata: existing_metadata = agent.registration_metadata or {} existing_metadata.update(registration_metadata) @@ -64,6 +75,8 @@ async def register_agent( existing_metadata = agent.registration_metadata or {} existing_metadata.update(registration_metadata) agent.registration_metadata = existing_metadata + if agent_input_type: + agent.agent_input_type = agent_input_type # Check if any fields have changed by comparing model dumps updated_agent_data = agent.model_dump() @@ -78,6 +91,7 @@ async def register_agent( await self.maybe_update_agent_deployment_history(agent) else: logger.info(f"Agent {name} has not changed, skipping update") + await self.ensure_healthcheck_workflow(agent) return agent except ItemDoesNotExist: logger.info(f"Agent {name} not found, creating new agent") @@ -94,6 +108,7 @@ async def register_agent( acp_type=acp_type, registration_metadata=registration_metadata, registered_at=datetime.now(UTC), + agent_input_type=agent_input_type, ) # This is a problem only if multiple pods spin up and then make a request all at the same time. 
# In that case, the first pod will create the agent and the rest should succeed silently @@ -104,6 +119,8 @@ async def register_agent( f"Agent {name} was likely created in parallel, skipping creation" ) await self.maybe_update_agent_deployment_history(agent) + + await self.ensure_healthcheck_workflow(agent) return agent async def maybe_update_agent_deployment_history(self, agent: AgentEntity) -> None: @@ -115,6 +132,35 @@ async def maybe_update_agent_deployment_history(self, agent: AgentEntity) -> Non ) return + async def ensure_healthcheck_workflow( + self, + agent: AgentEntity, + ) -> None: + # Checking EnvironmentVariables here to allow turning this on and off without restarting the service + environment_variables = EnvironmentVariables.refresh() + if not environment_variables.ENABLE_HEALTH_CHECK_WORKFLOW: + logger.info(f"Health check workflow is not enabled for {agent.id}") + return + try: + # Start new health check workflow for this agent + await self.temporal_adapter.start_workflow( + workflow_id=f"healthcheck_workflow_{agent.id}", + workflow=HealthCheckWorkflow, + args=[{"agent_id": agent.id, "acp_url": agent.acp_url}], + task_queue=environment_variables.AGENTEX_SERVER_TASK_QUEUE, + ) + logger.info(f"Started new health check workflow for agent {agent.id}") + except TemporalWorkflowAlreadyExistsError: + # Only expected for new registrations + logger.info( + f"New health check workflow already exists for agent {agent.id}" + ) + except Exception as e: + logger.error( + f"Failed to start health check workflow for agent {agent.id}: {e}" + ) + # Not raising an error here because we want to continue with the registration process + async def get(self, id: str | None = None, name: str | None = None) -> AgentEntity: agent = await self.agent_repo.get(id=id, name=name) if agent.status == AgentStatus.DELETED: diff --git a/agentex/src/temporal/__init__.py b/agentex/src/temporal/__init__.py new file mode 100644 index 00000000..e69de29b diff --git 
# Activity names, referenced by workflows so they can schedule the activities
# by name without importing the implementing class.
CHECK_STATUS_ACTIVITY = "check_status_activity"
UPDATE_AGENT_STATUS_ACTIVITY = "update_agent_status_activity"


class HealthCheckException(Exception):
    """Exception for health check activities."""

    pass


class HealthCheckActivities:
    """
    Activities for health check.

    Each activity is focused on a single responsibility:
    - Checking agent health via endpoint
    - Updating agent status in the database
    """

    def __init__(self, agent_repo: AgentRepository, http_client: httpx.AsyncClient):
        """Initialize with the agent repository and a shared async HTTP client."""
        self.agent_repo = agent_repo
        self.http_client = http_client

    @activity.defn(name=CHECK_STATUS_ACTIVITY)
    async def check_status_activity(self, agent_id: str, acp_url: str) -> bool:
        """
        Check the status of an agent via its ACP endpoint.

        Args:
            agent_id: The ID of the agent to check
            acp_url: The URL of the agent's ACP endpoint

        Returns:
            bool: True if the agent is healthy, False otherwise

        Raises:
            HealthCheckException: If the HTTP request itself fails (connection
                error, timeout, etc.).
        """
        logger.info(f"Checking status of agent {agent_id} via {acp_url}")
        try:
            response = await self.http_client.get(f"{acp_url}/healthz", timeout=5)
            if response.status_code == 200:
                return True
            logger.error(
                f"Agent {agent_id} returned non-200 status: {response.status_code}"
            )
            # BUG FIX: the original fell through here and implicitly returned
            # None, violating the declared `-> bool` contract.
            return False
        except Exception as e:
            logger.error(f"Failed to check status of agent {agent_id}: {e}")
            raise HealthCheckException(f"Failed to check status of agent {agent_id}")

    @activity.defn(name=UPDATE_AGENT_STATUS_ACTIVITY)
    async def update_agent_status_activity(self, agent_id: str, status: str) -> None:
        """
        Update the status of an agent in the database.

        Args:
            agent_id: The ID of the agent to update.
            status: The new status value; must be a valid AgentStatus value.

        Raises:
            ValueError: If the agent does not exist or *status* is not a valid
                AgentStatus value.
        """
        try:
            # Get agent
            agent = await self.agent_repo.get(id=agent_id)
            if not agent:
                raise ValueError(f"Agent {agent_id} not found")
            new_status = AgentStatus(status)
            # Skip the write when nothing changed to avoid churning the row.
            if agent.status == new_status:
                return
            agent.status = new_status
            agent.status_reason = "Agent health check reported " + status
            await self.agent_repo.update(item=agent)
            logger.info(f"Updated agent {agent_id} status to {status}")
        except Exception as e:
            logger.error(f"Failed to update agent {agent_id} status: {e}")
            raise
async def main() -> None:
    """
    Entry point: ensure a health check workflow exists for every known agent.

    Bails out early (with a log line) when the feature flag, task queue, or
    Temporal connection is not configured.
    """
    # Bootstrap shared resources (DB engine, temporal client, ...) first.
    deps = GlobalDependencies()
    await deps.load()

    env = EnvironmentVariables.refresh()
    if not env:
        logger.error("Environment variables are not configured")
        return
    if not env.ENABLE_HEALTH_CHECK_WORKFLOW:
        logger.info("Health check workflow is not enabled")
        return
    task_queue = env.AGENTEX_SERVER_TASK_QUEUE
    if not task_queue:
        logger.error("Health check task queue is not configured")
        return
    if not TemporalClientFactory.is_temporal_configured(env):
        logger.error("Temporal is not configured, skipping workflow creation")
        return

    # Build a repository against the read-write database and load all agents.
    db_engine = database_async_read_write_engine()
    sessions = database_async_read_write_session_maker(db_engine)
    repository = AgentRepository(sessions)
    all_agents = await repository.list()

    temporal_adapter = TemporalAdapter(temporal_client=deps.temporal_client)
    logger.info(f"Adding Health Check workflows to task queue: {task_queue}")
    for agent in all_agents:
        try:
            await temporal_adapter.start_workflow(
                workflow_id=f"healthcheck_workflow_{agent.id}",
                workflow=HealthCheckWorkflow,
                args=[{"agent_id": agent.id, "acp_url": agent.acp_url}],
                task_queue=task_queue,
            )
        except TemporalWorkflowAlreadyExistsError:
            # A workflow from a previous registration is still running — fine.
            logger.info(f"Health check workflow already exists for agent {agent.id}")
        except Exception as e:
            # Log and keep going so one bad agent doesn't block the rest.
            logger.error(
                f"Failed to start health check workflow for agent {agent.id}: {e}"
            )


if __name__ == "__main__":
    asyncio.run(main())
async def run_worker(
    task_queue: str = AGENTEX_SERVER_TASK_QUEUE,
    dependency_overrides: dict | None = None,
    workflows: list | None = None,
    activities: list | None = None,
    max_workers: int = 10,
    max_concurrent_activities: int = 50,
) -> None:
    """
    Run the Temporal worker for specified workflows and activities.

    Args:
        task_queue: The task queue to process (default: agentex-server)
        dependency_overrides: Optional dependency overrides for testing
        workflows: List of workflow classes to register
        activities: List of activity functions to register
        max_workers: Maximum number of activity worker threads
        max_concurrent_activities: Maximum concurrent activities

    Raises:
        TemporalError: If worker creation or execution fails
        ValueError: If Temporal is not configured.
    """
    global health_check_worker

    try:
        # Initialize global dependencies
        await startup_global_dependencies()
        # Get environment variables
        environment_variables = EnvironmentVariables.refresh()

        logger.info(f"Starting Health Check worker for task queue: {task_queue}")
        logger.info(f"Temporal address: {environment_variables.TEMPORAL_ADDRESS}")

        # Check if Temporal is configured
        if not TemporalClientFactory.is_temporal_configured(environment_variables):
            logger.warning("Temporal is not configured, skipping worker creation")
            raise ValueError("Temporal is not properly configured")

        # Check for metrics configuration.
        # NOTE(review): the bracketed form is only valid for IPv6 literal
        # hosts; if DD_AGENT_HOST is a hostname/IPv4 address this URL looks
        # malformed — confirm against the Datadog agent deployment.
        host_url = os.environ.get("DD_AGENT_HOST")
        metrics_url = f"http://[{host_url}]:4317" if host_url else None
        if metrics_url:
            logger.info(f"Configuring worker with metrics URL: {metrics_url}")

        # Create Temporal client
        client = await TemporalClientFactory.create_client_from_env(
            environment_variables=environment_variables,
            metrics_url=metrics_url,
        )

        # Create the worker directly (no manager needed). The unsandboxed
        # runner is required because workflows import project modules.
        health_check_worker = Worker(
            client,
            task_queue=task_queue,
            activity_executor=ThreadPoolExecutor(max_workers=max_workers),
            workflows=workflows or [],
            activities=activities or [],
            workflow_runner=UnsandboxedWorkflowRunner(),
            max_concurrent_activities=max_concurrent_activities,
            build_id=str(uuid.uuid4()),
        )

        logger.info(
            f"Health Check worker created successfully for task queue: {task_queue}"
        )
        logger.info(
            f"Registered {len(workflows or [])} workflows and {len(activities or [])} activities"
        )
        if workflows:
            logger.info(f"Workflows: {[w.__name__ for w in workflows]}")
        if activities:
            logger.info(f"Activities: {[a.__name__ for a in activities]}")

        # Run the worker (this will block until the worker is stopped)
        await health_check_worker.run()

    except Exception as e:
        logger.error(f"Worker failed: {e}")
        raise
    finally:
        # Cleanup
        if health_check_worker:
            logger.info("Shutting down worker...")
            await health_check_worker.shutdown()


def create_health_check_worker(
    agent_repo: AgentRepository, http_client: httpx.AsyncClient
) -> asyncio.Task:
    """
    Create a Health Check worker task bound to *agent_repo* and *http_client*.

    Args:
        agent_repo: Repository used by activities to read/update agents.
        http_client: Shared async HTTP client used for health probes.

    Returns:
        The asyncio.Task running the worker loop.
    """
    # Get task queue from environment or use default
    task_queue = os.environ.get("AGENTEX_SERVER_TASK_QUEUE", AGENTEX_SERVER_TASK_QUEUE)

    logger.info("Starting Temporal Health Check Worker")
    logger.info(f"Task queue: {task_queue}")

    # BUG FIX: the original ignored the `http_client` parameter and always
    # fetched a fresh client via httpx_client(); honor the injected client so
    # callers (and tests) can control it.
    health_check_activities = HealthCheckActivities(
        agent_repo=agent_repo,
        http_client=http_client,
    )

    # Extract activity methods
    activities = [
        health_check_activities.check_status_activity,
        health_check_activities.update_agent_status_activity,
    ]

    # Create and run worker task
    return asyncio.create_task(
        run_worker(
            task_queue=task_queue,
            workflows=[HealthCheckWorkflow],
            activities=activities,
            max_workers=50,
            max_concurrent_activities=50,
        )
    )
+ """ + try: + # Initialize global dependencies for this thread + await startup_global_dependencies() + # Create session maker + engine = database_async_read_write_engine() + session_maker = database_async_read_write_session_maker(engine) + agent_repo = AgentRepository(session_maker) + health_check_worker_task = create_health_check_worker( + agent_repo=agent_repo, + http_client=httpx_client(), + ) + # Wait for the worker to complete + await health_check_worker_task + + except KeyboardInterrupt: + logger.info("Received interrupt signal, shutting down worker...") + if health_check_worker: + await health_check_worker.shutdown() + except Exception as e: + logger.error(f"Worker startup failed: {e}") + raise + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/agentex/src/temporal/workflows/healthcheck_workflow.py b/agentex/src/temporal/workflows/healthcheck_workflow.py new file mode 100644 index 00000000..41e65a1f --- /dev/null +++ b/agentex/src/temporal/workflows/healthcheck_workflow.py @@ -0,0 +1,104 @@ +""" +Temporal workflow for checking the status of an agent via its ACP endpoint. +""" + +from datetime import timedelta + +from src.temporal.activities.healthcheck_activities import ( + CHECK_STATUS_ACTIVITY, + UPDATE_AGENT_STATUS_ACTIVITY, +) +from src.utils.logging import make_logger +from temporalio import workflow +from temporalio.common import RetryPolicy + +logger = make_logger(__name__) + + +@workflow.defn +class HealthCheckWorkflow: + """ + Workflow for checking the status of an agent via its ACP endpoint. + + This workflow: + 1. Checks the status of an agent via its ACP endpoint + 2. Updates the agent status in the database + 3. If the agent is not healthy, it stops the workflow + """ + + max_history_length: int = 10000 + + @workflow.run + async def run(self, workflow_args: dict) -> None: + """ + Periodically checks the health of an agent via its ACP endpoint and updates its status in the database. 
@workflow.defn
class HealthCheckWorkflow:
    """
    Workflow for checking the status of an agent via its ACP endpoint.

    This workflow:
    1. Checks the status of an agent via its ACP endpoint
    2. Updates the agent status in the database
    3. If the agent is not healthy, it stops the workflow
    """

    # Cap on history length before forcing continue-as-new; a class attribute
    # so tests can lower it.
    max_history_length: int = 10000

    @workflow.run
    async def run(self, workflow_args: dict) -> None:
        """
        Periodically checks the health of an agent via its ACP endpoint and updates its status in the database.
        If the agent fails health checks repeatedly, marks the agent as failed and stops further checks.

        Args:
            workflow_args: Dictionary containing:
                - agent_id: Database agent ID to check
                - acp_url: Registered agent ACP URL
                - failure_counter (optional): consecutive-failure count carried
                  across continue-as-new boundaries.

        Returns:
            None
        """
        # Extract arguments
        agent_id = workflow_args["agent_id"]
        acp_url = workflow_args["acp_url"]

        # FIX: use workflow.logger instead of a module-level logger inside
        # workflow code — it is replay-aware and suppresses duplicate log
        # lines when the workflow history is replayed.
        workflow.logger.info(f"Starting execution for agent {agent_id}")

        failure_counter = workflow_args.get("failure_counter", 0)
        while not self.should_continue_as_new():
            # Wait for the next health check
            await workflow.sleep(30)
            try:
                # Call the Activity with a retry policy
                await workflow.execute_activity(
                    CHECK_STATUS_ACTIVITY,
                    args=[agent_id, acp_url],
                    start_to_close_timeout=timedelta(seconds=15),
                    retry_policy=RetryPolicy(
                        maximum_attempts=2,
                        initial_interval=timedelta(seconds=1),
                        backoff_coefficient=2.0,
                    ),
                )
            except Exception:
                # Activity failed after all retries
                failure_counter += 1
                if failure_counter >= 5:
                    # Agent is officially unhealthy
                    await workflow.execute_activity(
                        UPDATE_AGENT_STATUS_ACTIVITY,
                        args=[agent_id, "Unhealthy"],
                        start_to_close_timeout=timedelta(seconds=30),
                        retry_policy=RetryPolicy(
                            maximum_attempts=3,
                            initial_interval=timedelta(seconds=1),
                            backoff_coefficient=2.0,
                        ),
                    )
                    # Stop health check workflow until agent registers again
                    return
                else:
                    continue

            # If health check succeeds, reset the counter
            failure_counter = 0

        # Carry the streak across the continue-as-new boundary so a partially
        # unhealthy agent isn't reset by history truncation.
        workflow_args["failure_counter"] = failure_counter
        workflow.continue_as_new(
            arg=workflow_args,
        )

    def should_continue_as_new(self) -> bool:
        """Return True when the workflow should truncate history via continue-as-new."""
        if workflow.info().is_continue_as_new_suggested():
            return True
        # For testing
        if (
            self.max_history_length
            and workflow.info().get_current_history_length() > self.max_history_length
        ):
            return True
        return False
aa1f85cf..f5d8b8cd 100644 --- a/agentex/tests/integration/fixtures/integration_client.py +++ b/agentex/tests/integration/fixtures/integration_client.py @@ -16,6 +16,7 @@ from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker +from src.adapters.temporal.adapter_temporal import TemporalAdapter from src.api.app import app from src.api.authentication_cache import reset_auth_cache from src.config.dependencies import GlobalDependencies @@ -209,6 +210,14 @@ def isolated_api_key_http_client(): return AsyncMock(AsyncClient) +@pytest_asyncio.fixture +async def isolated_temporal_adapter(): + """ + Function-scoped fixture that provides a temporal adapter for isolated testing. + """ + return AsyncMock(TemporalAdapter) + + @pytest_asyncio.fixture async def isolated_repositories(isolated_test_schema): """ @@ -287,7 +296,9 @@ def __init__(self, redis_url): @pytest_asyncio.fixture -async def isolated_integration_app(isolated_repositories, isolated_api_key_http_client): +async def isolated_integration_app( + isolated_repositories, isolated_api_key_http_client, isolated_temporal_adapter +): """ Function-scoped fixture that provides FastAPI app with completely isolated dependencies. All use cases get repositories that point to isolated test databases. 
@@ -325,6 +336,7 @@ async def isolated_integration_app(isolated_repositories, isolated_api_key_http_ def create_agents_use_case(): return AgentsUseCase( agent_repository=isolated_repositories["agent_repository"], + temporal_adapter=isolated_temporal_adapter, deployment_history_repository=isolated_repositories[ "deployment_history_repository" ], diff --git a/agentex/tests/unit/use_cases/test_acp_type_backwards_compatibility_use_case.py b/agentex/tests/unit/use_cases/test_acp_type_backwards_compatibility_use_case.py index 33c9453c..05dece66 100644 --- a/agentex/tests/unit/use_cases/test_acp_type_backwards_compatibility_use_case.py +++ b/agentex/tests/unit/use_cases/test_acp_type_backwards_compatibility_use_case.py @@ -8,6 +8,7 @@ import pytest from src.adapters.crud_store.exceptions import ItemDoesNotExist +from src.adapters.temporal.adapter_temporal import TemporalAdapter from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus from src.domain.entities.agents_rpc import ( ACP_TYPE_TO_ALLOWED_RPC_METHODS, @@ -241,6 +242,8 @@ async def test_register_agent_defaults_to_async_not_agentic(self): agents_use_case = AgentsUseCase( agent_repository=agent_repo, deployment_history_repository=deployment_history_repo, + # Not testing temporal adapter in this test + temporal_adapter=TemporalAdapter(), ) # Mock repository to return a new agent @@ -279,6 +282,8 @@ async def test_can_explicitly_register_agentic_agent(self): agents_use_case = AgentsUseCase( agent_repository=agent_repo, deployment_history_repository=deployment_history_repo, + # Not testing temporal adapter in this test + temporal_adapter=TemporalAdapter(), ) # Mock repository to return an AGENTIC agent