Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
name: Tests

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main

permissions:
  contents: read

jobs:
  # Full matrix run: all optional extras installed, lint + types + coverage.
  test:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.11", "3.12"]

    steps:
      - uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true

      - name: Set up Python ${{ matrix.python-version }}
        run: uv python install ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          # Install package with all optional dependencies for full test coverage
          uv sync --all-extras --dev

      - name: Run linter (ruff)
        run: uv run ruff check src tests

      - name: Run type checker (mypy)
        run: uv run mypy src
        continue-on-error: true  # Don't fail the build on type errors initially

      - name: Run tests
        run: uv run pytest tests/ -v --cov=agentexec --cov-report=term-missing --cov-report=xml

      - name: Upload coverage reports to Codecov
        uses: codecov/codecov-action@v4
        with:
          # codecov-action v4 deprecated the singular `file:` input; use `files:`.
          files: ./coverage.xml
          flags: unittests
          name: codecov-umbrella
          fail_ci_if_error: false
        env:
          CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
        continue-on-error: true  # Don't fail if codecov upload fails

  # Test that the package works without optional dependencies
  test-minimal:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true

      - name: Set up Python 3.11
        run: uv python install 3.11

      - name: Install minimal dependencies
        run: uv sync --dev

      - name: Run tests (minimal - should skip OpenAI and Pydantic AI tests)
        run: uv run pytest tests/ -v -k "not (openai_runner or pydantic_ai_runner)" --cov=agentexec --cov-report=term-missing
13 changes: 10 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
[project]
name = "agentexec"
version = "0.1.2"
description = "Production-ready orchestration for OpenAI Agents with Redis-backed coordination, activity tracking, and workflow management"
description = "Production-ready orchestration for AI agents (OpenAI, Pydantic AI) with Redis-backed coordination, activity tracking, and workflow management"
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [
{name = "Agent CI", email = "[email protected]"},
{name = "Travis Dent", email = "[email protected]"}
]
keywords = ["agents", "openai", "redis", "orchestration", "async", "background-workers"]
keywords = ["agents", "openai", "pydantic-ai", "redis", "orchestration", "async", "background-workers"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
Expand All @@ -24,9 +24,12 @@ dependencies = [
"pydantic>=2.12.0",
"pydantic-settings>=2.5.0",
"sqlalchemy>=2.0.44",
"openai-agents>=0.1.0",
]

[project.optional-dependencies]
openai = ["openai-agents>=0.1.0"]
pydantic-ai = ["pydantic-ai>=0.0.1"]
all = ["openai-agents>=0.1.0", "pydantic-ai>=0.0.1"]

[project.urls]
Homepage = "https://github.com/Agent-CI/agentexec"
Expand Down Expand Up @@ -55,6 +58,10 @@ dev-dependencies = [
line-length = 100
target-version = "py311"

[tool.ruff.lint]
# Ignore unused imports in __init__.py files (used for re-exports)
per-file-ignores = { "__init__.py" = ["F401"] }

[tool.mypy]
python_version = "3.11"
warn_return_any = true
Expand Down
8 changes: 8 additions & 0 deletions src/agentexec/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,11 @@
__all__.append("OpenAIRunner")
except ImportError:
pass

# PydanticAIRunner depends on the optional `pydantic-ai` extra; expose it
# from this package only when that distribution is importable.
try:
    from agentexec.runners.pydantic_ai import PydanticAIRunner
except ImportError:
    pass
else:
    __all__.append("PydanticAIRunner")
222 changes: 222 additions & 0 deletions src/agentexec/runners/pydantic_ai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import logging
import uuid
from typing import Any

from pydantic_ai import Agent, AgentRunResult, capture_run_messages
from pydantic_ai.exceptions import UsageLimitExceeded
from pydantic_ai.messages import (
ModelMessage,
ModelRequest,
UserPromptPart,
)
from pydantic_ai.result import StreamedRunResult
from pydantic_ai.tools import Tool
from pydantic_ai.usage import UsageLimits

from agentexec.runners.base import BaseAgentRunner, _RunnerTools


logger = logging.getLogger(__name__)


class _PydanticAIRunnerTools(_RunnerTools):
    """Tool factory for Pydantic AI agents.

    Specializes the base runner tools so each tool is exposed as a
    ``pydantic_ai.tools.Tool`` instance rather than a bare callable.
    """

    @property
    def report_status(self) -> Tool:
        """Return the status-reporting callable wrapped as a Pydantic AI ``Tool``.

        The underlying function comes from the base class (with the runner's
        agent_id already bound); only the tool metadata is added here.
        """
        return Tool(
            function=super().report_status,
            name="report_activity",
            description=(
                "Report progress and status updates. "
                "Use this tool to report your progress as you work through the task."
            ),
        )


class PydanticAIRunner(BaseAgentRunner):
    """Runner for Pydantic AI agents with automatic activity tracking.

    Wraps Pydantic AI agents and provides:
    - Automatic agent_id generation
    - Activity lifecycle management (QUEUED -> RUNNING -> COMPLETE/ERROR)
    - Request limit recovery with configurable wrap-up prompts
    - Status update tool with agent_id pre-baked

    Example:
        runner = agentexec.PydanticAIRunner(
            agent_id=agent_id,
            max_turns_recovery=True,
            wrap_up_prompt="Please summarize your findings.",
            report_status_prompt="Use report_activity(message, percentage) to report progress.",
        )

        agent = Agent(
            'anthropic:claude-sonnet-4-0',
            system_prompt=f"Research companies. {runner.prompts.report_status}",
            tools=[runner.tools.report_status],
        )

        result = await runner.run(
            agent=agent,
            user_prompt="Research Acme Corp",
            max_turns=15,
        )
    """

    def __init__(
        self,
        agent_id: uuid.UUID,
        *,
        max_turns_recovery: bool = False,
        wrap_up_prompt: str | None = None,
        recovery_turns: int = 5,
        report_status_prompt: str | None = None,
    ) -> None:
        """Initialize the Pydantic AI runner.

        Args:
            agent_id: UUID for tracking this agent's activity.
            max_turns_recovery: Enable automatic recovery when request limit exceeded.
            wrap_up_prompt: Prompt to use for recovery run.
            recovery_turns: Number of turns allowed for recovery.
            report_status_prompt: Instruction snippet about using the status tool.
        """
        super().__init__(
            agent_id,
            max_turns_recovery=max_turns_recovery,
            recovery_turns=recovery_turns,
            wrap_up_prompt=wrap_up_prompt,
            report_status_prompt=report_status_prompt,
        )
        # Swap in the Pydantic AI-specific tool wrappers (Tool instances).
        self.tools = _PydanticAIRunnerTools(self.agent_id)

    def _recovery_history(self, captured: list[ModelMessage]) -> list[ModelMessage]:
        """Build a recovery message history: captured conversation + wrap-up prompt."""
        wrap_up = ModelRequest(parts=[UserPromptPart(content=self.prompts.wrap_up)])
        return [*captured, wrap_up]

    async def run(
        self,
        agent: Agent[Any, Any],
        user_prompt: str | list[ModelMessage] | None,
        max_turns: int = 10,
        deps: Any | None = None,
        message_history: list[ModelMessage] | None = None,
        model_settings: dict[str, Any] | None = None,
    ) -> AgentRunResult[Any]:
        """Run the agent with automatic activity tracking.

        Args:
            agent: Pydantic AI Agent instance.
            user_prompt: User input/prompt for the agent, or list of messages.
            max_turns: Maximum number of agent iterations (maps to request_limit).
            deps: Optional dependencies to pass to the agent.
            message_history: Optional message history to continue from.
            model_settings: Optional model settings to pass to the agent.

        Returns:
            AgentRunResult from the agent execution.
        """
        # capture_run_messages keeps the conversation available even when the
        # run aborts with UsageLimitExceeded, so recovery can resume from it.
        with capture_run_messages() as captured:
            try:
                return await agent.run(
                    user_prompt=user_prompt,
                    message_history=message_history,
                    deps=deps,
                    usage_limits=UsageLimits(request_limit=max_turns),
                    model_settings=model_settings,
                )
            except UsageLimitExceeded:
                if not self.max_turns_recovery:
                    raise

                logger.info(
                    "Request limit exceeded, attempting recovery with %d messages",
                    len(captured),
                )

                # Second attempt: continue from the captured history plus the
                # wrap-up prompt, under the (smaller) recovery budget.
                return await agent.run(
                    user_prompt=None,  # None since we're using message_history
                    message_history=self._recovery_history(captured),
                    deps=deps,
                    usage_limits=UsageLimits(request_limit=self.recovery_turns),
                    model_settings=model_settings,
                )

    async def run_streamed(
        self,
        agent: Agent[Any, Any],
        user_prompt: str | list[ModelMessage] | None,
        max_turns: int = 10,
        deps: Any | None = None,
        message_history: list[ModelMessage] | None = None,
        model_settings: dict[str, Any] | None = None,
    ) -> StreamedRunResult[Any]:
        """Run the agent in streaming mode with automatic activity tracking.

        The returned streaming result can be used with async context manager pattern.
        Activity tracking happens automatically.

        Args:
            agent: Pydantic AI Agent instance.
            user_prompt: User input/prompt for the agent, or list of messages.
            max_turns: Maximum number of agent iterations (maps to request_limit).
            deps: Optional dependencies to pass to the agent.
            message_history: Optional message history to continue from.
            model_settings: Optional model settings to pass to the agent.

        Returns:
            StreamedRunResult from the agent execution.

        Example:
            async with await runner.run_streamed(agent, "Research XYZ") as result:
                async for message in result.stream_text():
                    print(message)
        """
        # NOTE(review): pydantic-ai's `Agent.run_stream` is documented as an
        # async context manager; confirm that awaiting it directly (rather than
        # `async with agent.run_stream(...)`) is valid for the pinned version.
        with capture_run_messages() as captured:
            try:
                return await agent.run_stream(
                    user_prompt=user_prompt,
                    message_history=message_history,
                    deps=deps,
                    usage_limits=UsageLimits(request_limit=max_turns),
                    model_settings=model_settings,
                )
            except UsageLimitExceeded:
                if not self.max_turns_recovery:
                    raise

                logger.info(
                    "Request limit exceeded during streaming, attempting recovery with %d messages",
                    len(captured),
                )

                # Retry the stream from the captured history plus the wrap-up
                # prompt, limited to the recovery turn budget.
                return await agent.run_stream(
                    user_prompt=None,  # None since we're using message_history
                    message_history=self._recovery_history(captured),
                    deps=deps,
                    usage_limits=UsageLimits(request_limit=self.recovery_turns),
                    model_settings=model_settings,
                )
2 changes: 1 addition & 1 deletion tests/test_activity_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from sqlalchemy.orm import Session, sessionmaker

from agentexec import activity
from agentexec.activity.models import Activity, ActivityLog, Base, Status
from agentexec.activity.models import Activity, Base, Status


@pytest.fixture
Expand Down
Loading
Loading