Skip to content

init commit for external agent framework+gateway#25

Open
zackcxb wants to merge 1 commit into
verl-project:mainfrom
zackcxb:gateway_framework_pr
Open

init commit for external agent framework+gateway#25
zackcxb wants to merge 1 commit into
verl-project:mainfrom
zackcxb:gateway_framework_pr

Conversation

@zackcxb

@zackcxb zackcxb commented May 18, 2026

Copy link
Copy Markdown

What does this PR do?

This PR adds a trainer-side agent framework and gateway runtime for multi-turn agent-style rollout in uni-agent, as a downstream integration of verl RFC #5790 and the upstream agent framework PR verl#6299.

Specifically, it:

  • adds uni_agent.trainer.frameworkAgentFramework abstract base, OpenAICompatibleAgentFramework concrete implementation, and AgentFrameworkRolloutAdapter (satisfies the trainer's agent_loop_manager_class extension point; recipes wire it in via YAML with no per-recipe glue),
  • adds uni_agent.trainer.gateway_GatewayActor / GatewayManager / GatewayServingRuntime for OpenAI-compatible session serving, sticky session routing, tool-parser wiring, and multimodal media accumulation; backend routing delegates to LLMServerClient,
  • adds a deepeyes gateway recipe under examples/deepeyes/,
  • adds CPU tests covering framework contract, gateway actor / manager behavior, session runtime lifecycle, and multimodal postprocess.

Wave 2 additions (length budget enforcement + OpenAI parity):

  • Rollout prompt_length / response_length budget injected into _GatewayActor; continuation turns clamped to remaining budget; budget-exhausted turns materialise a synthetic finish_reason=length response without hitting the backend.
  • All error paths return a unified OpenAI-spec error body ({"error": {"message": …, "type": …, "code": …}}); encode/decode failures caught and surfaced as 400.
  • Per-request chat_template_kwargs forwarded to apply_chat_template; reasoning_content preserved through _normalize_message and prefix comparison.
  • Unsupported OpenAI capabilities (n>1, response_format, tool_choice=required/function) rejected with 400; tool_choice="none" supported (skips tool injection and parser).
  • verl submodule bumped to upstream 3c5f6e04 (verl PR #6129: move LLMServerManager out of AgentLoopManager) so reviewers can git submodule update --init without access to a private fork.

Checklist Before Starting

Test

PYTHONPATH=$(pwd) pytest tests/uni_agent/trainer/ -q

Result: 64 passed, 6 warnings (framework, gateway, runtime, multimodal postprocess).

Real-rollout evidence from the deepeyes gateway recipe: a 50-step GRPO run on multi-turn multimodal data (Qwen3.5-4B, 7× RTX 3090 train + 1× local judge) produced a real learning curve — critic/rewards/mean moved from ~0.21 at step 1 to ~1.86 by step 50.

API and Usage Example

Public APIs added:

  • uni_agent.trainer.frameworkAgentFramework, OpenAICompatibleAgentFramework, AgentFrameworkRolloutAdapter, build_agent_framework
  • uni_agent.trainer.gatewayGatewayServingRuntime, GatewayManager, GatewayActor

Minimum viable wiring via YAML config:

actor_rollout_ref:
  rollout:
    agent:
      agent_loop_manager_class: uni_agent.trainer.framework.entry.AgentFrameworkRolloutAdapter
    custom:
      agent_framework:
        framework_class_fqn: uni_agent.trainer.framework.framework.OpenAICompatibleAgentFramework
        gateway_count: 1

The adapter calls build_agent_framework() which wires GatewayServingRuntime and the framework subclass from config. The agent runner only needs the gateway base URL:

async def agent_runner(*, raw_prompt, session_runtime, sample_index, **_):
    await run_external_agent(
        base_url=f"http://127.0.0.1:{session_runtime.port}",
        raw_prompt=raw_prompt,
    )

generate_sequences() writes finalized trajectories directly to TransferQueue with key "{uid}_{session_id}_{index}", matching AgentLoopWorkerTQ._agent_loop_postprocess()'s field / tag layout.

Design & Code Changes

High-level changes:

  • AgentFramework base class + OpenAICompatibleAgentFramework own session orchestration (create_sessionagent_runnerfinalize_session), trajectory assembly, multimodal post-processing, reward scoring, and TransferQueue writes. Per-session failures are isolated via asyncio.gather(..., return_exceptions=True) so one bad session does not cancel the rest of the batch.
  • _GatewayActor provides OpenAI Chat Completions over sticky sessions with prefix-consistency checks, tool-parser decoding, multimodal media accumulation, and rollout budget enforcement. GatewayManager routes new sessions by least-active count. GatewayServingRuntime owns gateway actor lifecycle and delegates backend routing to LLMServerClient.
  • Multimodal trajectory post-process builds trainer-consumable multi_modal_inputs and (4, seq_len) position ids inside the framework, so VLM sessions do not need per-recipe glue.
  • AgentFrameworkRolloutAdapter satisfies the trainer's agent_loop_manager_class contract; every recipe wires the same class in YAML — no per-recipe adapter code.

WIP / Follow-up

  • GatewayActor default placement strategy (at least one per node) once multi-node validation is in
  • Fully Async support

Checklist Before Submitting

  • Read the Contribute Guide (if present).
  • Add unit tests to cover all new code — 64 CPU tests included, following the *_on_cpu.py naming convention.
  • Apply pre-commit checks: pre-commit install && pre-commit run --all-files
  • Add / update documentation — deferred to a follow-up; inline docstrings ship with this PR.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive agent framework and gateway system designed to facilitate agentic workflows within a training environment. Key components include a factory for constructing frameworks, an OpenAI-compatible framework implementation that manages sequence generation and trajectory logging, and a gateway system that provides an OpenAI-compatible API for agent interactions. The gateway handles session lifecycle, trajectory buffering, and multimodal data processing. Feedback identifies a critical issue with the incremental token encoding logic in the gateway, which may produce malformed sequences due to assumptions about tokenizer stability and turn separators. Further recommendations include parallelizing reward calculations to improve performance and replacing blocking ray.get calls with asynchronous operations to avoid event loop starvation.

Comment thread uni_agent/trainer/gateway/gateway.py Outdated
Comment on lines +412 to +453
def _encode_incremental(
self,
messages: list[dict[str, Any]],
image_data: list[Any] | None = None,
video_data: list[Any] | None = None,
) -> list[int]:
"""Encode incremental messages (tool results, user follow-ups) for a continuation turn.

Uses the remove_system_prompt pattern from ToolAgentLoop: encode the new messages
alone (which prepends a system prompt), then strip the known system_prompt prefix.
No tools parameter — tool schema is already in the initial prompt_ids.
"""
if self._processor is not None:
raw_prompt = _apply_chat_template(
self._processor,
messages,
add_generation_prompt=True,
tokenize=False,
**self._apply_chat_template_kwargs,
)
videos = video_data
video_metadata = None
if videos is not None:
videos, video_metadata = zip(*videos, strict=False)
videos, video_metadata = list(videos), list(video_metadata)
model_inputs = self._processor(
text=[raw_prompt],
images=image_data,
videos=videos,
video_metadata=video_metadata,
return_tensors="pt",
do_sample_frames=False,
)
ids = normalize_token_ids(model_inputs["input_ids"])
else:
ids = normalize_token_ids(
_apply_chat_template(
self._tokenizer, messages, add_generation_prompt=True,
**self._apply_chat_template_kwargs,
)
)
return ids[len(self._system_prompt):]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The incremental encoding logic is fragile and likely to produce malformed token sequences. Slicing tokens based on the length of a pre-encoded system prompt assumes that the tokenizer is prefix-stable and that the chat template doesn't insert turn separators or special tokens between the system prompt and the first message. Furthermore, concatenating these incremental IDs to the previous turn's response IDs (at line 542) will miss the necessary turn separators (e.g., <|im_end|> and <|im_start|>user) required by most chat templates. It is safer to re-encode the full message history and identify the delta, or simply rely on the backend's prefix caching by sending the full prompt.

Comment thread uni_agent/trainer/framework/framework.py Outdated
gateway_actor_kwargs["backend"] = self

self.owned_gateway_actors = [GatewayActor.remote(**gateway_actor_kwargs) for _ in range(gateway_count)]
ray.get([gateway.start.remote() for gateway in self.owned_gateway_actors])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using ray.get inside an async context (called via build_agent_framework) will block the event loop, preventing other concurrent tasks from making progress. Since a helper _await_ray_ref is already defined in this file, you should consider moving the gateway startup logic to an async initialization method that can be awaited, rather than performing blocking calls in the constructor.

@wangtiance

Copy link
Copy Markdown

为什么放在trainer目录下?我觉得这是黑盒调用训推通用的流程。我偏向往上提一级,直接放uni_agent/framework和uni_agent/gateway.

return DataProto(batch=batch, non_tensor_batch=non_tensor_batch)


class OpenAICompatibleAgentFramework(AgentFramework):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move OpenAICompatibleAgentFramework into a separate file, keep abstract interface only.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the abstract interface to base.py

Comment thread uni_agent/trainer/framework/entry.py Outdated
@zackcxb zackcxb force-pushed the gateway_framework_pr branch from 2da5be1 to 825b7f3 Compare May 27, 2026 11:55
Comment thread uni_agent/trainer/gateway/runtime.py Outdated
@zackcxb zackcxb force-pushed the gateway_framework_pr branch from 825b7f3 to 9c7c97a Compare May 27, 2026 13:16
@zackcxb zackcxb marked this pull request as ready for review May 28, 2026 03:14
@zackcxb zackcxb force-pushed the gateway_framework_pr branch 3 times, most recently from a7e392b to 9677db1 Compare May 28, 2026 08:09
@yyDing1

yyDing1 commented May 28, 2026

Copy link
Copy Markdown
Collaborator

The current entry point binds a single runner via agent_runner_fqn + agent_runner_kwargs. This works for a single-task recipe like DeepEyes, but it doesn't scale to multi-task rollout.

We may introduce an AgentRunner abstract base with a minimal run() contract:

# uni_agent/trainer/framework/runner.py
class AgentRunner(ABC):
    name: str = ""
    @abstractmethod
    async def run(
        self,
        *,
        raw_prompt: list[dict],
        session: SessionHandle,
        session_runtime: SessionRuntime,
        sample_index: int,
        tools_kwargs: dict[str, Any] | None = None,
    ) -> None:
        ...

Each sample carries the runner name; config mounts a name → runner map.

The config could be in the following format:

# agent_runner.yaml
- name: deepeyes
  _target_: examples.agent_train.deepeyes_gateway.runner.DeepEyesAgentRunner
  max_turns: 5
  tools:
    - name: image_zoom_in_tool
      config_path: examples/agent_train/deepeyes_gateway/configs/image_zoom_in_tool_config.yaml

- name: swe
  _target_: examples.agent_train.swe_gateway.runner.SweAgentRunner
  max_turns: 50
  env:
    deployment:
      type: vefaas
      command: ...
  tools:
    - name: str_replace_editor
    - name: execute_bash

Then the framework resolves the runner per-session by sample["agent_runner_name"], like:

# framework.py:_run_session
runner = self._runners_by_name[sample_fields["agent_runner_name"]]
await runner.run(
    raw_prompt=raw_prompt,
    session=session,
    session_runtime=self.session_runtime,
    sample_index=sample_index,
    tools_kwargs=sample_fields.get("tools_kwargs"),
)

This could be similar to verl's existing agent_loop_config pattern, and we can adopt the same shape here.

Comment thread uni_agent/gateway/gateway.py
Comment thread uni_agent/trainer/gateway/gateway.py Outdated
@gxlvera

gxlvera commented May 28, 2026

Copy link
Copy Markdown

Hi, I would like to propose using Prefix Trie for multi-trajectory storage for Agentgateway. My RFC is here:#51
This approach could address the following limitations of current implementation:

  • Single active branch only: A session keeps one message_history and one active trajectory. When switching sub-agents, picking a resample path, or returning to an older branch, new requests cannot reattach to historical branches. A trie keeps every branch; incoming messages longest-prefix-match against any path and continue from there.
  • Repeated encoding of shared prefixes: Message/token prefixes shared across trajectories are re-materialized and re-tokenized on every branch switch. A trie stores checkpoints on shared nodes; later calls clone from the matched node and tokenize incrementally.
  • No concurrent inference: One shared state requires a generation lock and serial LLM calls. With a trie, each call owns a cloned branch state; tokenize and commit can interleave—supporting sub-agents, best-of-n, etc.

For detailed explanation, please also refer to this comment: verl-project/verl#6299 (comment)

yyDing1 added a commit that referenced this pull request Jun 1, 2026
…nt) (#52)

### What does this PR do?

Adds `examples/swe_agent/` — an end-to-end recipe for training a
SWE-bench coding agent with fully-async RL (Megatron actors + vLLM
rollout on separate nodes) and Modal swe-rex sandboxes.

It stitches the existing building blocks into something runnable,
mirroring `examples/search_agent/`:
- data: `examples/data_preprocess/swe_rebench.py` +
`swe_bench_verified.py`
- reward: `uni_agent.reward.swe_rebench` / `swe_bench`
- rollout: `uni_agent.agent_loop.UniAgentLoop` (Modal swe-rex)

Reference config trains Qwen3-235B-A22B-Instruct-2507 with GRPO on a
12-node (8 train + 4 rollout) × 4-GPU topology; everything is
env-overridable to scale down.

### Checklist Before Starting

- [x] Search for similar PRs/issues:
- `gh pr list --repo verl-project/uni-agent --state open` → no SWE-bench
training example (PR #25 is an unrelated agent-framework/gateway)
  - no existing `examples/swe*` dir
- [x] Format the PR title as `[examples] feat: ...`

### Test

This is a recipe (scripts + configs + docs), not library code:
- `bash -n train_qwen3_235b_swebench.sh` — OK
- `python -c "import yaml; yaml.safe_load(...)"` on both YAMLs — OK
- `pre-commit run --files examples/swe_agent/*` — pass (compile-all;
ruff/mypy skip non-py)
- `shellcheck` — clean except style-only SC2206 on the hydra arg-array
append, consistent with the repo's other launch scripts

Full end-to-end training was run internally on the reference topology;
the committed files are the scrubbed/generalized form of that setup
(no secrets or site-specific paths — `runtime_env.yaml` ships
placeholders only).

### Files

| File | Purpose |
|---|---|
| `train_qwen3_235b_swebench.sh` | `ray job submit` + full GRPO /
Megatron / vLLM config; topology & paths are env vars |
| `agent_config.yaml` | UniAgentLoop config: tools, Modal deployment,
rollout concurrency, reward |
| `runtime_env.yaml` | Ray runtime-env **template** (placeholders for
Modal / W&B tokens + checkout paths) |
| `README.md` | dataset → runtime_env → launch → monitor + tuning notes
|

### Notes captured for reproducibility

Non-obvious settings learned running this at scale (documented in the
script header / README):
- `max_response_length=128K` — SWE-bench trajectories are long (mean
~70K tokens, ~90 turns); 32K truncates ~half
- `tool_parser: hermes` for Qwen3-235B (wrong parser silently breaks
tool calls)
- `moe_token_dispatcher_type=alltoall` — portable MoE dispatch
- `VLLM_USE_DEEP_GEMM=0` — vLLM 0.21 EP/CUTLASS init workaround
- do **not** set `expandable_segments:True` (incompatible with vLLM
sleep-mode CuMemAllocator, pytorch#147851)

### Checklist Before Submitting

- [x] Read the Contribute Guide
- [x] `pre-commit run --files examples/swe_agent/*` passed
- [x] No new library code → no unit tests; recipe validated via
syntax/lint + internal end-to-end run
- [x] AI assistance was used (Claude Code); the submitting human
(@aoshen02) reviewed every line
- [x] No secrets / site-specific paths committed

---------

Signed-off-by: aoshen02 <aoshen@inferact.ai>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-authored-by: yuyangding <yuyangding@bytedance.com>
@sl-1314

sl-1314 commented Jun 2, 2026

Copy link
Copy Markdown

Hi, I noticed that, in the original verl AgentLoopManager/AgentLoopManagerTQ, it spawns num_workers independent AgentLoopWorker(Ray actors), distributing the total train_batch×rollout.n agent loops across these actors for parallelism. However, OpenAICompatibleAgentFramework.generate_sequences currently runs all train_batch×rollout.n sessions in a single asyncio event loop inside the PPOTrainer process. This means CPU-bound operations in one agent loop (e.g. tool execution) will block all other concurrent sessions.

Suggestion: Refer the original verl AgentLoopManager pattern — introduce multiple AgentLoopWorker Ray actors, partition the batch tasks across them.

# refer AgentLoopManager._init_agent_loop_workers()
for i in range(num_workers):
    worker = AgentLoopWorker.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=..., soft=True)
    ).remote(config, ...)
    self.workers.append(worker)

async def generate_sequences(self, prompts):
    chunks = prompts.chunk(len(self.workers))
    await asyncio.gather(*[
        w.generate_sequences.remote(chunk)
        for w, chunk in zip(self.workers, chunks)
    ])

This separates two independent concurrency axes: gateway_count for LLM serving throughput, num_workers for agent execution parallelism — consistent with the original verl design.

@wuxibin89

wuxibin89 commented Jun 3, 2026

Copy link
Copy Markdown
Collaborator
  1. P0: Add doc string for all public class, method, fields and functions.
  2. P1: Separate this PR into 3 PRs: gateway, framework, deepeyes examples. We can only review the gateway part before sepration.

from verl.workers.rollout.utils import run_uvicorn


class _GatewayActor:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just decorate it with @ray.remote?

@ray.remote
class GatewayActor:
    ...

zackcxb added a commit to zackcxb/uni-agent that referenced this pull request Jun 4, 2026
Split from PR verl-project#25 per maintainer request: gateway is the first
independently-reviewable PR. Owns SessionHandle/Trajectory (moved from
framework.types). No framework dependency.

Spec: cxb_dev/docs/plans/2026-06-03-pr25-split-gateway-framework-deepeyes-design.md

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
zackcxb added a commit to zackcxb/uni-agent that referenced this pull request Jun 4, 2026
P0 follow-up to PR verl-project#25 review: docstring every public class, method, field, and function in the gateway package. Pure documentation; zero behavior change. Full regression 50 passed unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@zackcxb zackcxb force-pushed the gateway_framework_pr branch 2 times, most recently from a300294 to a17e1ed Compare June 5, 2026 07:10
zackcxb added a commit to zackcxb/uni-agent that referenced this pull request Jun 5, 2026
Introduce uni_agent/trainer/gateway/protocol.py with OpenAI-compatible
ChatCompletionRequest / ChatCompletionResponse TypedDicts. _handle_chat_completions
now annotates its payload as ChatCompletionRequest and constructs the
response via ChatCompletionResponse local instead of an anonymous dict.

Response gains the OpenAI-standard `created` (unix ts) and `model` fields;
`model` falls back to "unknown" when the request omits it to avoid
breaking direct-call test payloads.

MessageCodec runtime validation, GatewaySession envelope, GenerationOutcome
contract, Trajectory token-truth all unchanged. No pydantic, no openai SDK
runtime dependency.

Spec: cxb_dev/docs/plans/2026-06-04-gateway-openai-sdk-typed-io-design.md
Addresses PR verl-project#25 wuxibin89 review: typed request/response.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@zackcxb zackcxb force-pushed the gateway_framework_pr branch from a17e1ed to d0ad4af Compare June 5, 2026 07:19
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@zackcxb zackcxb force-pushed the gateway_framework_pr branch from d0ad4af to 348f520 Compare June 8, 2026 01:26
@zackcxb

zackcxb commented Jun 8, 2026

Copy link
Copy Markdown
Author

Hi, I noticed that, in the original verl AgentLoopManager/AgentLoopManagerTQ, it spawns num_workers independent AgentLoopWorker(Ray actors), distributing the total train_batch×rollout.n agent loops across these actors for parallelism. However, OpenAICompatibleAgentFramework.generate_sequences currently runs all train_batch×rollout.n sessions in a single asyncio event loop inside the PPOTrainer process. This means CPU-bound operations in one agent loop (e.g. tool execution) will block all other concurrent sessions.

Suggestion: Refer the original verl AgentLoopManager pattern — introduce multiple AgentLoopWorker Ray actors, partition the batch tasks across them.

# refer AgentLoopManager._init_agent_loop_workers()
for i in range(num_workers):
    worker = AgentLoopWorker.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=..., soft=True)
    ).remote(config, ...)
    self.workers.append(worker)

async def generate_sequences(self, prompts):
    chunks = prompts.chunk(len(self.workers))
    await asyncio.gather(*[
        w.generate_sequences.remote(chunk)
        for w, chunk in zip(self.workers, chunks)
    ])

This separates two independent concurrency axes: gateway_count for LLM serving throughput, num_workers for agent execution parallelism — consistent with the original verl design.

Hi, I noticed that, in the original verl AgentLoopManager/AgentLoopManagerTQ, it spawns num_workers independent AgentLoopWorker(Ray actors), distributing the total train_batch×rollout.n agent loops across these actors for parallelism. However, OpenAICompatibleAgentFramework.generate_sequences currently runs all train_batch×rollout.n sessions in a single asyncio event loop inside the PPOTrainer process. This means CPU-bound operations in one agent loop (e.g. tool execution) will block all other concurrent sessions.

Suggestion: Refer the original verl AgentLoopManager pattern — introduce multiple AgentLoopWorker Ray actors, partition the batch tasks across them.

# refer AgentLoopManager._init_agent_loop_workers()
for i in range(num_workers):
    worker = AgentLoopWorker.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=..., soft=True)
    ).remote(config, ...)
    self.workers.append(worker)

async def generate_sequences(self, prompts):
    chunks = prompts.chunk(len(self.workers))
    await asyncio.gather(*[
        w.generate_sequences.remote(chunk)
        for w, chunk in zip(self.workers, chunks)
    ])

This separates two independent concurrency axes: gateway_count for LLM serving throughput, num_workers for agent execution parallelism — consistent with the original verl design.

Thanks for the comment, this is a valid point. The current framework path fans out all
batch_size * rollout.n sessions with asyncio.gather inside the trainer process, and the
existing semaphore only caps coroutine concurrency; it does not isolate CPU-bound tool or
sandbox work. We have also seen this show up with SWE-style runners where sandbox work
can block unrelated sessions.

Since I have split the current PR into three separate PRs (gateway, framework, and examples), I will treat this as a framework-layer follow-up and address it in the next PR instead of this one.
For the framework PR, I plan to keep the reference OpenAICompatibleAgentFramework the lightweight inline implementation but documenting the dispatching mode. Heavy runners such as SWE can provide a framework subclass that overrides the session execution boundary and runs the agent runner
in process/Ray workers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants