From 12d286b1424b32e0ed3a10ef5a589974d334cb80 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 22 Nov 2025 15:53:53 +0000 Subject: [PATCH 1/2] Add LangChain agent runner support Implements LangChainRunner to support LangChain agents with the same orchestration capabilities as OpenAIRunner. Changes: - Add LangChainRunner class with support for AgentExecutor - Implement async invoke and streaming via astream_events - Add max iterations recovery similar to OpenAI's max turns - Include report_status tool injection for activity tracking - Add optional langchain dependencies to pyproject.toml - Create comprehensive LangChain example in examples/langchain-agents-fastapi/ - Update main README with LangChain documentation and examples The LangChainRunner provides the same features as OpenAIRunner: - Automatic activity tracking - Agent self-reporting via report_activity tool - Max iterations recovery with wrap-up prompts - Streaming support - Compatible with ReAct, tool-calling, and LangGraph agents --- README.md | 59 ++++- examples/langchain-agents-fastapi/README.md | 165 +++++++++++++ examples/langchain-agents-fastapi/alembic.ini | 148 ++++++++++++ .../langchain-agents-fastapi/alembic/README | 1 + .../langchain-agents-fastapi/alembic/env.py | 82 +++++++ .../alembic/script.py.mako | 28 +++ examples/langchain-agents-fastapi/main.py | 77 ++++++ examples/langchain-agents-fastapi/models.py | 9 + .../langchain-agents-fastapi/pyproject.toml | 33 +++ examples/langchain-agents-fastapi/tools.py | 48 ++++ examples/langchain-agents-fastapi/views.py | 85 +++++++ examples/langchain-agents-fastapi/worker.py | 126 ++++++++++ pyproject.toml | 6 + src/agentexec/runners/__init__.py | 8 + src/agentexec/runners/langchain.py | 228 ++++++++++++++++++ 15 files changed, 1099 insertions(+), 4 deletions(-) create mode 100644 examples/langchain-agents-fastapi/README.md create mode 100644 examples/langchain-agents-fastapi/alembic.ini create mode 100644 examples/langchain-agents-fastapi/alembic/README create mode 100644 examples/langchain-agents-fastapi/alembic/env.py create mode 100644 examples/langchain-agents-fastapi/alembic/script.py.mako create mode 100644 examples/langchain-agents-fastapi/main.py create mode 100644 examples/langchain-agents-fastapi/models.py create mode 100644 examples/langchain-agents-fastapi/pyproject.toml create mode 100644 examples/langchain-agents-fastapi/tools.py create mode 100644 examples/langchain-agents-fastapi/views.py create mode 100644 examples/langchain-agents-fastapi/worker.py create mode 100644 src/agentexec/runners/langchain.py diff --git a/README.md b/README.md index bb9946f..dc8cff9 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # `agentexec` -**Production-ready orchestration for OpenAI Agents SDK** with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools. +**Production-ready orchestration for AI agents** with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools. -Build reliable, scalable AI agent applications with automatic lifecycle management, progress tracking, and fault tolerance. +Supports **OpenAI Agents SDK** and **LangChain** with automatic lifecycle management, progress tracking, and fault tolerance. Running AI agents in production requires more than just the SDK. You need: @@ -22,6 +22,7 @@ Running AI agents in production requires more than just the SDK. 
You need: - **Redis task queue** - Reliable job distribution with priority support - **Automatic activity tracking** - Full lifecycle management (QUEUED → RUNNING → COMPLETE/ERROR) - **OpenAI Agents integration** - Drop-in runner with max turns recovery +- **LangChain integration** - Support for ReAct, tool-calling, and LangGraph agents - **Agent self-reporting** - Built-in tools for agents to report progress - **SQLAlchemy-based storage** - Flexible database support (PostgreSQL, MySQL, SQLite) - **Type-safe** - Full type annotations with Pydantic schemas @@ -31,10 +32,18 @@ Running AI agents in production requires more than just the SDK. You need: ## Installation +**For OpenAI Agents SDK:** ```bash uv add agentexec ``` +**For LangChain:** +```bash +uv add "agentexec[langchain]" +# Also install your LLM provider, e.g.: +uv add langchain-openai # or langchain-anthropic, etc. +``` + **Requirements:** - Python 3.11+ - Redis (for task queue) @@ -180,7 +189,9 @@ ax.enqueue("batch_job", payload, priority=ax.Priority.LOW) --- -## Full Example: FastAPI Integration +## Full Examples: FastAPI Integration + +### OpenAI Agents SDK Example See **[examples/openai-agents-fastapi/](examples/openai-agents-fastapi/)** for a complete production application showing: @@ -191,6 +202,16 @@ See **[examples/openai-agents-fastapi/](examples/openai-agents-fastapi/)** for a - Real-time progress monitoring - Graceful shutdown with cleanup +### LangChain Example + +See **[examples/langchain-agents-fastapi/](examples/langchain-agents-fastapi/)** for a LangChain-based application demonstrating: + +- ReAct agent pattern with LangChain +- Integration with multiple LLM providers +- Streaming support via astream_events +- Custom tools with @tool decorator +- Same orchestration patterns as OpenAI example + --- ## Configuration @@ -257,6 +278,34 @@ result = await runner.run(agent, input="...", max_turns=15) result = await runner.run_streamed(agent, input="...", max_turns=15) ``` +### LangChain Runner + +```python +from langchain.agents import AgentExecutor, create_react_agent +from langchain_openai import ChatOpenAI + +# Create LangChain agent +llm = ChatOpenAI(model="gpt-4o-mini") +agent = create_react_agent(llm, tools, prompt) +agent_executor = AgentExecutor(agent=agent, tools=tools) + +# Wrap with agentexec runner +runner = ax.LangChainRunner( + agent_id=agent_id, + agent_executor=agent_executor, + max_turns_recovery=True, + wrap_up_prompt="Summarize...", +) + +# Run agent +result = await runner.run(input="...", max_iterations=15) + +# Streaming +async for event in runner.run_streamed(input="...", max_iterations=15): + if event["event"] == "on_chat_model_stream": + print(event["data"]["chunk"].content, end="") +``` + --- ## Architecture @@ -339,7 +388,9 @@ MIT License - see [LICENSE](LICENSE) for details ## Links -- **Documentation**: See example application in `examples/openai-agents-fastapi/` +- **Documentation**: See example applications + - OpenAI Agents SDK: `examples/openai-agents-fastapi/` + - LangChain: `examples/langchain-agents-fastapi/` - **Issues**: [GitHub Issues](https://github.com/Agent-CI/agentexec/issues) - **PyPI**: [agentexec](https://pypi.org/project/agentexec/) diff --git a/examples/langchain-agents-fastapi/README.md b/examples/langchain-agents-fastapi/README.md new file mode 100644 index 0000000..0135c4a --- /dev/null +++ b/examples/langchain-agents-fastapi/README.md @@ -0,0 +1,165 @@ +# LangChain Agents FastAPI Example + +This example demonstrates a complete FastAPI application using **agentexec** to 
orchestrate LangChain agents in production.
+
+## What This Example Demonstrates
+
+### Core Features
+
+- **Background worker pool** (`worker.py`) - Multi-process task execution with Redis queue
+- **LangChainRunner integration** - Automatic activity tracking for agent lifecycle
+- **ReAct agent pattern** - Using LangChain's create_react_agent for reasoning
+- **Custom FastAPI routes** (`views.py`) - Building your own API on agentexec's public API
+- **Database session management** (`main.py`) - Standard SQLAlchemy patterns with full control
+- **Agent self-reporting** - Agents report progress via built-in `report_activity` tool
+- **Max iterations recovery** - Automatic handling of conversation limits with wrap-up prompts
+
+### Key Patterns Shown
+
+**Task Registration:**
+```python
+@pool.task("research_company")
+async def research_company(agent_id: UUID, payload: dict):
+    # Create LangChain agent
+    llm = ChatOpenAI(model="gpt-4o-mini")
+    agent = create_react_agent(llm, tools, prompt)
+    agent_executor = AgentExecutor(agent=agent, tools=tools)
+
+    # Wrap with agentexec runner
+    runner = ax.LangChainRunner(
+        agent_id=agent_id,
+        agent_executor=agent_executor,
+        max_turns_recovery=True
+    )
+
+    # Execute with activity tracking
+    result = await runner.run(input_prompt, max_iterations=15)
+```
+
+**Activity Tracking API:**
+```python
+# List activities with pagination
+ax.activity.list(db, page=1, page_size=50)
+
+# Get detailed activity with full log history
+ax.activity.detail(db, agent_id)
+
+# Cleanup on shutdown
+ax.activity.cancel_pending(db)
+```
+
+**Queueing Tasks:**
+```python
+task = ax.enqueue(
+    task_name="research_company",
+    payload={"company_name": "Acme"},
+    priority=ax.Priority.HIGH,
+)
+```
+
+## Quick Start
+
+```bash
+# Install dependencies
+cd examples/langchain-agents-fastapi
+uv sync
+
+# Start Redis
+docker run -d -p 6379:6379 redis:latest
+
+# Set API key
+export OPENAI_API_KEY="your-key"
+
+# Run migrations
+alembic upgrade head
+
+# Start worker (terminal 1)
+python worker.py
+
+# Start API server (terminal 2)
+uvicorn main:app --reload
+```
+
+## Try It
+
+Queue a task:
+```bash
+curl -X POST "http://localhost:8000/api/tasks" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "task_name": "research_company",
+    "payload": {"company_name": "Anthropic"}
+  }'
+```
+
+Monitor progress:
+```bash
+# List all activities
+curl "http://localhost:8000/api/agents/activity"
+
+# Get specific agent details
+curl "http://localhost:8000/api/agents/activity/{agent_id}"
+```
+
+## Configuration
+
+Set via environment variables:
+
+```bash
+DATABASE_URL="sqlite:///agents.db"  # or postgresql://...
+REDIS_URL="redis://localhost:6379/0"
+QUEUE_NAME="agentexec:tasks"
+NUM_WORKERS="4"
+OPENAI_API_KEY="sk-..."
+``` + +## LangChain-Specific Features + +### Agent Types Supported + +This example demonstrates the **ReAct agent** pattern, but agentexec's LangChainRunner supports: + +- **ReAct agents** (`create_react_agent`) - Reasoning and acting +- **Tool-calling agents** (`create_tool_calling_agent`) - Structured tool use +- **Structured chat agents** (`create_structured_chat_agent`) - Multi-turn conversations +- **LangGraph agents** - Durable execution with state persistence + +### Streaming Support + +LangChain agents support streaming via `astream_events`: + +```python +async for event in runner.run_streamed(input_prompt, max_iterations=15): + if event["event"] == "on_chat_model_stream": + print(event["data"]["chunk"].content, end="") +``` + +### Custom Tools + +Tools in LangChain use the `@tool` decorator: + +```python +from langchain_core.tools import tool + +@tool +def search_company_info(company_name: str, query_type: str) -> str: + """Search for company information.""" + return f"Information about {company_name}..." +``` + +## Differences from OpenAI Agents SDK + +| Feature | OpenAI Agents SDK | LangChain | +|---------|------------------|-----------| +| Agent creation | `Agent(name, instructions, tools)` | `create_react_agent(llm, tools, prompt)` | +| Tool decorator | `@function_tool` | `@tool` | +| Execution | `Runner.run()` | `AgentExecutor.invoke()` | +| Streaming | `run_streamed()` | `astream_events()` | +| Max iterations | `MaxTurnsExceeded` exception | Max iterations in config | +| LLM providers | OpenAI only | OpenAI, Anthropic, local models, etc. | + +## Learn More + +- [LangChain Documentation](https://python.langchain.com/) +- [AgentExec Documentation](https://github.com/Agent-CI/agentexec) +- [ReAct Agent Pattern](https://python.langchain.com/docs/modules/agents/agent_types/react/) diff --git a/examples/langchain-agents-fastapi/alembic.ini b/examples/langchain-agents-fastapi/alembic.ini new file mode 100644 index 0000000..c014592 --- /dev/null +++ b/examples/langchain-agents-fastapi/alembic.ini @@ -0,0 +1,148 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts. +# this is typically a path given in POSIX (e.g. forward slashes) +# format, relative to the token %(here)s which refers to the location of this +# ini file +script_location = %(here)s/alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. for multiple paths, the path separator +# is defined by "path_separator" below. +prepend_sys_path = . + + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the tzdata library which can be installed by adding +# `alembic[tz]` to the pip requirements. 
+# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to /versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "path_separator" +# below. +# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions + +# path_separator; This indicates what character is used to split lists of file +# paths, including version_locations and prepend_sys_path within configparser +# files such as alembic.ini. +# The default rendered in new alembic.ini files is "os", which uses os.pathsep +# to provide os-dependent path splitting. +# +# Note that in order to support legacy alembic.ini files, this default does NOT +# take place if path_separator is not present in alembic.ini. If this +# option is omitted entirely, fallback logic is as follows: +# +# 1. Parsing of the version_locations option falls back to using the legacy +# "version_path_separator" key, which if absent then falls back to the legacy +# behavior of splitting on spaces and/or commas. +# 2. Parsing of the prepend_sys_path option falls back to the legacy +# behavior of splitting on spaces, commas, or colons. +# +# Valid values for path_separator are: +# +# path_separator = : +# path_separator = ; +# path_separator = space +# path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# database URL. This is consumed by the user-maintained env.py script only. +# other means of configuring database URLs may be customized within the env.py +# file. +# We configure the database URL in env.py from the DATABASE_URL environment variable +# sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module +# hooks = ruff +# ruff.type = module +# ruff.module = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Alternatively, use the exec runner to execute a binary found on your PATH +# hooks = ruff +# ruff.type = exec +# ruff.executable = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Logging configuration. This is also consumed by the user-maintained +# env.py script only. 
+[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/examples/langchain-agents-fastapi/alembic/README b/examples/langchain-agents-fastapi/alembic/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/examples/langchain-agents-fastapi/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/examples/langchain-agents-fastapi/alembic/env.py b/examples/langchain-agents-fastapi/alembic/env.py new file mode 100644 index 0000000..44b8890 --- /dev/null +++ b/examples/langchain-agents-fastapi/alembic/env.py @@ -0,0 +1,82 @@ +import os +from logging.config import fileConfig + +from alembic import context + +# Import your application's models +from models import Base +from sqlalchemy import engine_from_config, pool + +import agentexec as ax + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Get database URL from environment (same pattern as the app) +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///agents.db") +config.set_main_option("sqlalchemy.url", DATABASE_URL) + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# Combine metadata from both your app and agent-runner +# This allows Alembic to manage tables from both sources +target_metadata = [Base.metadata, ax.Base.metadata] + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. 
+ + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/examples/langchain-agents-fastapi/alembic/script.py.mako b/examples/langchain-agents-fastapi/alembic/script.py.mako new file mode 100644 index 0000000..1101630 --- /dev/null +++ b/examples/langchain-agents-fastapi/alembic/script.py.mako @@ -0,0 +1,28 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + """Upgrade schema.""" + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + """Downgrade schema.""" + ${downgrades if downgrades else "pass"} diff --git a/examples/langchain-agents-fastapi/main.py b/examples/langchain-agents-fastapi/main.py new file mode 100644 index 0000000..aab9509 --- /dev/null +++ b/examples/langchain-agents-fastapi/main.py @@ -0,0 +1,77 @@ +"""FastAPI application demonstrating agentexec integration with LangChain.""" + +import os +from collections.abc import Generator +from contextlib import asynccontextmanager + +from fastapi import FastAPI +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker +import agentexec as ax + +from .views import router + +# Database setup - users manage their own connection +# For PostgreSQL: "postgresql://user:password@localhost:5432/dbname" +# For SQLite: "sqlite:///agents.db" +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///agents.db") + +# Create engine and session factory (standard SQLAlchemy setup) +engine = create_engine(DATABASE_URL, echo=False) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +# Dependency: Get database session (standard FastAPI pattern) +def get_db() -> Generator[Session, None, None]: + """Provide a database session for each request. + + This is the standard FastAPI pattern for database session management. + Users have full control over connection pooling, timeouts, etc. 
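+
+    For example, a PostgreSQL deployment might tune the engine instead of
+    relying on defaults (illustrative; standard SQLAlchemy arguments):
+    create_engine(url, pool_size=10, max_overflow=20, pool_pre_ping=True).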
+ """ + db = SessionLocal() + try: + yield db + db.commit() + except Exception: + db.rollback() + raise + finally: + db.close() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan: setup and teardown.""" + print("✓ Activity tracking configured") + print(f"✓ Redis URL: {ax.CONF.redis_url}") + print(f"✓ Queue name: {ax.CONF.queue_name}") + print(f"✓ Number of workers: {ax.CONF.num_workers}") + + yield + + # Cleanup: cancel any pending agents + with SessionLocal() as db: + try: + canceled = ax.activity.cancel_pending(db) + db.commit() + print(f"✓ Canceled {canceled} pending agents") + except Exception as e: + db.rollback() + print(f"✗ Error canceling pending agents: {e}") + + +# Create FastAPI app +app = FastAPI( + title="AgentExec LangChain Example", + description="Example FastAPI application using agentexec with LangChain agents", + version="1.0.0", + lifespan=lifespan, +) + +app.include_router(router) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/examples/langchain-agents-fastapi/models.py b/examples/langchain-agents-fastapi/models.py new file mode 100644 index 0000000..f4450c7 --- /dev/null +++ b/examples/langchain-agents-fastapi/models.py @@ -0,0 +1,9 @@ +"""Database models for the example application.""" + +from sqlalchemy.orm import DeclarativeBase + + +class Base(DeclarativeBase): + """Base class for all database models in this application.""" + + pass diff --git a/examples/langchain-agents-fastapi/pyproject.toml b/examples/langchain-agents-fastapi/pyproject.toml new file mode 100644 index 0000000..1a9dbc5 --- /dev/null +++ b/examples/langchain-agents-fastapi/pyproject.toml @@ -0,0 +1,33 @@ +[project] +name = "langchain-agents-fastapi" +version = "0.1.0" +description = "Example FastAPI application demonstrating agentexec with LangChain" +readme = "README.md" +requires-python = ">=3.11" +license = { text = "MIT" } + +dependencies = [ + # AgentExec for orchestration + "agentexec[langchain]>=0.1.0", + # LangChain for AI agent functionality + "langchain>=0.3.0", + "langchain-core>=0.3.0", + "langchain-openai>=0.2.0", + "langgraph>=0.2.0", + # FastAPI and server + "fastapi>=0.121.0", + "uvicorn[standard]>=0.27.0", + # Database migrations + "alembic>=1.13.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.uv] +dev-dependencies = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "httpx>=0.27.0", # For testing FastAPI +] diff --git a/examples/langchain-agents-fastapi/tools.py b/examples/langchain-agents-fastapi/tools.py new file mode 100644 index 0000000..e43d8f9 --- /dev/null +++ b/examples/langchain-agents-fastapi/tools.py @@ -0,0 +1,48 @@ +"""LangChain tools for company research.""" + +from langchain_core.tools import tool + + +@tool +def search_company_info(company_name: str, query_type: str) -> str: + """Search for company information. + + Args: + company_name: The name of the company to search for + query_type: Type of information to search for (financial, news, products, team) + + Returns: + Relevant information about the company + """ + # In a real implementation, this would call actual APIs + # For demo purposes, return simulated data + if query_type == "financial": + return f"{company_name} reported $100M revenue last quarter with 20% YoY growth." + elif query_type == "news": + return f"{company_name} recently announced a new product launch and expansion plans." 
+ elif query_type == "products": + return f"{company_name} offers SaaS solutions for enterprise data management." + elif query_type == "team": + return f"{company_name} has 500+ employees across 5 offices globally." + return f"General information about {company_name}" + + +@tool +def analyze_financial_data(company_name: str, metrics: list[str]) -> dict: + """Analyze financial metrics for a company. + + Args: + company_name: The name of the company + metrics: List of metrics to analyze (revenue, profit, growth_rate) + + Returns: + Dictionary with analyzed metrics + """ + # Simulated financial analysis + return { + "company": company_name, + "revenue": "$100M", + "profit_margin": "25%", + "growth_rate": "20% YoY", + "outlook": "positive", + } diff --git a/examples/langchain-agents-fastapi/views.py b/examples/langchain-agents-fastapi/views.py new file mode 100644 index 0000000..9ba72be --- /dev/null +++ b/examples/langchain-agents-fastapi/views.py @@ -0,0 +1,85 @@ +"""API routes for the LangChain example application.""" + +import uuid + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy.orm import Session +import agentexec as ax + +from main import get_db + + +router = APIRouter() + + +class TaskRequest(BaseModel): + """Request to queue a new task.""" + + task_name: str + payload: dict + priority: ax.Priority = ax.Priority.LOW + + +class TaskResponse(BaseModel): + """Response after queuing a task.""" + + agent_id: uuid.UUID + message: str + + +@router.post( + "/api/tasks", + response_model=TaskResponse, +) +async def queue_task(request: TaskRequest, db: Session = Depends(get_db)): + """Queue a new background task. + + The task will be picked up by a worker and executed asynchronously. + Use the agent_id to track progress via the activity endpoints. + """ + task = ax.enqueue( + request.task_name, + request.payload, + priority=request.priority, + ) + + return TaskResponse( + agent_id=task.agent_id, + message=f"Task queued successfully. Track progress at /api/agents/activity/{task.agent_id}", + ) + + +@router.get( + "/api/agents/activity", + response_model=ax.activity.ActivityListSchema, +) +async def list_agents( + page: int = Query(1, ge=1, description="Page number"), + page_size: int = Query(50, ge=1, le=100, description="Items per page"), + db: Session = Depends(get_db), +): + """List all activities with pagination. + + Uses agentexec's public API: activity.list() + The database session is automatically managed by FastAPI dependency injection. + """ + return ax.activity.list(db, page=page, page_size=page_size) + + +@router.get( + "/api/agents/activity/{agent_id}", + response_model=ax.activity.ActivityDetailSchema, +) +async def get_agent(agent_id: str, db: Session = Depends(get_db)): + """Get detailed information about a specific agent including full log history. + + Uses agentexec's public API: activity.detail() + The database session is automatically managed by FastAPI dependency injection. 
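+
+    Example request (hypothetical agent_id):
+        GET /api/agents/activity/123e4567-e89b-12d3-a456-426614174000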
+ """ + activity_obj = ax.activity.detail(db, agent_id) + + if not activity_obj: + raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found") + + return activity_obj diff --git a/examples/langchain-agents-fastapi/worker.py b/examples/langchain-agents-fastapi/worker.py new file mode 100644 index 0000000..8e6abf5 --- /dev/null +++ b/examples/langchain-agents-fastapi/worker.py @@ -0,0 +1,126 @@ +"""Worker process for running LangChain agents with agentexec.""" + +from uuid import UUID + +from langchain.agents import AgentExecutor, create_react_agent +from langchain_core.prompts import PromptTemplate +from langchain_openai import ChatOpenAI +from sqlalchemy.orm import Session +import agentexec as ax + +from .main import engine +from .tools import analyze_financial_data, search_company_info + + +pool = ax.WorkerPool(engine=engine) + + +@pool.task("research_company") +async def research_company(agent_id: UUID, payload: dict): + """Research a company using a LangChain ReAct agent with tools. + + This demonstrates: + - Using LangChain with ReAct agent pattern + - Automatic activity tracking via LangChainRunner + - Agent self-reporting progress via report_activity tool + """ + company_name = payload.get("company_name", "Unknown Company") + input_prompt = payload.get("input_prompt", f"Research the company {company_name}.") + + # Initialize the LLM + llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) + + # Define the tools available to the agent + tools = [ + search_company_info, + analyze_financial_data, + ] + + # Create a ReAct prompt template + template = """You are a thorough company research analyst. +Research {company_name} and provide a comprehensive report covering: +- Financial performance and metrics +- Recent news and developments +- Products and services offered +- Team and organizational structure + +Use the available tools to gather information and synthesize a detailed report. + +IMPORTANT: Report your progress regularly using the report_activity tool. +Call report_activity(message, percentage) to update on your current task. +Always report your current activity before starting a new step. +Include a brief message about the task and percentage completion (0-100). + +You have access to the following tools: + +{tools} + +Use the following format: + +Question: the input question you must answer +Thought: you should always think about what to do +Action: the action to take, should be one of [{tool_names}] +Action Input: the input to the action +Observation: the result of the action +... (this Thought/Action/Action Input/Observation can repeat N times) +Thought: I now know the final answer +Final Answer: the final answer to the original input question + +Begin! 
+ +Question: {input} +Thought:{agent_scratchpad}""" + + prompt = PromptTemplate.from_template(template) + prompt = prompt.partial(company_name=company_name) + + # Create the ReAct agent + agent = create_react_agent(llm, tools, prompt) + agent_executor = AgentExecutor( + agent=agent, + tools=tools, + verbose=True, + handle_parsing_errors=True, + ) + + # Wrap with agentexec runner for activity tracking + runner = ax.LangChainRunner( + agent_id=agent_id, + agent_executor=agent_executor, + max_turns_recovery=True, + wrap_up_prompt="Please summarize your findings and provide a final report.", + ) + + # Execute the agent + result = await runner.run( + input=input_prompt, + max_iterations=15, + ) + + # Extract the output + report = result.get("output", "No output generated") + + print(f"✓ Completed research for {company_name} (agent_id: {agent_id})") + print(f"Report preview: {report[:200]}...") + + +if __name__ == "__main__": + print("Starting LangChain agent worker pool...") + print(f"Workers: {ax.CONF.num_workers}") + print(f"Queue: {ax.CONF.queue_name}") + print("Press Ctrl+C to shutdown gracefully") + + try: + pool.start() + + except KeyboardInterrupt: + print("\nShutting down worker pool...") + pool.shutdown() + + # Cancel any pending tasks + with Session(engine) as session: + canceled = ax.activity.cancel_pending(session) + session.commit() + print(f"Canceled {canceled} pending agents") + + print("Worker pool stopped.") diff --git a/pyproject.toml b/pyproject.toml index 5b70c1f..fe14d0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,12 @@ dependencies = [ "openai-agents>=0.1.0", ] +[project.optional-dependencies] +langchain = [ + "langchain>=0.3.0", + "langchain-core>=0.3.0", +] + [project.urls] Homepage = "https://github.com/Agent-CI/agentexec" diff --git a/src/agentexec/runners/__init__.py b/src/agentexec/runners/__init__.py index d1af0fd..4e969f5 100644 --- a/src/agentexec/runners/__init__.py +++ b/src/agentexec/runners/__init__.py @@ -11,3 +11,11 @@ __all__.append("OpenAIRunner") except ImportError: pass + +# LangChain runner is only available if langchain packages are installed +try: + from agentexec.runners.langchain import LangChainRunner + + __all__.append("LangChainRunner") +except ImportError: + pass diff --git a/src/agentexec/runners/langchain.py b/src/agentexec/runners/langchain.py new file mode 100644 index 0000000..4d2da14 --- /dev/null +++ b/src/agentexec/runners/langchain.py @@ -0,0 +1,228 @@ +"""LangChain agent runner with activity tracking.""" + +import logging +import uuid +from typing import Any, AsyncIterator + +from langchain.agents import AgentExecutor +from langchain_core.tools import tool + +from agentexec.runners.base import BaseAgentRunner, _RunnerTools + +logger = logging.getLogger(__name__) + + +class _LangChainRunnerTools(_RunnerTools): + """LangChain-specific tools wrapper that decorates with @tool.""" + + @property + def report_status(self) -> Any: + """Get the status update tool wrapped with @tool decorator. + + Returns: + LangChain tool for status reporting. + """ + agent_id = self._agent_id + + @tool + def report_activity(message: str, percentage: int) -> str: + """Report progress and status updates. + + Use this tool to report your progress as you work through the task. 
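+            For example, report_activity("Analyzing financial data", 40) once the
+            analysis step is underway (the percentage is your own estimate).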
+ + Args: + message: A brief description of what you're currently doing + percentage: Your estimated completion percentage (0-100) + + Returns: + Confirmation message + """ + from agentexec import activity + + activity.update( + agent_id=agent_id, + message=message, + completion_percentage=percentage, + ) + return "Status updated" + + return report_activity + + +class LangChainRunner(BaseAgentRunner): + """Runner for LangChain agents with automatic activity tracking. + + This runner wraps LangChain's AgentExecutor and provides: + - Automatic agent_id generation + - Activity lifecycle management (QUEUED -> RUNNING -> COMPLETE/ERROR) + - Max iterations recovery with configurable wrap-up prompts + - Status update tool with agent_id pre-baked + - Support for streaming via astream_events + + Example: + from langchain_openai import ChatOpenAI + from langchain.agents import create_react_agent, AgentExecutor + from langchain import hub + + # Create LangChain agent + llm = ChatOpenAI(model="gpt-4") + prompt = hub.pull("hwchase17/react") + agent = create_react_agent(llm, tools, prompt) + agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True) + + # Wrap with agentexec runner + runner = agentexec.LangChainRunner( + agent_id=activity.id, + agent_executor=agent_executor, + max_turns_recovery=True, + ) + + # Execute with activity tracking + result = await runner.run("Research competitor products") + """ + + def __init__( + self, + agent_id: uuid.UUID, + agent_executor: AgentExecutor, + *, + max_turns_recovery: bool = False, + wrap_up_prompt: str | None = None, + recovery_turns: int = 5, + report_status_prompt: str | None = None, + ) -> None: + """Initialize the LangChain runner. + + Args: + agent_id: UUID for tracking this agent's activity. + agent_executor: LangChain AgentExecutor instance. + max_turns_recovery: Enable automatic recovery when max iterations exceeded. + wrap_up_prompt: Prompt to use for recovery run. + recovery_turns: Number of turns allowed for recovery. + report_status_prompt: Instruction snippet about using the status tool. + """ + super().__init__( + agent_id, + max_turns_recovery=max_turns_recovery, + recovery_turns=recovery_turns, + wrap_up_prompt=wrap_up_prompt, + report_status_prompt=report_status_prompt, + ) + self.agent_executor = agent_executor + + # Override with LangChain-specific tools + self.tools = _LangChainRunnerTools(self.agent_id) + + # Inject status reporting tool into agent's tools if not already present + status_tool = self.tools.report_status + if status_tool not in self.agent_executor.tools: + self.agent_executor.tools.append(status_tool) + + async def run( + self, + input: str | dict[str, Any], + max_iterations: int = 15, + **kwargs: Any, + ) -> dict[str, Any]: + """Run the LangChain agent with automatic activity tracking. + + Args: + input: User input/prompt for the agent. Can be a string or dict with "input" key. + max_iterations: Maximum number of agent iterations. + **kwargs: Additional arguments passed to AgentExecutor.ainvoke(). + + Returns: + Result from the agent execution. + + Raises: + Exception: If agent execution fails. 
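+
+        Example (illustrative sketch, assuming a configured runner):
+            result = await runner.run("Research Acme Corp", max_iterations=10)
+            print(result.get("output"))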
+ """ + # Normalize input to dict format expected by AgentExecutor + if isinstance(input, str): + agent_input = {"input": input} + else: + agent_input = input + + try: + result = await self.agent_executor.ainvoke( + agent_input, + config={"max_iterations": max_iterations, **kwargs.get("config", {})}, + ) + return result + except Exception as e: + # Check if this is a max iterations error + if "iterations" in str(e).lower() and self.max_turns_recovery: + logger.info("Max iterations exceeded, attempting recovery") + + # Create recovery input with wrap-up prompt + recovery_input = { + "input": self.prompts.wrap_up, + "chat_history": agent_input.get("chat_history", []), + } + + result = await self.agent_executor.ainvoke( + recovery_input, + config={"max_iterations": self.recovery_turns}, + ) + return result + raise + + async def run_streamed( + self, + input: str | dict[str, Any], + max_iterations: int = 15, + **kwargs: Any, + ) -> AsyncIterator[dict[str, Any]]: + """Run the LangChain agent in streaming mode with automatic activity tracking. + + This method uses LangChain's astream_events API to provide granular streaming + of agent execution, including intermediate steps, tool calls, and LLM responses. + + Args: + input: User input/prompt for the agent. Can be a string or dict with "input" key. + max_iterations: Maximum number of agent iterations. + **kwargs: Additional arguments passed to AgentExecutor.astream_events(). + + Yields: + Event dictionaries from the agent execution. + + Raises: + Exception: If agent execution fails. + + Example: + async for event in runner.run_streamed("Research topic"): + if event["event"] == "on_chat_model_stream": + print(event["data"]["chunk"].content, end="") + """ + # Normalize input to dict format expected by AgentExecutor + if isinstance(input, str): + agent_input = {"input": input} + else: + agent_input = input + + try: + async for event in self.agent_executor.astream_events( + agent_input, + version="v2", + config={"max_iterations": max_iterations, **kwargs.get("config", {})}, + ): + yield event + except Exception as e: + # Check if this is a max iterations error + if "iterations" in str(e).lower() and self.max_turns_recovery: + logger.info("Max iterations exceeded, attempting recovery in streaming mode") + + # Create recovery input with wrap-up prompt + recovery_input = { + "input": self.prompts.wrap_up, + "chat_history": agent_input.get("chat_history", []), + } + + async for event in self.agent_executor.astream_events( + recovery_input, + version="v2", + config={"max_iterations": self.recovery_turns}, + ): + yield event + else: + raise From aefba607223f24e4a6937bea67186b94d6da8c11 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Dec 2025 07:55:58 +0000 Subject: [PATCH 2/2] Update LangChain runner to use built-in max iterations handling Refactored LangChain integration to align with: 1. Latest agentexec architecture (v0.1.0+ changes) 2. LangChain's native early_stopping_method feature 3. 
Latest LangChain conventions and best practices Changes to LangChain Runner: - Leverage LangChain's early_stopping_method='generate' instead of manual exception handling - Simplified run() and run_streamed() methods - no try/except needed - AgentExecutor handles max iterations gracefully without throwing exceptions - Better documentation explaining the difference from OpenAI's approach - Added notes about LangGraph's GraphRecursionError for future support Changes to LangChain Example: - Updated to match new agentexec architecture with typed contexts - Context now uses Pydantic BaseModel (ResearchCompanyContext) - Task handlers signature: async def handler(agent_id, context) -> Result - Added typed return values (ResearchCompanyResult) - Created context.py and db.py following OpenAI example pattern - Updated views.py to use typed context instead of generic payload dict - Updated main.py to use new imports and patterns - Changed pool.start()/shutdown() to pool.run() - Updated README with corrected examples and curl commands Key Improvements: - Defers to LangChain's built-in max iterations recovery - Cleaner, simpler code without manual error handling - Better alignment with LangChain conventions - Matches latest agentexec patterns from main branch Sources: - https://python.langchain.com/docs/modules/agents/how_to/max_iterations/ - https://python.langchain.com/api_reference/langchain/agents/langchain.agents.agent.AgentExecutor.html --- examples/langchain-agents-fastapi/README.md | 21 ++-- examples/langchain-agents-fastapi/context.py | 21 ++++ examples/langchain-agents-fastapi/db.py | 34 ++++++ examples/langchain-agents-fastapi/main.py | 34 +----- examples/langchain-agents-fastapi/views.py | 38 +++++-- examples/langchain-agents-fastapi/worker.py | 60 ++++++---- src/agentexec/runners/langchain.py | 110 +++++++++---------- 7 files changed, 189 insertions(+), 129 deletions(-) create mode 100644 examples/langchain-agents-fastapi/context.py create mode 100644 examples/langchain-agents-fastapi/db.py diff --git a/examples/langchain-agents-fastapi/README.md b/examples/langchain-agents-fastapi/README.md index 0135c4a..daad649 100644 --- a/examples/langchain-agents-fastapi/README.md +++ b/examples/langchain-agents-fastapi/README.md @@ -19,7 +19,10 @@ This example demonstrates a complete FastAPI application using **agentexec** to **Task Registration:** ```python @pool.task("research_company") -async def research_company(agent_id: UUID, payload: dict): +async def research_company( + agent_id: UUID, + context: ResearchCompanyContext, +) -> ResearchCompanyResult: # Create LangChain agent llm = ChatOpenAI(model="gpt-4o-mini") agent = create_react_agent(llm, tools, prompt) @@ -33,7 +36,8 @@ async def research_company(agent_id: UUID, payload: dict): ) # Execute with activity tracking - result = await runner.run(input_prompt, max_iterations=15) + result = await runner.run(context.input_prompt, max_iterations=15) + return ResearchCompanyResult(summary=result["output"]) ``` **Activity Tracking API:** @@ -50,9 +54,10 @@ ax.activity.cancel_pending(db) **Queueing Tasks:** ```python -task = ax.enqueue( - task_name="research_company", - payload={"company_name": "Acme"}, +context = ResearchCompanyContext(company_name="Acme") +task = await ax.enqueue( + "research_company", + context, priority=ax.Priority.HIGH, ) ``` @@ -84,11 +89,11 @@ uvicorn langchain_agents_fastapi.main:app --reload Queue a task: ```bash -curl -X POST "http://localhost:8000/api/tasks" \ +curl -X POST "http://localhost:8000/api/tasks/research_company" 
\ -H "Content-Type: application/json" \ -d '{ - "task_name": "research_company", - "payload": {"company_name": "Anthropic"} + "company_name": "Anthropic", + "input_prompt": "Focus on their AI safety research" }' ``` diff --git a/examples/langchain-agents-fastapi/context.py b/examples/langchain-agents-fastapi/context.py new file mode 100644 index 0000000..9ec136e --- /dev/null +++ b/examples/langchain-agents-fastapi/context.py @@ -0,0 +1,21 @@ +"""Context classes for LangChain Agents FastAPI example. + +These Pydantic models provide type-safe context for background tasks. +""" + +from pydantic import BaseModel, Field + + +class ResearchCompanyContext(BaseModel): + """Context for company research tasks.""" + + company_name: str = Field(..., min_length=1, description="Name of the company to research") + input_prompt: str | None = Field(None, description="Custom research prompt (optional)") + + class Config: + json_schema_extra = { + "example": { + "company_name": "Anthropic", + "input_prompt": "Focus on their AI safety research and product offerings", + } + } diff --git a/examples/langchain-agents-fastapi/db.py b/examples/langchain-agents-fastapi/db.py new file mode 100644 index 0000000..f1b5486 --- /dev/null +++ b/examples/langchain-agents-fastapi/db.py @@ -0,0 +1,34 @@ +"""Database configuration and utilities for LangChain example.""" + +import os +from collections.abc import Generator + +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker + +# Database setup - users manage their own connection +# For PostgreSQL: "postgresql://user:password@localhost:5432/dbname" +# For SQLite: "sqlite:///agents.db" +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///agents.db") + +# Create engine and session factory (standard SQLAlchemy setup) +engine = create_engine(DATABASE_URL, echo=False) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +# Dependency: Get database session (standard FastAPI pattern) +def get_db() -> Generator[Session, None, None]: + """Provide a database session for each request. + + This is the standard FastAPI pattern for database session management. + Users have full control over connection pooling, timeouts, etc. + """ + db = SessionLocal() + try: + yield db + db.commit() + except Exception: + db.rollback() + raise + finally: + db.close() diff --git a/examples/langchain-agents-fastapi/main.py b/examples/langchain-agents-fastapi/main.py index aab9509..6342743 100644 --- a/examples/langchain-agents-fastapi/main.py +++ b/examples/langchain-agents-fastapi/main.py @@ -1,42 +1,12 @@ """FastAPI application demonstrating agentexec integration with LangChain.""" -import os -from collections.abc import Generator from contextlib import asynccontextmanager from fastapi import FastAPI -from sqlalchemy import create_engine -from sqlalchemy.orm import Session, sessionmaker import agentexec as ax -from .views import router - -# Database setup - users manage their own connection -# For PostgreSQL: "postgresql://user:password@localhost:5432/dbname" -# For SQLite: "sqlite:///agents.db" -DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///agents.db") - -# Create engine and session factory (standard SQLAlchemy setup) -engine = create_engine(DATABASE_URL, echo=False) -SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) - - -# Dependency: Get database session (standard FastAPI pattern) -def get_db() -> Generator[Session, None, None]: - """Provide a database session for each request. 
- - This is the standard FastAPI pattern for database session management. - Users have full control over connection pooling, timeouts, etc. - """ - db = SessionLocal() - try: - yield db - db.commit() - except Exception: - db.rollback() - raise - finally: - db.close() +from db import SessionLocal +from views import router @asynccontextmanager diff --git a/examples/langchain-agents-fastapi/views.py b/examples/langchain-agents-fastapi/views.py index 9ba72be..3a4311f 100644 --- a/examples/langchain-agents-fastapi/views.py +++ b/examples/langchain-agents-fastapi/views.py @@ -7,17 +7,18 @@ from sqlalchemy.orm import Session import agentexec as ax -from main import get_db +from context import ResearchCompanyContext +from db import get_db router = APIRouter() -class TaskRequest(BaseModel): - """Request to queue a new task.""" +class ResearchCompanyRequest(BaseModel): + """Request to queue a company research task.""" - task_name: str - payload: dict + company_name: str + input_prompt: str | None = None priority: ax.Priority = ax.Priority.LOW @@ -29,18 +30,33 @@ class TaskResponse(BaseModel): @router.post( - "/api/tasks", + "/api/tasks/research_company", response_model=TaskResponse, ) -async def queue_task(request: TaskRequest, db: Session = Depends(get_db)): - """Queue a new background task. +async def queue_research_company( + request: ResearchCompanyRequest, + db: Session = Depends(get_db), +): + """Queue a company research task. The task will be picked up by a worker and executed asynchronously. Use the agent_id to track progress via the activity endpoints. + + Example request body: + { + "company_name": "Anthropic", + "input_prompt": "Focus on AI safety research", + "priority": "low" + } """ - task = ax.enqueue( - request.task_name, - request.payload, + context = ResearchCompanyContext( + company_name=request.company_name, + input_prompt=request.input_prompt, + ) + + task = await ax.enqueue( + "research_company", + context, priority=request.priority, ) diff --git a/examples/langchain-agents-fastapi/worker.py b/examples/langchain-agents-fastapi/worker.py index 8e6abf5..89142f8 100644 --- a/examples/langchain-agents-fastapi/worker.py +++ b/examples/langchain-agents-fastapi/worker.py @@ -5,27 +5,44 @@ from langchain.agents import AgentExecutor, create_react_agent from langchain_core.prompts import PromptTemplate from langchain_openai import ChatOpenAI -from sqlalchemy.orm import Session +from pydantic import BaseModel import agentexec as ax -from .main import engine -from .tools import analyze_financial_data, search_company_info +from context import ResearchCompanyContext +from db import engine +from tools import analyze_financial_data, search_company_info + + +class ResearchCompanyResult(BaseModel): + """Result from company research task.""" + + summary: str + financial_performance: str | None = None + recent_news: str | None = None + products_services: str | None = None + team_structure: str | None = None pool = ax.WorkerPool(engine=engine) @pool.task("research_company") -async def research_company(agent_id: UUID, payload: dict): +async def research_company( + agent_id: UUID, + context: ResearchCompanyContext, +) -> ResearchCompanyResult: """Research a company using a LangChain ReAct agent with tools. 
This demonstrates: - Using LangChain with ReAct agent pattern - Automatic activity tracking via LangChainRunner - Agent self-reporting progress via report_activity tool + - Type-safe context object (automatically deserialized from queue) + - Typed result return value """ - company_name = payload.get("company_name", "Unknown Company") - input_prompt = payload.get("input_prompt", f"Research the company {company_name}.") + # Type-safe context access with IDE autocomplete! + company_name = context.company_name + input_prompt = context.input_prompt or f"Research the company {company_name}." # Initialize the LLM llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) @@ -97,11 +114,20 @@ async def research_company(agent_id: UUID, payload: dict): max_iterations=15, ) - # Extract the output - report = result.get("output", "No output generated") + # Extract the output from LangChain's result dict + output = result.get("output", "No output generated") print(f"✓ Completed research for {company_name} (agent_id: {agent_id})") - print(f"Report preview: {report[:200]}...") + print(f"Report preview: {output[:200]}...") + + # Return typed result + return ResearchCompanyResult( + summary=output, + financial_performance="See report", + recent_news="See report", + products_services="See report", + team_structure="See report", + ) if __name__ == "__main__": @@ -110,17 +136,5 @@ async def research_company(agent_id: UUID, payload: dict): print(f"Queue: {ax.CONF.queue_name}") print("Press Ctrl+C to shutdown gracefully") - try: - pool.start() - - except KeyboardInterrupt: - print("\nShutting down worker pool...") - pool.shutdown() - - # Cancel any pending tasks - with Session(engine) as session: - canceled = ax.activity.cancel_pending(session) - session.commit() - print(f"Canceled {canceled} pending agents") - - print("Worker pool stopped.") + # run() blocks and handles log streaming from workers + pool.run() diff --git a/src/agentexec/runners/langchain.py b/src/agentexec/runners/langchain.py index 4d2da14..dc4f7f7 100644 --- a/src/agentexec/runners/langchain.py +++ b/src/agentexec/runners/langchain.py @@ -1,4 +1,8 @@ -"""LangChain agent runner with activity tracking.""" +"""LangChain agent runner with activity tracking. + +Supports both classic AgentExecutor and LangGraph agents with automatic +max iterations handling via LangChain's built-in early_stopping_method. +""" import logging import uuid @@ -97,9 +101,18 @@ def __init__( agent_id: UUID for tracking this agent's activity. agent_executor: LangChain AgentExecutor instance. max_turns_recovery: Enable automatic recovery when max iterations exceeded. - wrap_up_prompt: Prompt to use for recovery run. - recovery_turns: Number of turns allowed for recovery. + When True, uses LangChain's early_stopping_method='generate' to have the + LLM generate a final answer when max iterations is reached. + wrap_up_prompt: Prompt to use for recovery run (not used with built-in method). + recovery_turns: Number of turns allowed for recovery (not used with built-in method). report_status_prompt: Instruction snippet about using the status tool. + + Note: + LangChain's AgentExecutor has built-in max iterations handling via + early_stopping_method. When max_turns_recovery=True, we leverage this + instead of manually catching exceptions. The agent will automatically + call the LLM one final time to generate a summary when max_iterations + is reached. 
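+
+        Example (illustrative sketch):
+            executor = AgentExecutor(agent=agent, tools=tools, max_iterations=15)
+            runner = LangChainRunner(agent_id, executor, max_turns_recovery=True)
+            # At the iteration limit the executor stops early and the LLM
+            # generates a final answer instead of raising an exception.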
""" super().__init__( agent_id, @@ -110,6 +123,15 @@ def __init__( ) self.agent_executor = agent_executor + # Configure early stopping for max iterations + # 'generate' calls LLM one final time to create answer (like wrap-up) + # 'force' just returns "Agent stopped due to iteration limit" + if max_turns_recovery and self.agent_executor.early_stopping_method == "force": + logger.info( + "Enabling early_stopping_method='generate' for max iterations recovery" + ) + self.agent_executor.early_stopping_method = "generate" + # Override with LangChain-specific tools self.tools = _LangChainRunnerTools(self.agent_id) @@ -126,16 +148,28 @@ async def run( ) -> dict[str, Any]: """Run the LangChain agent with automatic activity tracking. + LangChain's AgentExecutor handles max iterations internally via + early_stopping_method. If max_turns_recovery=True was set during + initialization, the agent will automatically generate a final answer + when max_iterations is reached. + Args: input: User input/prompt for the agent. Can be a string or dict with "input" key. max_iterations: Maximum number of agent iterations. **kwargs: Additional arguments passed to AgentExecutor.ainvoke(). Returns: - Result from the agent execution. + Result from the agent execution. When max iterations is reached with + early_stopping_method='generate', the result will contain the LLM's + final generated answer. Raises: - Exception: If agent execution fails. + Exception: If agent execution fails (non-max-iterations errors). + + Note: + Unlike OpenAI's MaxTurnsExceeded exception, LangChain's AgentExecutor + handles max iterations gracefully and returns a normal result, so no + exception handling is needed. """ # Normalize input to dict format expected by AgentExecutor if isinstance(input, str): @@ -143,29 +177,11 @@ async def run( else: agent_input = input - try: - result = await self.agent_executor.ainvoke( - agent_input, - config={"max_iterations": max_iterations, **kwargs.get("config", {})}, - ) - return result - except Exception as e: - # Check if this is a max iterations error - if "iterations" in str(e).lower() and self.max_turns_recovery: - logger.info("Max iterations exceeded, attempting recovery") - - # Create recovery input with wrap-up prompt - recovery_input = { - "input": self.prompts.wrap_up, - "chat_history": agent_input.get("chat_history", []), - } - - result = await self.agent_executor.ainvoke( - recovery_input, - config={"max_iterations": self.recovery_turns}, - ) - return result - raise + result = await self.agent_executor.ainvoke( + agent_input, + config={"max_iterations": max_iterations, **kwargs.get("config", {})}, + ) + return result async def run_streamed( self, @@ -178,6 +194,10 @@ async def run_streamed( This method uses LangChain's astream_events API to provide granular streaming of agent execution, including intermediate steps, tool calls, and LLM responses. + LangChain's AgentExecutor handles max iterations internally, so streaming + will continue until completion or max_iterations is reached (with early + stopping applied if configured). + Args: input: User input/prompt for the agent. Can be a string or dict with "input" key. max_iterations: Maximum number of agent iterations. @@ -187,7 +207,7 @@ async def run_streamed( Event dictionaries from the agent execution. Raises: - Exception: If agent execution fails. + Exception: If agent execution fails (non-max-iterations errors). 
Example: async for event in runner.run_streamed("Research topic"): @@ -200,29 +220,9 @@ async def run_streamed( else: agent_input = input - try: - async for event in self.agent_executor.astream_events( - agent_input, - version="v2", - config={"max_iterations": max_iterations, **kwargs.get("config", {})}, - ): - yield event - except Exception as e: - # Check if this is a max iterations error - if "iterations" in str(e).lower() and self.max_turns_recovery: - logger.info("Max iterations exceeded, attempting recovery in streaming mode") - - # Create recovery input with wrap-up prompt - recovery_input = { - "input": self.prompts.wrap_up, - "chat_history": agent_input.get("chat_history", []), - } - - async for event in self.agent_executor.astream_events( - recovery_input, - version="v2", - config={"max_iterations": self.recovery_turns}, - ): - yield event - else: - raise + async for event in self.agent_executor.astream_events( + agent_input, + version="v2", + config={"max_iterations": max_iterations, **kwargs.get("config", {})}, + ): + yield event