[misc] fix: synthesize zero routed_experts for empty trajectories under rout…#57
Conversation
…er replay When enable_rollout_routing_replay (R3) is on, successful trajectories carry a routed_experts tensor while failed/empty ones returned None. verl's _postprocess decides inclusion from inputs[0] only and torch.cat's the whole batch, so a mixed None/tensor batch either crashes or silently drops replay data. Emit a shape-consistent zero routed_experts (num_hidden_layers, num_experts_per_tok, int64) for empty outputs; the row is traj_masked so zeros never reach the loss. Also add examples/agent_train/train_qwen3_moe_rc.sh: Qwen3 MoE decoupled-PPO + rollout-correction training script. Co-authored-by: Cursor <cursoragent@cursor.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a training script for Qwen3 MoE with rollout correction and updates the agent loop to support router replay (R3) by synthesizing zero-filled routed experts for failed trajectories. It also includes the reward score in the saved interaction results. The feedback highlights a critical performance issue where a blocking synchronous I/O call (AutoConfig.from_pretrained) is executed inside the asyncio event loop, which should be offloaded to a separate thread. Additionally, there are redundant and conflicting variable definitions for RAY_DATA_HOME, NNODES_ROLLOUT, and NNODES_TRAIN in the training script that need to be cleaned up.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
I am having trouble creating individual review comments. Click here to see my feedback.
uni_agent/agent_loop.py (169-210)
The method _get_routing_replay_shape performs a synchronous, blocking network/disk I/O call (AutoConfig.from_pretrained) inside the asyncio event loop. This blocks the entire event loop thread, which can cause timeouts, heartbeat failures, or severe performance degradation for other concurrent agent loops running on the same worker process.
To resolve this, make both _synth_failed_routed_experts and _get_routing_replay_shape asynchronous, and run the blocking AutoConfig.from_pretrained call in a separate thread using asyncio.to_thread.
routed_experts=await self._synth_failed_routed_experts(dummy_response_length),
multi_modal_data={},
reward_score=0,
num_turns=0,
metrics={},
extra_fields=extra_fields,
)
async def _synth_failed_routed_experts(self, length: int) -> np.ndarray | None:
"""Synthesize a zero ``routed_experts`` of shape ``(length, num_layers, top_k)``."""
shape = await self._get_routing_replay_shape()
if shape is None:
return None
num_layers, top_k = shape
return np.zeros((length, num_layers, top_k), dtype=np.int64)
async def _get_routing_replay_shape(self) -> tuple[int, int] | None:
"""Resolve and cache ``(num_hidden_layers, num_experts_per_tok)`` for the rollout
model. Returns ``None`` if rollout routing replay is off or the model has no
experts. The HF config is loaded at most once per worker process."""
rollout_cfg = self.config.actor_rollout_ref.rollout
if not bool(getattr(rollout_cfg, "enable_rollout_routing_replay", False)):
return None
cls = UniAgentLoop
if not cls._routing_replay_resolved:
from transformers import AutoConfig
model_path = self.config.actor_rollout_ref.model.path
model_cfg = await asyncio.to_thread(
AutoConfig.from_pretrained, model_path, trust_remote_code=True
)
# Newer Qwen3 nests MoE fields under ``text_config``; older configs keep them
# at the top level. ``... or 0`` guards against fields explicitly set to None.
text_cfg = getattr(model_cfg, "text_config", None) or model_cfg
num_layers = int(getattr(text_cfg, "num_hidden_layers", 0) or 0) or int(
getattr(model_cfg, "num_hidden_layers", 0) or 0
)
top_k = int(getattr(text_cfg, "num_experts_per_tok", 0) or 0) or int(
getattr(model_cfg, "num_experts_per_tok", 0) or 0
)
cls._routing_replay_shape = (num_layers, top_k) if num_layers > 0 and top_k > 0 else None
cls._routing_replay_resolved = True
self.logger.info(f"routed_experts replay shape resolved: {cls._routing_replay_shape}")
return cls._routing_replay_shapeexamples/agent_train/train_qwen3_moe_rc.sh (4-12)
The variable RAY_DATA_HOME is unconditionally assigned on line 4 (RAY_DATA_HOME=/mnt/hdfs/yyding), which makes the conditional assignment on line 12 (RAY_DATA_HOME=${RAY_DATA_HOME:-"${HOME}/verl"}) dead code. If you want to allow overriding RAY_DATA_HOME via environment variables, you should use conditional assignment (${RAY_DATA_HOME:-...}) from the start.
RAY_DATA_HOME=${RAY_DATA_HOME:-"${HOME}/verl"}
NNODES_ROLLOUT=${NNODES_ROLLOUT:-16}
NNODES_TRAIN=${NNODES_TRAIN:-4}
GEN_TP=${GEN_TP:-2}
project_name=${PROJECT_NAME:-'Uni-Agent-Qwen3-Coder-30B-megatron'}
exp_name=${EXP_NAME:-"$(date +%Y%m%d%H)_exp"}
examples/agent_train/train_qwen3_moe_rc.sh (73-75)
The variables NNODES_ROLLOUT and NNODES_TRAIN are already unconditionally defined at the top of the script. These duplicate conditional assignments are redundant and can be removed to avoid confusion about the default values (e.g., 16 vs 12 for NNODES_ROLLOUT).
NGPUS_PER_NODE=${NGPUS_PER_NODE:-8}
When enable_rollout_routing_replay (R3) is on, successful trajectories carry a routed_experts tensor while failed/empty ones returned None. verl's _postprocess decides inclusion from inputs[0] only and torch.cat's the whole batch, so a mixed None/tensor batch either crashes or silently drops replay data. Emit a shape-consistent zero routed_experts (num_hidden_layers, num_experts_per_tok, int64) for empty outputs; the row is traj_masked so zeros never reach the loss.
Also add examples/agent_train/train_qwen3_moe_rc.sh: Qwen3 MoE decoupled-PPO + rollout-correction training script.
What does this PR do?
Checklist Before Starting
[{modules}] {type}: {description}(checked by CI){modules}may includecore,interaction,model,env,tools,deployment,reward,dashboard,docs,examples,data,train,ci,build,deps,misc,like[interaction, tools, docs]{type}must be one offeat,fix,refactor,chore,test[BREAKING]to the beginning of the title[1/N][BREAKING][deployment, docs] feat: simplify runtime env configurationTest
API and Usage Example
# Add a short example here when the PR changes public behaviorDesign & Code Changes
Checklist Before Submitting
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always