Pull Request Overview
This PR syncs the private Agentex repository to the public repository, introducing Temporal-based health check workflows for agent monitoring. The changes add infrastructure for periodic health checks of registered agents, including workflow definitions, activities, worker processes, and supporting temporal client abstractions.
Key changes:
- Implementation of a Temporal health check workflow that periodically monitors agent endpoints and updates status
- Addition of temporal adapter and client factory for workflow orchestration
- New agent status "Unhealthy" and optional agent input type field
- Integration of health check workflow into agent registration process
- Database migrations for new enum values and agent fields
Reviewed Changes
Copilot reviewed 22 out of 24 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| agentex/src/temporal/workflows/healthcheck_workflow.py | New workflow for periodic agent health monitoring via ACP endpoints |
| agentex/src/temporal/activities/healthcheck_activities.py | Activities for checking agent status and updating database |
| agentex/src/temporal/run_worker.py | Worker entry point for processing health check workflows |
| agentex/src/temporal/run_healthcheck_workflow.py | Script to ensure health check workflows exist for all agents |
| agentex/src/adapters/temporal/adapter_temporal.py | Temporal client adapter implementing workflow operations |
| agentex/src/adapters/temporal/client_factory.py | Factory for creating configured Temporal clients |
| agentex/src/adapters/temporal/port.py | Abstract interface for Temporal gateway operations |
| agentex/src/adapters/temporal/exceptions.py | Custom exceptions for Temporal operations |
| agentex/src/domain/use_cases/agents_use_case.py | Updated to integrate health check workflow on agent registration |
| agentex/src/domain/entities/agents.py | Added UNHEALTHY status and AgentInputType enum |
| agentex/src/api/schemas/agents.py | Added UNHEALTHY status and agent_input_type to API schemas |
| agentex/src/api/routes/agents.py | Updated registration endpoint to accept agent_input_type |
| agentex/src/config/environment_variables.py | Added configuration for Temporal namespace and health check settings |
| agentex/src/config/dependencies.py | Updated to use TemporalClientFactory |
| agentex/src/adapters/orm.py | Added agent_input_type column to AgentORM |
| agentex/src/domain/repositories/task_repository.py | Changed default order direction from desc to asc |
| agentex/tests/unit/use_cases/test_acp_type_backwards_compatibility_use_case.py | Updated tests to provide temporal_adapter parameter |
| agentex/tests/integration/fixtures/integration_client.py | Added isolated_temporal_adapter fixture |
| agentex/docker-compose.yml | Added agentex-temporal-worker service and environment variables |
| agentex/database/migrations/alembic/versions/2025_11_04_2340_add_agent_input_type_24429f13b8bd.py | Migration to add agent_input_type column |
| agentex/database/migrations/alembic/versions/2025_11_04_1923_add_unhealthy_status_a5d67f2d7356.py | Migration to add UNHEALTHY agent status |
| agentex/database/migrations/migration_history.txt | Updated migration history |

            CREATE TYPE agentinputtype AS ENUM ('TEXT', 'JSON');
        END IF;
    END $$;
    """)
    op.add_column('agents', sa.Column('agent_input_type', sa.Enum('TEXT', 'JSON', name='agentinputtype', create_type=False), nullable=True))

The database enum uses uppercase values ('TEXT', 'JSON'), but the Python AgentInputType enum uses lowercase values ('text', 'json'). This mismatch will cause runtime errors when inserting or querying data. The enum values must match exactly between the database and Python code.
Suggested change:

    -         CREATE TYPE agentinputtype AS ENUM ('TEXT', 'JSON');
    -     END IF;
    - END $$;
    - """)
    - op.add_column('agents', sa.Column('agent_input_type', sa.Enum('TEXT', 'JSON', name='agentinputtype', create_type=False), nullable=True))
    +         CREATE TYPE agentinputtype AS ENUM ('text', 'json');
    +     END IF;
    + END $$;
    + """)
    + op.add_column('agents', sa.Column('agent_input_type', sa.Enum('text', 'json', name='agentinputtype', create_type=False), nullable=True))

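A mismatch of this kind can be caught early with a small consistency check between the Python enum and the labels the migration creates. The sketch below is illustrative only: the `AgentInputType` definition is assumed from the review comment (lowercase values), and `DB_ENUM_LABELS` and `enum_label_mismatch` are hypothetical names, not part of the repository. Which side has to match depends on how the ORM column is declared. SQLAlchemy's `Enum` type, for instance, persists member names rather than values unless `values_callable` is supplied, so it is worth checking the column definition before changing the migration.

```python
from enum import Enum


class AgentInputType(str, Enum):
    # Assumed definition, based on the review comment (lowercase values).
    TEXT = "text"
    JSON = "json"


# Labels created by the migration under review (hypothetical constant).
DB_ENUM_LABELS = ("TEXT", "JSON")


def enum_label_mismatch(enum_cls, db_labels, use_names=False):
    """Return (missing_in_db, unexpected_in_db) as two sorted lists.

    With use_names=True the member names are compared instead of the
    member values, which matches ORMs that persist enum names.
    """
    python_labels = {m.name if use_names else m.value for m in enum_cls}
    db = set(db_labels)
    return sorted(python_labels - db), sorted(db - python_labels)
```

Comparing by value reports every label as mismatched, while comparing by name reports none, which is why the persistence mode of the column decides whether the migration actually needs to change.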

    @workflow.defn
    class HealthCheckWorkflow:
        """
        Workflow for checking the status of an agent via its ACP endpoint.

The docstring has inconsistent indentation - the opening line has extra leading whitespace. Remove the leading spaces before "Workflow" to align with standard docstring formatting.
Suggested change:

    Workflow for checking the status of an agent via its ACP endpoint.

## Summary
Adds `GET /tasks/{task_id}/query/{query_name}` endpoint that proxies
Temporal workflow queries through the Agentex REST API.
## Problem
When programmatically invoking agentic (Temporal) agents, callers have
no way to know when the agent has finished processing a turn. The task
status stays `RUNNING` throughout. Agents using the state machine SDK
internally track their state (`waiting_for_input`, `researching`, etc.)
but this isn't exposed externally.
## Solution
Expose Temporal's built-in workflow query API through the existing REST
API. Agents that register `@workflow.query` handlers can now be queried
for their current state without affecting execution.
### New endpoint
```
GET /tasks/{task_id}/query/{query_name}
→ {"task_id": "...", "query": "get_current_state", "result": "waiting_for_input"}
```
### Changes
- Added the query route to `tasks.py` using the existing `DTemporalAdapter`
dependency
- Changed the HTTP status returned for `TemporalQueryError` from 500 to 400
(an invalid query is a client error)
### Companion change needed
Agents need to register `@workflow.query` handlers to be queryable. The
state machine SDK should add a default `get_current_state` query handler
— tracked separately in the SDK repo.
## Use case
The Agent Plane communication service (meta-registry-comms-svc) invokes
agents via RPC and polls for responses. With this endpoint, it can check
`get_current_state` to detect when the agent transitions back to
`waiting_for_input`, providing a reliable turn-completion signal for
multi-turn conversations.
Follows the same pattern as Google A2A protocol's `INPUT_REQUIRED` task
state.
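
A caller-side polling loop for this use case might look like the sketch below. It is a minimal illustration under stated assumptions, not the communication service's code: `wait_for_state` and `make_http_fetcher` are hypothetical helpers, `base_url` is a placeholder, authentication is omitted, and the response shape is taken from the endpoint example in this description.

```python
import json
import time
import urllib.request


def make_http_fetcher(base_url, task_id, query_name="get_current_state"):
    """Build a callable that returns the `result` field of the query endpoint."""
    url = f"{base_url}/tasks/{task_id}/query/{query_name}"

    def fetch():
        # Auth headers are omitted here; a real caller would need them.
        with urllib.request.urlopen(url) as resp:
            return json.load(resp)["result"]

    return fetch


def wait_for_state(fetch_state, target="waiting_for_input", timeout=30.0,
                   interval=0.5, sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_state() until it returns `target`.

    Returns True once the target state is observed, False on timeout.
    """
    deadline = clock() + timeout
    while True:
        if fetch_state() == target:
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

Typical usage would be `wait_for_state(make_http_fetcher("http://localhost:5003", task_id))`, where the URL is an assumed local address. Passing the fetcher in as a callable keeps the loop testable without a running server.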
🤖 Generated with [Claude Code](https://claude.com/claude-code)
<!-- greptile_comment -->
<h3>Greptile Summary</h3>
Adds a `GET /tasks/{task_id}/query/{query_name}` endpoint to proxy
Temporal workflow queries through the REST API, enabling callers to
inspect agent state (e.g., `waiting_for_input`) without affecting
execution. Also fixes a bug in `TemporalAdapter.query_workflow` where
`None` was incorrectly passed as an argument to the Temporal SDK's
`handle.query()`.

- The new route bypasses the domain use case layer and calls `DTemporalAdapter` directly from the API layer. This is the only route in `src/api/routes/` that does so, deviating from the documented clean architecture.
- The endpoint does not validate that the task exists before querying Temporal, which could lead to querying unrelated workflows.
- The `None`-arg fix in the adapter is correct and prevents a runtime error when queries don't require arguments.
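
The adapter fix described above can be sketched as follows. This illustrates the described behavior and is not the repository's code: `FakeHandle` is a hypothetical stand-in for a Temporal workflow handle so the example runs without a Temporal server, and `query_workflow` is shown as a simplified free function.

```python
import asyncio


class FakeHandle:
    """Stand-in for a workflow handle; records the positional args it receives."""

    def __init__(self):
        self.calls = []

    async def query(self, query, *args):
        self.calls.append((query, args))
        return "waiting_for_input"


async def query_workflow(handle, query, arg=None):
    # Forward the argument only when the caller supplied one, so that
    # argument-less query handlers are not invoked with a stray None.
    if arg is None:
        return await handle.query(query)
    return await handle.query(query, arg)
```

One trade-off of this shape is that `None` doubles as "no argument", so a query handler that genuinely expects `None` as its input cannot be addressed through this function.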
<details><summary><h3>Confidence Score: 3/5</h3></summary>
Functional but has architectural concerns and a missing task existence
check that should be addressed before merge.
The adapter fix is solid, but the new endpoint skips task existence
validation (allowing queries against potentially unrelated Temporal
workflows) and breaks the established layered architecture by calling an
adapter directly from the route layer.
Pay close attention to agentex/src/api/routes/tasks.py — the new
endpoint should go through a use case and validate task existence.
</details>
<h3>Important Files Changed</h3>
| Filename | Overview |
|----------|----------|
| agentex/src/api/routes/tasks.py | Adds new GET /{task_id}/query/{query_name} endpoint. Bypasses the domain use case layer by calling DTemporalAdapter directly, and does not validate that the task exists before querying Temporal. |
| agentex/src/adapters/temporal/adapter_temporal.py | Fixes a bug where None was passed as an argument to Temporal handle.query(); now conditionally omits the arg parameter when it's None. |
</details>
<details><summary><h3>Sequence Diagram</h3></summary>
```mermaid
sequenceDiagram
participant Client
participant TasksRoute as tasks.py Route
participant TemporalAdapter as TemporalAdapter
participant Temporal as Temporal Server
participant Workflow as Agent Workflow
Client->>TasksRoute: GET /tasks/{task_id}/query/{query_name}
Note over TasksRoute: Auth check via DAuthorizedId
TasksRoute->>TemporalAdapter: query_workflow(workflow_id=task_id, query=query_name)
TemporalAdapter->>Temporal: get_workflow_handle(task_id)
TemporalAdapter->>Temporal: handle.query(query_name)
Temporal->>Workflow: Execute @workflow.query handler
Workflow-->>Temporal: Return state (e.g., "waiting_for_input")
Temporal-->>TemporalAdapter: Query result
TemporalAdapter-->>TasksRoute: result
TasksRoute-->>Client: {"task_id": "...", "query": "...", "result": "..."}
```
</details>
<details><summary>Prompt To Fix All With AI</summary>
`````markdown
This is a comment left during a code review.
Path: agentex/src/api/routes/tasks.py
Line: 229
Comment:
**Route bypasses domain layer, calling adapter directly**
Every other endpoint in this file delegates to a use case (`DTaskUseCase`, `DStreamsUseCase`), following the documented architecture: "API layer handles HTTP concerns, delegates to use cases" and "Dependencies flow inward (API → Domain ← Adapters)" (from `CLAUDE.md`).
This endpoint injects `DTemporalAdapter` directly into the route, which is the only place in `src/api/routes/` where an adapter is imported and called from the API layer (confirmed by grepping — `deployment_history.py` and `agents.py` only import adapter *exceptions*, not the adapter itself).
This matters because:
- It couples the HTTP layer to the Temporal infrastructure, making it harder to swap or mock the adapter
- It skips any domain-level validation (e.g., verifying the task exists before querying its workflow)
- It sets a precedent that erodes the layered architecture
Consider creating a method on `DTaskUseCase` (e.g., `query_task_workflow(task_id, query_name)`) that validates the task exists and then delegates to the temporal adapter.
**Context Used:** CLAUDE.md ([source](https://app.greptile.com/review/custom-context?memory=54e85549-5e1a-4d52-b6ac-c8e71f4ea0e6))
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: agentex/src/api/routes/tasks.py
Line: 235-238
Comment:
**No validation that the task exists before querying Temporal**
The endpoint passes `task_id` directly as `workflow_id` to Temporal without first checking that a task with this ID exists in the database. If a caller provides a valid-format UUID that doesn't correspond to a task (but does match a Temporal workflow started by another system, e.g., a healthcheck workflow), they'd get a result from the wrong workflow.
Other endpoints in this file first load the task via `task_use_case.get_task(id=task_id)` which returns a 404 if the task doesn't exist. This endpoint should do the same to maintain consistency and prevent querying unrelated workflows.
How can I resolve this? If you propose a fix, please make it concise.
`````
</details>
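
Both review comments above point at the same remedy: a use case method that checks the task exists and only then delegates to the Temporal adapter. The sketch below shows one possible shape. Everything in it is hypothetical apart from the `query_workflow(workflow_id=..., query=...)` call taken from the sequence diagram: `TaskUseCase`, `TaskNotFoundError`, and the in-memory repository and stub adapter exist only so the example is self-contained.

```python
import asyncio


class TaskNotFoundError(Exception):
    """Hypothetical domain error; the API layer would map it to HTTP 404."""


class InMemoryTaskRepository:
    """Test double standing in for the real task repository."""

    def __init__(self, tasks):
        self._tasks = dict(tasks)

    async def get(self, task_id):
        return self._tasks.get(task_id)


class StubTemporalAdapter:
    """Test double standing in for the Temporal adapter."""

    async def query_workflow(self, workflow_id, query):
        return "waiting_for_input"


class TaskUseCase:
    def __init__(self, task_repository, temporal_adapter):
        self._tasks = task_repository
        self._temporal = temporal_adapter

    async def query_task_workflow(self, task_id, query_name):
        # Validate the task first so an arbitrary workflow ID cannot be queried.
        task = await self._tasks.get(task_id)
        if task is None:
            raise TaskNotFoundError(task_id)
        result = await self._temporal.query_workflow(
            workflow_id=task_id, query=query_name
        )
        return {"task_id": task_id, "query": query_name, "result": result}
```

With this in place the route would depend only on the use case, which keeps the HTTP layer free of Temporal-specific imports and makes the existence check hard to skip.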
<sub>Reviews (4): Last reviewed commit: ["Merge branch 'main' into feat/workflow-q..."](f476f0b) | [Re-trigger Greptile](https://app.greptile.com/api/retrigger?id=26219250)</sub>
> Greptile also left **1 inline comment** on this PR.
**Context used:**
- CLAUDE.md ([source](https://app.greptile.com/review/custom-context?memory=54e85549-5e1a-4d52-b6ac-c8e71f4ea0e6))
<!-- /greptile_comment -->
---------
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>