Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions examples/tutorial/agent_framework_get_started/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Agent Framework Get Started

This tutorial is the smallest runnable entry for the current `verl.agent`
path in PR `verl-project/verl#5931`.

It demonstrates exactly three boundaries:

1. The caller creates the runtime externally.
2. `GatewayServingRuntime` is injected into
`OpenAICompatibleAgentFramework`.
3. The framework is exercised with one `generate_sequences(...)` call on a
minimal `TensorDict`.

Inside the script, the agent side is intentionally split into two layers:

- `agent_runner(...)`: the framework-facing adapter that receives a session
handle and extracts `session.base_url`
- `run_mock_agent(base_url, raw_prompt)`: the external-agent-style function
that only knows an OpenAI-compatible backend URL plus prompt messages

That keeps the gateway-specific lifecycle shim visible, while still showing
how a normal agent can treat the gateway as its backend URL.

This is intentionally **not** a trainer integration example. It uses:

- a tiny fake rollout server actor,
- the real `GlobalRequestLoadBalancer`,
- the real `GatewayServingRuntime`,
- the real `GatewayActor`,
- the real `OpenAICompatibleAgentFramework`.

That keeps the example CPU-only and lightweight, while avoiding any suggestion
that the current bootstrap logic has already been promoted into a polished
public API.

## Run

```bash
python examples/tutorial/agent_framework_get_started/minimal_e2e.py
```

The script will:

1. start Ray,
2. start one fake rollout server actor,
3. create a `GlobalRequestLoadBalancer`,
4. create a `GatewayServingRuntime`,
5. inject that runtime into `OpenAICompatibleAgentFramework`,
6. send one chat-completions request through the gateway,
7. call `generate_sequences(...)`,
8. print a short JSON summary,
9. clean up the runtime and Ray.
225 changes: 225 additions & 0 deletions examples/tutorial/agent_framework_get_started/minimal_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
from __future__ import annotations

import json

import httpx
import ray

from verl.agent.framework import framework as framework_module
from verl.agent.framework.framework import OpenAICompatibleAgentFramework
from verl.agent.framework.types import SessionRewardContext
from verl.agent.gateway.runtime import GatewayServingRuntime
from verl.utils import tensordict_utils as tu
from verl.workers.rollout.llm_server import GlobalRequestLoadBalancer, LLMServerClient
from verl.workers.rollout.replica import TokenOutput


class MinimalTokenizer:
"""Small tokenizer stub for the gateway tutorial example."""

def apply_chat_template(self, messages, tokenize=True, add_generation_prompt=True, tools=None, **kwargs):
del tools, kwargs
parts = []
for message in messages:
parts.append("{}:{}\n".format(message["role"], self._normalize_content(message.get("content", ""))))
if add_generation_prompt:
parts.append("assistant:")
text = "".join(parts)
if tokenize:
return [ord(char) for char in text]
return text

def decode(self, token_ids, skip_special_tokens=True):
del skip_special_tokens
return "".join(chr(token_id) for token_id in token_ids)

def encode(self, text, add_special_tokens=False):
del add_special_tokens
return [ord(char) for char in text]

def _normalize_content(self, content):
if isinstance(content, list):
return "".join(part.get("text", "") if isinstance(part, dict) else str(part) for part in content)
if content is None:
return ""
return str(content)


@ray.remote
class MinimalRolloutServer:
def __init__(self, response_text: str = "MINIMAL"):
self.response_text = response_text
self.calls = []

async def generate(
self,
request_id,
*,
prompt_ids,
sampling_params,
image_data=None,
video_data=None,
):
del image_data, video_data
self.calls.append(
{
"request_id": request_id,
"prompt_ids": list(prompt_ids),
"sampling_params": dict(sampling_params),
}
)
token_ids = [ord(char) for char in self.response_text]
return TokenOutput(
token_ids=token_ids,
log_probs=[-0.1] * len(token_ids),
stop_reason="completed",
)

def get_calls(self):
return list(self.calls)


class MinimalTransferQueue:
def __init__(self):
self.puts = []
self.batch_puts = []

async def async_kv_put(self, *, key, partition_id, tag):
self.puts.append({"key": key, "partition_id": partition_id, "tag": dict(tag)})

async def async_kv_batch_put(self, *, keys, fields, tags, partition_id):
self.batch_puts.append(
{
"keys": list(keys),
"fields": fields,
"tags": [dict(tag) for tag in tags],
"partition_id": partition_id,
}
)


def _build_prompts():
return tu.get_tensordict(
tensor_dict={
"raw_prompt": [[{"role": "user", "content": "Say MINIMAL"}]],
"uid": ["sample-0"],
}
)


def _reward_fn(ctx: SessionRewardContext) -> list[float]:
return [float(traj.reward_info["score"]) for traj in ctx.trajectories]


async def run_mock_agent(*, base_url: str, raw_prompt) -> tuple[str, dict[str, object]]:
"""Mimic an external agent that only knows an OpenAI-compatible backend URL."""

async with httpx.AsyncClient(timeout=5.0) as client:
chat_response = await client.post(
f"{base_url}/chat/completions",
json={
"model": "minimal-model",
"messages": raw_prompt,
"temperature": 0.0,
},
)
chat_response.raise_for_status()
response_payload = chat_response.json()

reward_info = {"score": 0.5, "label": "minimal-example"}
complete_response = await client.post(
base_url.removesuffix("/v1") + "/complete",
json={"reward_info": reward_info},
)
complete_response.raise_for_status()

return response_payload["choices"][0]["message"]["content"], reward_info


async def run_example() -> dict[str, object]:
"""Run the minimal end-to-end path through runtime -> framework -> generate_sequences."""

started_ray_here = False
runtime: GatewayServingRuntime | None = None
gateway_response_text = ""
fake_tq = MinimalTransferQueue()
original_tq = framework_module.tq

if not ray.is_initialized():
ray.init(ignore_reinit_error=True, include_dashboard=False)
started_ray_here = True

try:
framework_module.tq = fake_tq
rollout_server = MinimalRolloutServer.remote("MINIMAL")
load_balancer = GlobalRequestLoadBalancer.remote({"server-0": rollout_server})
llm_client = LLMServerClient(
config=None,
servers={"server-0": rollout_server},
load_balancer_handle=load_balancer,
)

runtime = GatewayServingRuntime(
llm_client=llm_client,
gateway_count=1,
gateway_actor_kwargs={
"tokenizer": MinimalTokenizer(),
"host": "127.0.0.1",
},
)

async def agent_runner(*, raw_prompt, session, sample_index):
nonlocal gateway_response_text

assert session.base_url is not None
gateway_response_text, _reward_info = await run_mock_agent(
base_url=session.base_url,
raw_prompt=raw_prompt,
)

framework = OpenAICompatibleAgentFramework(
session_runtime=runtime,
agent_runner=agent_runner,
reward_fn=_reward_fn,
wait_for_completion_after_agent_run=True,
completion_timeout=5.0,
)

stats = await framework.generate_sequences(
_build_prompts(),
global_steps=1,
partition_id="train",
)
rollout_calls = ray.get(rollout_server.get_calls.remote())
fields = fake_tq.batch_puts[0]["fields"]

# Everything returned here is example evidence for reviewers/tests,
# not a suggested public API shape for framework consumers.
return {
"runtime_class": type(runtime).__name__,
"framework_class": type(framework).__name__,
"agent_runner_contract": "session_to_base_url_adapter",
"gateway_response_text": gateway_response_text,
"rollout_stats": stats,
"tq_keys": fake_tq.batch_puts[0]["keys"],
"finished_tags": fake_tq.puts,
"uid_values": tu.get(fields, "uid"),
"reward_scores": fields["rm_scores"][0].tolist(),
"rollout_calls": rollout_calls,
}
finally:
framework_module.tq = original_tq
if runtime is not None:
await runtime.shutdown()
if started_ray_here and ray.is_initialized():
ray.shutdown()


def main() -> None:
import asyncio

print(json.dumps(asyncio.run(run_example()), indent=2, sort_keys=True))


if __name__ == "__main__":
main()
109 changes: 109 additions & 0 deletions tests/agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Agent tests

This directory contains the CPU-only unit tests for the new `verl.agent`
packages introduced for the agent framework / gateway path.

## Naming and CI routing

All executable test modules in this directory use the `*_on_cpu.py` suffix so
they are picked up by VERL's existing `cpu_unit_tests.yml` workflow instead of
the default GPU unit-test workflow.

## Coverage inventory

### Framework

- `framework/test_assembler_on_cpu.py`
- `test_trajectory_assembler_matches_training_batch_contract`
- Verifies the assembled `TensorDict` matches the expected training batch contract, including prompt/response padding, masks, `position_ids`, rollout logprobs, routed experts, `rm_scores`, and non-tensor metadata packing.
- `test_trajectory_assembler_rejects_empty_trajectories`
- Verifies `assemble()` rejects an empty trajectory list.
- `test_trajectory_assembler_rejects_response_mask_length_mismatch`
- Verifies `response_mask` length must match `response_ids`.
- `test_trajectory_assembler_rejects_response_logprobs_length_mismatch`
- Verifies `response_logprobs` length must match `response_ids` when logprobs are present.
- `test_trajectory_assembler_requires_reward_score`
- Verifies each trajectory must have a non-`None` `reward_score` before assembly.
- `test_trajectory_assembler_supports_numpy_routed_experts`
- Verifies `routed_experts` accepts `numpy.ndarray` input and preserves the expected tensor shape/dtype in the output.
- `framework/test_openai_compatible_framework_on_cpu.py`
- `test_openai_compatible_framework_runs_against_fake_session_runtime`
- Verifies the framework can run end-to-end against a fake in-memory session runtime and propagate sample-level non-tensor fields into the assembled batch.
- `test_openai_compatible_framework_waits_for_completion_when_configured`
- Verifies optional `wait_for_completion()` is invoked with the configured timeout before finalization.
- `test_openai_compatible_framework_broadcasts_sample_fields_to_multiple_trajectories`
- Verifies a single sample's non-tensor fields are broadcast to all trajectories materialized from that sample.
- `test_openai_compatible_framework_aborts_session_on_agent_error`
- Verifies agent runner failures trigger `abort_session()`, do not finalize the session, and surface a clear all-failed batch error when no sample succeeds.
- `test_openai_compatible_framework_drops_failed_samples_but_keeps_successful_ones`
- Verifies a mixed batch can drop a failed sample while still finalizing, rewarding, and assembling the successful sample.
- `test_openai_compatible_framework_raises_when_all_samples_fail_without_calling_assembler`
- Verifies an all-failed batch raises a clear error and does not call the assembler with an empty trajectory list.
- `test_openai_compatible_framework_omits_rollout_log_probs_when_missing`
- Verifies missing rollout logprobs stay absent from the assembled batch instead of being synthesized.

### Gateway

- `gateway/test_gateway_actor_on_cpu.py`
- `test_normalize_request_context_preserves_structured_fields`
- Verifies request normalization preserves structured multimodal content, `tool_calls`, and `tool_call_id` fields needed for prefix comparison.
- `test_gateway_actor_complete_wait_and_finalize`
- Verifies `/complete`, `wait_for_completion()`, and `finalize_session()` work together on the happy path and attach `reward_info` to materialized trajectories.
- `test_gateway_actor_prefix_mismatch_splits_trajectories`
- Verifies a message-history prefix mismatch starts a new trajectory instead of continuing the active one.
- `test_gateway_actor_tool_context_change_splits_trajectory`
- Verifies a tool-schema change is treated as a request-context split boundary.
- `test_gateway_actor_does_not_forward_tools_in_sampling_params`
- Verifies `tools` are stripped before backend generation params are forwarded.
- `test_gateway_actor_strips_request_envelope_but_keeps_sampling_params`
- Verifies request-envelope fields such as `messages`, `model`, and `tools` are removed at the backend boundary while backend sampling params come from gateway-owned base params plus whitelisted request overrides.
- `test_gateway_actor_ignores_non_whitelisted_request_sampling_params`
- Verifies non-whitelisted request fields do not leak into backend sampling params.
- `test_gateway_actor_continuation_preserves_prompt_and_generation_masks`
- Verifies continuation tokenization appends mask `0` for replayed/incremental context and mask `1` for newly generated tokens.
- `test_gateway_actor_tool_argument_json_equivalence_does_not_split_after_valid_continuation`
- Verifies JSON-equivalent tool-call arguments do not trigger a trajectory split when only key order changes.
- `test_message_prefix_falls_back_to_raw_tool_argument_value_comparison_when_arguments_are_invalid_json`
- Verifies invalid tool-call argument strings fall back to raw-value comparison rather than best-effort JSON equivalence.
- `test_gateway_actor_serializes_same_session_concurrent_requests`
- Verifies concurrent requests targeting the same session are serialized rather than entering the backend concurrently.
- `test_gateway_actor_rejects_chat_after_complete`
- Verifies chat requests are rejected once the session has been marked completed.
- `test_gateway_actor_finalizes_without_complete`
- Verifies `finalize_session()` can materialize and remove the active trajectory even if `/complete` was never called.
- `test_gateway_actor_rejects_malformed_requests_with_bad_request`
- Verifies representative malformed request shapes are rejected with HTTP 400.
- `test_gateway_actor_backend_failure_does_not_commit_partial_state`
- Verifies backend generation failure returns HTTP 500 without committing partial trajectory/session state.
- `test_gateway_actor_backend_failure_after_tool_mismatch_does_not_split`
- Verifies a failed request after a tool-context mismatch does not prematurely materialize/split the previous trajectory.
- `test_gateway_actor_tool_call_decode_returns_openai_format`
- Verifies tool-parser output is decoded back into OpenAI-compatible `tool_calls` responses and can be continued with a tool-result turn.
- `gateway/test_gateway_manager_on_cpu.py`
- `test_gateway_manager_routes_sessions_stickily`
- Verifies session creation/finalization stay routed to the owning gateway.
- `test_gateway_manager_uses_least_active_sessions_routing`
- Verifies new sessions are assigned to the gateway with the fewest active sessions.
- `test_gateway_manager_wait_for_completion_delegates_to_session_owner`
- Verifies `wait_for_completion()` is delegated to the gateway that owns the session.
- `gateway/test_session_runtime_on_cpu.py`
- `test_gateway_serving_runtime_owns_gateway_lifecycle_and_session_runtime`
- Verifies the runtime can own gateway actor lifecycle plus session creation, wait, completion, and finalization behavior.
- `test_gateway_serving_runtime_injects_runtime_owned_gateway_backend`
- Verifies runtime-owned gateways use the runtime itself as backend and correctly apply gateway-owned base sampling params plus whitelisted request overrides before calling the rollout server.
- `test_gateway_serving_runtime_passes_processor_and_media_to_owned_gateway`
- Verifies runtime-owned gateways accept processor-aware actor kwargs and forward extracted multimodal media to the rollout server.
- `test_gateway_serving_runtime_releases_server_when_generate_fails`
- Verifies backend-server slots are still released when `generate()` raises, preventing load-balancer bookkeeping leaks.
- `test_gateway_serving_runtime_gateway_count_zero_falls_back_to_generate_only_mode`
- Verifies `gateway_count=0` still supports direct `generate()` requests without creating owned gateway actors or a session runtime.

## Mocking boundaries

- No test in this directory depends on a real `LLMServer`, model weights, or a
production serving runtime.
- `tests/agent/support.py` provides the fakes and lightweight Ray actors used by
the gateway/runtime tests.
- The only retained dependency on the old experimental tree is
`verl.experimental.agent_loop.tool_parser`, which is intentionally reused by
`GatewayActor` until the community-wide extraction lands.
Loading