diff --git a/README.md b/README.md index 1a160c3..1719184 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # `agentexec` -**Production-ready orchestration for OpenAI Agents SDK** with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools. +**Production-ready orchestration for AI agents** with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools. -Build reliable, scalable AI agent applications with automatic lifecycle management, progress tracking, and fault tolerance. +Supports **OpenAI Agents SDK** and **LangChain** with automatic lifecycle management, progress tracking, and fault tolerance. Running AI agents in production requires more than just the SDK. You need: @@ -22,6 +22,7 @@ Running AI agents in production requires more than just the SDK. You need: - **Redis task queue** - Reliable job distribution with priority support - **Automatic activity tracking** - Full lifecycle management (QUEUED → RUNNING → COMPLETE/ERROR) - **OpenAI Agents integration** - Drop-in runner with max turns recovery +- **LangChain integration** - Support for ReAct, tool-calling, and LangGraph agents - **Agent self-reporting** - Built-in tools for agents to report progress - **SQLAlchemy-based storage** - Flexible database support (PostgreSQL, MySQL, SQLite) - **Type-safe** - Full type annotations with Pydantic schemas @@ -31,10 +32,18 @@ Running AI agents in production requires more than just the SDK. You need: ## Installation +**For OpenAI Agents SDK:** ```bash uv add agentexec ``` +**For LangChain:** +```bash +uv add "agentexec[langchain]" +# Also install your LLM provider, e.g.: +uv add langchain-openai # or langchain-anthropic, etc. +``` + **Requirements:** - Python 3.11+ - Redis (for task queue) @@ -229,7 +238,9 @@ See **[examples/openai-agents-fastapi/pipeline.py](examples/openai-agents-fastap --- -## Full Example: FastAPI Integration +## Full Examples: FastAPI Integration + +### OpenAI Agents SDK Example See **[examples/openai-agents-fastapi/](examples/openai-agents-fastapi/)** for a complete production application showing: @@ -240,6 +251,16 @@ See **[examples/openai-agents-fastapi/](examples/openai-agents-fastapi/)** for a - Real-time progress monitoring - Graceful shutdown with cleanup +### LangChain Example + +See **[examples/langchain-agents-fastapi/](examples/langchain-agents-fastapi/)** for a LangChain-based application demonstrating: + +- ReAct agent pattern with LangChain +- Integration with multiple LLM providers +- Streaming support via astream_events +- Custom tools with @tool decorator +- Same orchestration patterns as OpenAI example + --- ## Configuration @@ -320,6 +341,34 @@ result = await runner.run(agent, input="...", max_turns=15) result = await runner.run_streamed(agent, input="...", max_turns=15) ``` +### LangChain Runner + +```python +from langchain.agents import AgentExecutor, create_react_agent +from langchain_openai import ChatOpenAI + +# Create LangChain agent +llm = ChatOpenAI(model="gpt-4o-mini") +agent = create_react_agent(llm, tools, prompt) +agent_executor = AgentExecutor(agent=agent, tools=tools) + +# Wrap with agentexec runner +runner = ax.LangChainRunner( + agent_id=agent_id, + agent_executor=agent_executor, + max_turns_recovery=True, + wrap_up_prompt="Summarize...", +) + +# Run agent +result = await runner.run(input="...", max_iterations=15) + +# Streaming +async for event in runner.run_streamed(input="...", max_iterations=15): + if event["event"] == "on_chat_model_stream": + 
print(event["data"]["chunk"].content, end="") +``` + --- ## Architecture @@ -445,7 +494,9 @@ MIT License - see [LICENSE](LICENSE) for details ## Links -- **Documentation**: See example application in `examples/openai-agents-fastapi/` +- **Documentation**: See example applications + - OpenAI Agents SDK: `examples/openai-agents-fastapi/` + - LangChain: `examples/langchain-agents-fastapi/` - **Issues**: [GitHub Issues](https://github.com/Agent-CI/agentexec/issues) - **PyPI**: [agentexec](https://pypi.org/project/agentexec/) diff --git a/examples/langchain-agents-fastapi/README.md b/examples/langchain-agents-fastapi/README.md new file mode 100644 index 0000000..daad649 --- /dev/null +++ b/examples/langchain-agents-fastapi/README.md @@ -0,0 +1,170 @@ +# LangChain Agents FastAPI Example + +This example demonstrates a complete FastAPI application using **agentexec** to orchestrate LangChain agents in production. + +## What This Example Demonstrates + +### Core Features + +- **Background worker pool** (`worker.py`) - Multi-process task execution with Redis queue +- **LangChainRunner integration** - Automatic activity tracking for agent lifecycle +- **ReAct agent pattern** - Using LangChain's create_react_agent for reasoning +- **Custom FastAPI routes** (`views.py`) - Building your own API on agentexec's public API +- **Database session management** (`main.py`) - Standard SQLAlchemy patterns with full control +- **Agent self-reporting** - Agents report progress via built-in `report_activity` tool +- **Max iterations recovery** - Automatic handling of conversation limits with wrap-up prompts + +### Key Patterns Shown + +**Task Registration:** +```python +@pool.task("research_company") +async def research_company( + agent_id: UUID, + context: ResearchCompanyContext, +) -> ResearchCompanyResult: + # Create LangChain agent + llm = ChatOpenAI(model="gpt-4o-mini") + agent = create_react_agent(llm, tools, prompt) + agent_executor = AgentExecutor(agent=agent, tools=tools) + + # Wrap with agentexec runner + runner = ax.LangChainRunner( + agent_id=agent_id, + agent_executor=agent_executor, + max_turns_recovery=True + ) + + # Execute with activity tracking + result = await runner.run(context.input_prompt, max_iterations=15) + return ResearchCompanyResult(summary=result["output"]) +``` + +**Activity Tracking API:** +```python +# List activities with pagination +ax.activity.list(db, page=1, page_size=50) + +# Get detailed activity with full log history +ax.activity.detail(db, agent_id) + +# Cleanup on shutdown +ax.activity.cancel_pending(db) +``` + +**Queueing Tasks:** +```python +context = ResearchCompanyContext(company_name="Acme") +task = await ax.enqueue( + "research_company", + context, + priority=ax.Priority.HIGH, +) +``` + +## Quick Start + +```bash +# Install dependencies +cd examples/langchain-agents-fastapi +uv sync + +# Start Redis +docker run -d -p 6379:6379 redis:latest + +# Set API key +export OPENAI_API_KEY="your-key" + +# Run migrations +alembic upgrade head + +# Start worker (terminal 1) +python -m langchain_agents_fastapi.worker + +# Start API server (terminal 2) +uvicorn langchain_agents_fastapi.main:app --reload +``` + +## Try It + +Queue a task: +```bash +curl -X POST "http://localhost:8000/api/tasks/research_company" \ + -H "Content-Type: application/json" \ + -d '{ + "company_name": "Anthropic", + "input_prompt": "Focus on their AI safety research" + }' +``` + +Monitor progress: +```bash +# List all activities +curl "http://localhost:8000/api/agents/activity" + +# Get specific agent 
details +curl "http://localhost:8000/api/agents/activity/{agent_id}" +``` + +## Configuration + +Set via environment variables: + +```bash +DATABASE_URL="sqlite:///agents.db" # or postgresql://... +REDIS_URL="redis://localhost:6379/0" +QUEUE_NAME="agentexec:tasks" +NUM_WORKERS="4" +OPENAI_API_KEY="sk-..." +``` + +## LangChain-Specific Features + +### Agent Types Supported + +This example demonstrates the **ReAct agent** pattern, but agentexec's LangChainRunner supports: + +- **ReAct agents** (`create_react_agent`) - Reasoning and acting +- **Tool-calling agents** (`create_tool_calling_agent`) - Structured tool use +- **Structured chat agents** (`create_structured_chat_agent`) - Multi-turn conversations +- **LangGraph agents** - Durable execution with state persistence + +### Streaming Support + +LangChain agents support streaming via `astream_events`: + +```python +async for event in runner.run_streamed(input_prompt, max_iterations=15): + if event["event"] == "on_chat_model_stream": + print(event["data"]["chunk"].content, end="") +``` + +### Custom Tools + +Tools in LangChain use the `@tool` decorator: + +```python +from langchain_core.tools import tool + +@tool +def search_company_info(company_name: str, query_type: str) -> str: + """Search for company information.""" + return f"Information about {company_name}..." +``` + +## Differences from OpenAI Agents SDK + +| Feature | OpenAI Agents SDK | LangChain | +|---------|------------------|-----------| +| Agent creation | `Agent(name, instructions, tools)` | `create_react_agent(llm, tools, prompt)` | +| Tool decorator | `@function_tool` | `@tool` | +| Execution | `Runner.run()` | `AgentExecutor.invoke()` | +| Streaming | `run_streamed()` | `astream_events()` | +| Max iterations | `MaxTurnsExceeded` exception | Max iterations in config | +| LLM providers | OpenAI only | OpenAI, Anthropic, local models, etc. | + +## Learn More + +- [LangChain Documentation](https://python.langchain.com/) +- [AgentExec Documentation](https://github.com/Agent-CI/agentexec) +- [ReAct Agent Pattern](https://python.langchain.com/docs/modules/agents/agent_types/react/) diff --git a/examples/langchain-agents-fastapi/alembic.ini b/examples/langchain-agents-fastapi/alembic.ini new file mode 100644 index 0000000..c014592 --- /dev/null +++ b/examples/langchain-agents-fastapi/alembic.ini @@ -0,0 +1,148 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts. +# this is typically a path given in POSIX (e.g. forward slashes) +# format, relative to the token %(here)s which refers to the location of this +# ini file +script_location = %(here)s/alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. for multiple paths, the path separator +# is defined by "path_separator" below. +prepend_sys_path = . + + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the tzdata library which can be installed by adding +# `alembic[tz]` to the pip requirements. 
+# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to /versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "path_separator" +# below. +# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions + +# path_separator; This indicates what character is used to split lists of file +# paths, including version_locations and prepend_sys_path within configparser +# files such as alembic.ini. +# The default rendered in new alembic.ini files is "os", which uses os.pathsep +# to provide os-dependent path splitting. +# +# Note that in order to support legacy alembic.ini files, this default does NOT +# take place if path_separator is not present in alembic.ini. If this +# option is omitted entirely, fallback logic is as follows: +# +# 1. Parsing of the version_locations option falls back to using the legacy +# "version_path_separator" key, which if absent then falls back to the legacy +# behavior of splitting on spaces and/or commas. +# 2. Parsing of the prepend_sys_path option falls back to the legacy +# behavior of splitting on spaces, commas, or colons. +# +# Valid values for path_separator are: +# +# path_separator = : +# path_separator = ; +# path_separator = space +# path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# database URL. This is consumed by the user-maintained env.py script only. +# other means of configuring database URLs may be customized within the env.py +# file. +# We configure the database URL in env.py from the DATABASE_URL environment variable +# sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module +# hooks = ruff +# ruff.type = module +# ruff.module = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Alternatively, use the exec runner to execute a binary found on your PATH +# hooks = ruff +# ruff.type = exec +# ruff.executable = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Logging configuration. This is also consumed by the user-maintained +# env.py script only. 
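+# As configured below, the console handler writes to stderr; alembic logs at
+# INFO and SQLAlchemy at WARNING.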
+[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/examples/langchain-agents-fastapi/alembic/README b/examples/langchain-agents-fastapi/alembic/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/examples/langchain-agents-fastapi/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/examples/langchain-agents-fastapi/alembic/env.py b/examples/langchain-agents-fastapi/alembic/env.py new file mode 100644 index 0000000..44b8890 --- /dev/null +++ b/examples/langchain-agents-fastapi/alembic/env.py @@ -0,0 +1,82 @@ +import os +from logging.config import fileConfig + +from alembic import context + +# Import your application's models +from models import Base +from sqlalchemy import engine_from_config, pool + +import agentexec as ax + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Get database URL from environment (same pattern as the app) +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///agents.db") +config.set_main_option("sqlalchemy.url", DATABASE_URL) + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# Combine metadata from both your app and agent-runner +# This allows Alembic to manage tables from both sources +target_metadata = [Base.metadata, ax.Base.metadata] + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. 
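+    The Engine configuration is read from alembic.ini, whose sqlalchemy.url
+    option was set above from the DATABASE_URL environment variable.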
+ + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/examples/langchain-agents-fastapi/alembic/script.py.mako b/examples/langchain-agents-fastapi/alembic/script.py.mako new file mode 100644 index 0000000..1101630 --- /dev/null +++ b/examples/langchain-agents-fastapi/alembic/script.py.mako @@ -0,0 +1,28 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + """Upgrade schema.""" + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + """Downgrade schema.""" + ${downgrades if downgrades else "pass"} diff --git a/examples/langchain-agents-fastapi/context.py b/examples/langchain-agents-fastapi/context.py new file mode 100644 index 0000000..9ec136e --- /dev/null +++ b/examples/langchain-agents-fastapi/context.py @@ -0,0 +1,21 @@ +"""Context classes for LangChain Agents FastAPI example. + +These Pydantic models provide type-safe context for background tasks. +""" + +from pydantic import BaseModel, Field + + +class ResearchCompanyContext(BaseModel): + """Context for company research tasks.""" + + company_name: str = Field(..., min_length=1, description="Name of the company to research") + input_prompt: str | None = Field(None, description="Custom research prompt (optional)") + + class Config: + json_schema_extra = { + "example": { + "company_name": "Anthropic", + "input_prompt": "Focus on their AI safety research and product offerings", + } + } diff --git a/examples/langchain-agents-fastapi/db.py b/examples/langchain-agents-fastapi/db.py new file mode 100644 index 0000000..f1b5486 --- /dev/null +++ b/examples/langchain-agents-fastapi/db.py @@ -0,0 +1,34 @@ +"""Database configuration and utilities for LangChain example.""" + +import os +from collections.abc import Generator + +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker + +# Database setup - users manage their own connection +# For PostgreSQL: "postgresql://user:password@localhost:5432/dbname" +# For SQLite: "sqlite:///agents.db" +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///agents.db") + +# Create engine and session factory (standard SQLAlchemy setup) +engine = create_engine(DATABASE_URL, echo=False) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +# Dependency: Get database session (standard FastAPI pattern) +def get_db() -> Generator[Session, None, None]: + """Provide a database session for each request. + + This is the standard FastAPI pattern for database session management. + Users have full control over connection pooling, timeouts, etc. 
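+    The session commits when the request handler completes successfully,
+    rolls back on any exception, and is always closed afterwards.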
+ """ + db = SessionLocal() + try: + yield db + db.commit() + except Exception: + db.rollback() + raise + finally: + db.close() diff --git a/examples/langchain-agents-fastapi/main.py b/examples/langchain-agents-fastapi/main.py new file mode 100644 index 0000000..6342743 --- /dev/null +++ b/examples/langchain-agents-fastapi/main.py @@ -0,0 +1,47 @@ +"""FastAPI application demonstrating agentexec integration with LangChain.""" + +from contextlib import asynccontextmanager + +from fastapi import FastAPI +import agentexec as ax + +from db import SessionLocal +from views import router + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan: setup and teardown.""" + print("✓ Activity tracking configured") + print(f"✓ Redis URL: {ax.CONF.redis_url}") + print(f"✓ Queue name: {ax.CONF.queue_name}") + print(f"✓ Number of workers: {ax.CONF.num_workers}") + + yield + + # Cleanup: cancel any pending agents + with SessionLocal() as db: + try: + canceled = ax.activity.cancel_pending(db) + db.commit() + print(f"✓ Canceled {canceled} pending agents") + except Exception as e: + db.rollback() + print(f"✗ Error canceling pending agents: {e}") + + +# Create FastAPI app +app = FastAPI( + title="AgentExec LangChain Example", + description="Example FastAPI application using agentexec with LangChain agents", + version="1.0.0", + lifespan=lifespan, +) + +app.include_router(router) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/examples/langchain-agents-fastapi/models.py b/examples/langchain-agents-fastapi/models.py new file mode 100644 index 0000000..f4450c7 --- /dev/null +++ b/examples/langchain-agents-fastapi/models.py @@ -0,0 +1,9 @@ +"""Database models for the example application.""" + +from sqlalchemy.orm import DeclarativeBase + + +class Base(DeclarativeBase): + """Base class for all database models in this application.""" + + pass diff --git a/examples/langchain-agents-fastapi/pyproject.toml b/examples/langchain-agents-fastapi/pyproject.toml new file mode 100644 index 0000000..1a9dbc5 --- /dev/null +++ b/examples/langchain-agents-fastapi/pyproject.toml @@ -0,0 +1,33 @@ +[project] +name = "langchain-agents-fastapi" +version = "0.1.0" +description = "Example FastAPI application demonstrating agentexec with LangChain" +readme = "README.md" +requires-python = ">=3.11" +license = { text = "MIT" } + +dependencies = [ + # AgentExec for orchestration + "agentexec[langchain]>=0.1.0", + # LangChain for AI agent functionality + "langchain>=0.3.0", + "langchain-core>=0.3.0", + "langchain-openai>=0.2.0", + "langgraph>=0.2.0", + # FastAPI and server + "fastapi>=0.121.0", + "uvicorn[standard]>=0.27.0", + # Database migrations + "alembic>=1.13.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.uv] +dev-dependencies = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "httpx>=0.27.0", # For testing FastAPI +] diff --git a/examples/langchain-agents-fastapi/tools.py b/examples/langchain-agents-fastapi/tools.py new file mode 100644 index 0000000..e43d8f9 --- /dev/null +++ b/examples/langchain-agents-fastapi/tools.py @@ -0,0 +1,48 @@ +"""LangChain tools for company research.""" + +from langchain_core.tools import tool + + +@tool +def search_company_info(company_name: str, query_type: str) -> str: + """Search for company information. 
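+
+    Demo stub: returns canned text for each query_type instead of calling
+    a real search API.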
+ + Args: + company_name: The name of the company to search for + query_type: Type of information to search for (financial, news, products, team) + + Returns: + Relevant information about the company + """ + # In a real implementation, this would call actual APIs + # For demo purposes, return simulated data + if query_type == "financial": + return f"{company_name} reported $100M revenue last quarter with 20% YoY growth." + elif query_type == "news": + return f"{company_name} recently announced a new product launch and expansion plans." + elif query_type == "products": + return f"{company_name} offers SaaS solutions for enterprise data management." + elif query_type == "team": + return f"{company_name} has 500+ employees across 5 offices globally." + return f"General information about {company_name}" + + +@tool +def analyze_financial_data(company_name: str, metrics: list[str]) -> dict: + """Analyze financial metrics for a company. + + Args: + company_name: The name of the company + metrics: List of metrics to analyze (revenue, profit, growth_rate) + + Returns: + Dictionary with analyzed metrics + """ + # Simulated financial analysis + return { + "company": company_name, + "revenue": "$100M", + "profit_margin": "25%", + "growth_rate": "20% YoY", + "outlook": "positive", + } diff --git a/examples/langchain-agents-fastapi/views.py b/examples/langchain-agents-fastapi/views.py new file mode 100644 index 0000000..3a4311f --- /dev/null +++ b/examples/langchain-agents-fastapi/views.py @@ -0,0 +1,101 @@ +"""API routes for the LangChain example application.""" + +import uuid + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy.orm import Session +import agentexec as ax + +from context import ResearchCompanyContext +from db import get_db + + +router = APIRouter() + + +class ResearchCompanyRequest(BaseModel): + """Request to queue a company research task.""" + + company_name: str + input_prompt: str | None = None + priority: ax.Priority = ax.Priority.LOW + + +class TaskResponse(BaseModel): + """Response after queuing a task.""" + + agent_id: uuid.UUID + message: str + + +@router.post( + "/api/tasks/research_company", + response_model=TaskResponse, +) +async def queue_research_company( + request: ResearchCompanyRequest, + db: Session = Depends(get_db), +): + """Queue a company research task. + + The task will be picked up by a worker and executed asynchronously. + Use the agent_id to track progress via the activity endpoints. + + Example request body: + { + "company_name": "Anthropic", + "input_prompt": "Focus on AI safety research", + "priority": "low" + } + """ + context = ResearchCompanyContext( + company_name=request.company_name, + input_prompt=request.input_prompt, + ) + + task = await ax.enqueue( + "research_company", + context, + priority=request.priority, + ) + + return TaskResponse( + agent_id=task.agent_id, + message=f"Task queued successfully. Track progress at /api/agents/activity/{task.agent_id}", + ) + + +@router.get( + "/api/agents/activity", + response_model=ax.activity.ActivityListSchema, +) +async def list_agents( + page: int = Query(1, ge=1, description="Page number"), + page_size: int = Query(50, ge=1, le=100, description="Items per page"), + db: Session = Depends(get_db), +): + """List all activities with pagination. + + Uses agentexec's public API: activity.list() + The database session is automatically managed by FastAPI dependency injection. 
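+    Results are returned as an ActivityListSchema; page_size is capped at 100.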
+ """ + return ax.activity.list(db, page=page, page_size=page_size) + + +@router.get( + "/api/agents/activity/{agent_id}", + response_model=ax.activity.ActivityDetailSchema, +) +async def get_agent(agent_id: str, db: Session = Depends(get_db)): + """Get detailed information about a specific agent including full log history. + + Uses agentexec's public API: activity.detail() + The database session is automatically managed by FastAPI dependency injection. + """ + activity_obj = ax.activity.detail(db, agent_id) + + if not activity_obj: + raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found") + + return activity_obj diff --git a/examples/langchain-agents-fastapi/worker.py b/examples/langchain-agents-fastapi/worker.py new file mode 100644 index 0000000..89142f8 --- /dev/null +++ b/examples/langchain-agents-fastapi/worker.py @@ -0,0 +1,140 @@ +"""Worker process for running LangChain agents with agentexec.""" + +from uuid import UUID + +from langchain.agents import AgentExecutor, create_react_agent +from langchain_core.prompts import PromptTemplate +from langchain_openai import ChatOpenAI +from pydantic import BaseModel +import agentexec as ax + +from context import ResearchCompanyContext +from db import engine +from tools import analyze_financial_data, search_company_info + + +class ResearchCompanyResult(BaseModel): + """Result from company research task.""" + + summary: str + financial_performance: str | None = None + recent_news: str | None = None + products_services: str | None = None + team_structure: str | None = None + + +pool = ax.WorkerPool(engine=engine) + + +@pool.task("research_company") +async def research_company( + agent_id: UUID, + context: ResearchCompanyContext, +) -> ResearchCompanyResult: + """Research a company using a LangChain ReAct agent with tools. + + This demonstrates: + - Using LangChain with ReAct agent pattern + - Automatic activity tracking via LangChainRunner + - Agent self-reporting progress via report_activity tool + - Type-safe context object (automatically deserialized from queue) + - Typed result return value + """ + # Type-safe context access with IDE autocomplete! + company_name = context.company_name + input_prompt = context.input_prompt or f"Research the company {company_name}." + + # Initialize the LLM + llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) + + # Define the tools available to the agent + tools = [ + search_company_info, + analyze_financial_data, + ] + + # Create a ReAct prompt template + template = """You are a thorough company research analyst. +Research {company_name} and provide a comprehensive report covering: +- Financial performance and metrics +- Recent news and developments +- Products and services offered +- Team and organizational structure + +Use the available tools to gather information and synthesize a detailed report. + +IMPORTANT: Report your progress regularly using the report_activity tool. +Call report_activity(message, percentage) to update on your current task. +Always report your current activity before starting a new step. +Include a brief message about the task and percentage completion (0-100). + +You have access to the following tools: + +{tools} + +Use the following format: + +Question: the input question you must answer +Thought: you should always think about what to do +Action: the action to take, should be one of [{tool_names}] +Action Input: the input to the action +Observation: the result of the action +... 
(this Thought/Action/Action Input/Observation can repeat N times) +Thought: I now know the final answer +Final Answer: the final answer to the original input question + +Begin! + +Question: {input} +Thought:{agent_scratchpad}""" + + prompt = PromptTemplate.from_template(template) + prompt = prompt.partial(company_name=company_name) + + # Create the ReAct agent + agent = create_react_agent(llm, tools, prompt) + agent_executor = AgentExecutor( + agent=agent, + tools=tools, + verbose=True, + handle_parsing_errors=True, + ) + + # Wrap with agentexec runner for activity tracking + runner = ax.LangChainRunner( + agent_id=agent_id, + agent_executor=agent_executor, + max_turns_recovery=True, + wrap_up_prompt="Please summarize your findings and provide a final report.", + ) + + # Execute the agent + result = await runner.run( + input=input_prompt, + max_iterations=15, + ) + + # Extract the output from LangChain's result dict + output = result.get("output", "No output generated") + + print(f"✓ Completed research for {company_name} (agent_id: {agent_id})") + print(f"Report preview: {output[:200]}...") + + # Return typed result + return ResearchCompanyResult( + summary=output, + financial_performance="See report", + recent_news="See report", + products_services="See report", + team_structure="See report", + ) + + +if __name__ == "__main__": + print("Starting LangChain agent worker pool...") + print(f"Workers: {ax.CONF.num_workers}") + print(f"Queue: {ax.CONF.queue_name}") + print("Press Ctrl+C to shutdown gracefully") + + # run() blocks and handles log streaming from workers + pool.run() diff --git a/pyproject.toml b/pyproject.toml index d63420e..065c223 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,12 @@ dependencies = [ "openai-agents>=0.1.0", ] +[project.optional-dependencies] +langchain = [ + "langchain>=0.3.0", + "langchain-core>=0.3.0", +] + [project.urls] Homepage = "https://github.com/Agent-CI/agentexec" diff --git a/src/agentexec/runners/__init__.py b/src/agentexec/runners/__init__.py index d1af0fd..4e969f5 100644 --- a/src/agentexec/runners/__init__.py +++ b/src/agentexec/runners/__init__.py @@ -11,3 +11,11 @@ __all__.append("OpenAIRunner") except ImportError: pass + +# LangChain runner is only available if langchain packages are installed +try: + from agentexec.runners.langchain import LangChainRunner + + __all__.append("LangChainRunner") +except ImportError: + pass diff --git a/src/agentexec/runners/langchain.py b/src/agentexec/runners/langchain.py new file mode 100644 index 0000000..dc4f7f7 --- /dev/null +++ b/src/agentexec/runners/langchain.py @@ -0,0 +1,228 @@ +"""LangChain agent runner with activity tracking. + +Supports both classic AgentExecutor and LangGraph agents with automatic +max iterations handling via LangChain's built-in early_stopping_method. +""" + +import logging +import uuid +from typing import Any, AsyncIterator + +from langchain.agents import AgentExecutor +from langchain_core.tools import tool + +from agentexec.runners.base import BaseAgentRunner, _RunnerTools + +logger = logging.getLogger(__name__) + + +class _LangChainRunnerTools(_RunnerTools): + """LangChain-specific tools wrapper that decorates with @tool.""" + + @property + def report_status(self) -> Any: + """Get the status update tool wrapped with @tool decorator. + + Returns: + LangChain tool for status reporting. + """ + agent_id = self._agent_id + + @tool + def report_activity(message: str, percentage: int) -> str: + """Report progress and status updates. 
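+
+            Progress reported here is persisted through agentexec's
+            activity.update() call, so it appears in the activity endpoints.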
+ + Use this tool to report your progress as you work through the task. + + Args: + message: A brief description of what you're currently doing + percentage: Your estimated completion percentage (0-100) + + Returns: + Confirmation message + """ + from agentexec import activity + + activity.update( + agent_id=agent_id, + message=message, + completion_percentage=percentage, + ) + return "Status updated" + + return report_activity + + +class LangChainRunner(BaseAgentRunner): + """Runner for LangChain agents with automatic activity tracking. + + This runner wraps LangChain's AgentExecutor and provides: + - Automatic agent_id generation + - Activity lifecycle management (QUEUED -> RUNNING -> COMPLETE/ERROR) + - Max iterations recovery with configurable wrap-up prompts + - Status update tool with agent_id pre-baked + - Support for streaming via astream_events + + Example: + from langchain_openai import ChatOpenAI + from langchain.agents import create_react_agent, AgentExecutor + from langchain import hub + + # Create LangChain agent + llm = ChatOpenAI(model="gpt-4") + prompt = hub.pull("hwchase17/react") + agent = create_react_agent(llm, tools, prompt) + agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True) + + # Wrap with agentexec runner + runner = agentexec.LangChainRunner( + agent_id=activity.id, + agent_executor=agent_executor, + max_turns_recovery=True, + ) + + # Execute with activity tracking + result = await runner.run("Research competitor products") + """ + + def __init__( + self, + agent_id: uuid.UUID, + agent_executor: AgentExecutor, + *, + max_turns_recovery: bool = False, + wrap_up_prompt: str | None = None, + recovery_turns: int = 5, + report_status_prompt: str | None = None, + ) -> None: + """Initialize the LangChain runner. + + Args: + agent_id: UUID for tracking this agent's activity. + agent_executor: LangChain AgentExecutor instance. + max_turns_recovery: Enable automatic recovery when max iterations exceeded. + When True, uses LangChain's early_stopping_method='generate' to have the + LLM generate a final answer when max iterations is reached. + wrap_up_prompt: Prompt to use for recovery run (not used with built-in method). + recovery_turns: Number of turns allowed for recovery (not used with built-in method). + report_status_prompt: Instruction snippet about using the status tool. + + Note: + LangChain's AgentExecutor has built-in max iterations handling via + early_stopping_method. When max_turns_recovery=True, we leverage this + instead of manually catching exceptions. The agent will automatically + call the LLM one final time to generate a summary when max_iterations + is reached. 
+        """
+        super().__init__(
+            agent_id,
+            max_turns_recovery=max_turns_recovery,
+            recovery_turns=recovery_turns,
+            wrap_up_prompt=wrap_up_prompt,
+            report_status_prompt=report_status_prompt,
+        )
+        self.agent_executor = agent_executor
+
+        # Configure early stopping for max iterations
+        # 'generate' calls LLM one final time to create answer (like wrap-up)
+        # 'force' just returns "Agent stopped due to iteration limit"
+        if max_turns_recovery and self.agent_executor.early_stopping_method == "force":
+            logger.info(
+                "Enabling early_stopping_method='generate' for max iterations recovery"
+            )
+            self.agent_executor.early_stopping_method = "generate"
+
+        # Override with LangChain-specific tools
+        self.tools = _LangChainRunnerTools(self.agent_id)
+
+        # Inject status reporting tool into agent's tools if not already present
+        status_tool = self.tools.report_status
+        if status_tool not in self.agent_executor.tools:
+            self.agent_executor.tools.append(status_tool)
+
+    async def run(
+        self,
+        input: str | dict[str, Any],
+        max_iterations: int = 15,
+        **kwargs: Any,
+    ) -> dict[str, Any]:
+        """Run the LangChain agent with automatic activity tracking.
+
+        LangChain's AgentExecutor handles max iterations internally via
+        early_stopping_method. If max_turns_recovery=True was set during
+        initialization, the agent will automatically generate a final answer
+        when max_iterations is reached.
+
+        Args:
+            input: User input/prompt for the agent. Can be a string or dict with "input" key.
+            max_iterations: Maximum number of agent iterations.
+            **kwargs: Additional arguments passed to AgentExecutor.ainvoke().
+
+        Returns:
+            Result from the agent execution. When max iterations is reached with
+            early_stopping_method='generate', the result will contain the LLM's
+            final generated answer.
+
+        Raises:
+            Exception: If agent execution fails (non-max-iterations errors).
+
+        Note:
+            Unlike OpenAI's MaxTurnsExceeded exception, LangChain's AgentExecutor
+            handles max iterations gracefully and returns a normal result, so no
+            exception handling is needed.
+        """
+        # Normalize input to dict format expected by AgentExecutor
+        if isinstance(input, str):
+            agent_input = {"input": input}
+        else:
+            agent_input = input
+
+        # max_iterations is an AgentExecutor attribute, not a RunnableConfig key,
+        # so set it on the executor instead of passing it through config
+        self.agent_executor.max_iterations = max_iterations
+
+        result = await self.agent_executor.ainvoke(agent_input, **kwargs)
+        return result
+
+    async def run_streamed(
+        self,
+        input: str | dict[str, Any],
+        max_iterations: int = 15,
+        **kwargs: Any,
+    ) -> AsyncIterator[dict[str, Any]]:
+        """Run the LangChain agent in streaming mode with automatic activity tracking.
+
+        This method uses LangChain's astream_events API to provide granular streaming
+        of agent execution, including intermediate steps, tool calls, and LLM responses.
+
+        LangChain's AgentExecutor handles max iterations internally, so streaming
+        will continue until completion or max_iterations is reached (with early
+        stopping applied if configured).
+
+        Args:
+            input: User input/prompt for the agent. Can be a string or dict with "input" key.
+            max_iterations: Maximum number of agent iterations.
+            **kwargs: Additional arguments passed to AgentExecutor.astream_events().
+
+        Yields:
+            Event dictionaries from the agent execution.
+
+        Raises:
+            Exception: If agent execution fails (non-max-iterations errors).
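+
+        Note:
+            Events follow the astream_events v2 schema; common event names
+            include "on_chain_start", "on_tool_start", and "on_chat_model_stream".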
+
+        Example:
+            async for event in runner.run_streamed("Research topic"):
+                if event["event"] == "on_chat_model_stream":
+                    print(event["data"]["chunk"].content, end="")
+        """
+        # Normalize input to dict format expected by AgentExecutor
+        if isinstance(input, str):
+            agent_input = {"input": input}
+        else:
+            agent_input = input
+
+        # As in run(), max_iterations must be set on the executor itself;
+        # a RunnableConfig dict has no max_iterations key, so it would be ignored
+        self.agent_executor.max_iterations = max_iterations
+
+        async for event in self.agent_executor.astream_events(
+            agent_input,
+            version="v2",
+            **kwargs,
+        ):
+            yield event