Skip to content
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions areal/api/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -1963,6 +1963,68 @@ def __post_init__(self):
raise ValueError("admin_api_key must not be empty or whitespace-only")


@dataclass
class AgentConfig:
"""Configuration for the experimental agent service controller."""

agent_cls_path: str = field(
default="",
metadata={
"help": "Fully-qualified import path for the AgentRunnable implementation."
},
)
admin_api_key: str = field(
default="areal-agent-admin",
metadata={"help": "Shared admin API key for agent-service inter-service auth."},
)
num_pairs: int = field(
default=1,
metadata={"help": "Number of Worker+DataProxy pairs to launch on initialize."},
)
setup_timeout: float = field(
default=120.0,
metadata={
"help": "Timeout in seconds waiting for each service to become healthy."
},
)
health_poll_interval: float = field(
default=5.0,
metadata={
"help": "Seconds between pair health polls; 0 disables health monitoring."
},
)
drain_timeout: float = field(
default=30.0,
metadata={
"help": "Seconds to wait for active sessions to drain before force-killing a pair."
},
)
log_level: str = field(
default="info",
metadata={"help": "Log level for spawned agent-service micro-services."},
)
env: dict[str, str] = field(
default_factory=dict,
metadata={
"help": "Extra environment variables passed to all forked child processes."
},
)

def __post_init__(self) -> None:
if not self.agent_cls_path:
raise ValueError("agent_cls_path must be a non-empty import path")
if self.num_pairs < 0:
raise ValueError(f"num_pairs must be non-negative, got {self.num_pairs}")
if self.setup_timeout <= 0:
raise ValueError(
f"setup_timeout must be positive, got {self.setup_timeout}"
)
if self.drain_timeout < 0:
raise ValueError(
f"drain_timeout must be non-negative, got {self.drain_timeout}"
)


@dataclass
class InferenceEngineConfig:
"""Configuration for inference servers, including offpolicyness control."""
Expand Down Expand Up @@ -2081,13 +2143,87 @@ class InferenceEngineConfig:
},
)

# v2 controller options
_version: str = field(
default="v1",
metadata={
"help": "Rollout controller implementation version. Use 'v1' for legacy RolloutController, 'v2' for RolloutControllerV2.",
"choices": ["v1", "v2"],
},
)
model: str = field(
default="default",
metadata={"help": "Model name exposed through the inference-service gateway."},
)
routing_strategy: str = field(
default="round_robin",
metadata={"help": "Routing strategy for the inference-service router."},
)
poll_interval: float = field(
default=5.0,
metadata={
"help": "Health-poll interval in seconds for the inference-service router."
},
)
set_reward_finish_timeout: float = field(
default=0.0,
metadata={
"help": "Timeout in seconds to wait for additional reward updates before finalizing a session."
},
)
log_level: str = field(
default="info",
metadata={"help": "Log level for inference-service micro-services."},
)
admin_api_key: str = field(
default="areal-admin-key",
metadata={
"help": "Admin API key used by the inference-service gateway, router, and data proxies."
},
)
api_url: str | None = field(
default=None,
metadata={
"help": "External OpenAI-compatible base URL for inference-service external model mode."
},
)
provider_api_key: str | None = field(
default=None,
metadata={"help": "API key for the external OpenAI-compatible provider."},
)
n_gpus_per_node: int | None = field(
default=None,
metadata={
"help": "GPUs per physical node for multinode inference-service launch."
},
)

def __post_init__(self):
"""Validate scheduling_spec length."""
if len(self.scheduling_spec) not in (1, 2):
raise ValueError(
f"scheduling_spec must contain 1 or 2 SchedulingSpec, "
f"got {len(self.scheduling_spec)}"
)
if self._version not in ("v1", "v2"):
raise ValueError(
f"_version must be either 'v1' or 'v2', got '{self._version}'"
)
if self.n_gpus_per_node is not None and self.n_gpus_per_node < 1:
raise ValueError(
f"n_gpus_per_node must be >= 1, got {self.n_gpus_per_node}"
)
if not self.admin_api_key or not self.admin_api_key.strip():
raise ValueError("admin_api_key must not be empty or whitespace-only")
if (
self._version == "v2"
and self.openai is not None
and self.openai.admin_api_key != "areal-admin-key"
):
logger.warning(
"rollout.openai.admin_api_key is ignored by rollout controller v2; "
"use rollout.admin_api_key instead."
)


@dataclass
Expand Down
60 changes: 53 additions & 7 deletions areal/experimental/agent_service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

The Agent Service provides **agent-level** capabilities on top of AReaL's model-level
proxy. It exposes complete agent sessions — multi-turn conversations with tool use,
memory, and pluggable agent frameworks — via independent HTTP microservices.
memory, and pluggable agent frameworks — via independent HTTP microservices. It also
includes an `AgentController` that can launch the stack through Guard processes and
bridge agent conversations to the experimental inference service for RL data
collection.

## Architecture

Expand Down Expand Up @@ -47,6 +50,10 @@ at startup. Each `POST /run` request is a single turn — the agent receives the
conversation history in the request and returns a response. The Worker has no session
state.

**AgentController** — Python orchestrator that launches Guards via the scheduler, forks
the Router / Gateway / Worker+DataProxy pairs onto them, supports scale-up and
scale-down, and exposes async runtime APIs for inference-backed RL sessions.

## Agent Protocol

Any class that satisfies the `AgentRunnable` protocol can run on the Worker:
Expand Down Expand Up @@ -129,6 +136,29 @@ class EventEmitter(Protocol):
| `/ws` | WS | Gateway WebSocket protocol |
| `/v1/responses` | POST | OpenResponses HTTP bridge |

## AgentController Runtime APIs

`AgentController` is the integration point used by the examples and rollout workflows.
It manages the agent-service stack and exposes async helpers for RL/inference flows:

| Method | Description |
| ------ | ----------- |
| `initialize()` | Launch Guards, Router, Worker+DataProxy pairs, Gateway, and the health monitor |
| `destroy()` | Tear down the full stack in reverse order |
| `scale_up(count)` | Add Worker+DataProxy pairs |
| `scale_down(count)` | Unregister, drain, and remove pairs |
| `start_session(...)` | Grant inference capacity and create an RL session bound to an agent session |
| `step(input, session_id, metadata=None)` | Send a turn through the agent-service Gateway `POST /v1/responses` |
| `set_reward(reward, session_id, interaction_id=None)` | Forward the final reward to the inference service |
| `export_trajectory(session_id, ...)` | Export serialized interactions from the inference service |

Typical rollout flow:

1. `start_session()` to create the agent/inference session pair.
2. `step()` for each user turn.
3. `set_reward()` when the episode completes.
4. `export_trajectory()` to retrieve interactions for training.

## Multi-turn Conversation Flow

```
Expand Down Expand Up @@ -159,9 +189,8 @@ areal/experimental/agent_service/
├── protocol.py # Gateway protocol frame types
├── types.py # AgentRequest, AgentResponse, EventEmitter, AgentRunnable
├── controller/
│ ├── __init__.py # AgentServiceController, AgentServiceControllerConfig
│ ├── config.py # AgentServiceControllerConfig dataclass
│ └── controller.py # AgentServiceController orchestrator
│ ├── __init__.py # AgentController export
│ └── controller.py # AgentController orchestrator
├── guard/
│ ├── __init__.py # Module docstring
│ ├── __main__.py # python -m areal.experimental.agent_service.guard
Expand Down Expand Up @@ -190,8 +219,25 @@ areal/experimental/agent_service/
├── app.py # create_worker_app()
└── config.py # WorkerConfig dataclass

examples/agent_service/
├── agent.py # ClaudeAgent (Claude Agent SDK)
├── run_agent_service.py # Controller-based launcher + interactive demo
examples/experimental/agent_service/
├── __init__.py # Marks the examples package
├── claude/
│ ├── __init__.py # Claude example package
│ ├── agent.py # ClaudeAgent (Claude Agent SDK)
│ └── run_agent_service.py # Controller-based launcher + interactive demo
├── tau2/
│ ├── __init__.py # Tau2 example package
│ ├── agent.py # Tau2 agent-service worker example
│ ├── workflow.py # Tau2 workflow using async controller APIs
│ ├── run_rollout.py # Direct rollout driver for Tau2
│ └── config.yaml # Tau2 example config
└── README.md # Example documentation
```

For a standalone worker process, the agent import path now points at the nested Claude
example module:

```bash
python -m areal.experimental.agent_service.worker \
--agent examples.experimental.agent_service.claude.agent.ClaudeAgent
```
2 changes: 1 addition & 1 deletion areal/experimental/agent_service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

Submodules
----------
- ``controller`` — :class:`AgentServiceController` orchestrator
- ``controller`` — :class:`AgentController` orchestrator
- ``gateway`` — public HTTP/WebSocket entry point
- ``router`` — session-affine routing
- ``data_proxy`` — stateful session proxy
Expand Down
9 changes: 5 additions & 4 deletions areal/experimental/agent_service/controller/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

"""Agent Service Controller — orchestrator for agent micro-services."""

from .config import AgentServiceControllerConfig
from .controller import AgentServiceController
from areal.api.cli_args import AgentConfig

from .controller import AgentController

__all__ = [
"AgentServiceController",
"AgentServiceControllerConfig",
"AgentController",
"AgentConfig",
]
63 changes: 0 additions & 63 deletions areal/experimental/agent_service/controller/config.py

This file was deleted.

Loading
Loading