chore: refactor env group config into Env wrapper classes#2193
chore: refactor env group config into Env wrapper classes#2193mikasenghaas wants to merge 48 commits intomainfrom
Conversation
…field - Remove ValConfig and all val-related orchestrator code (not part of env group refactor) - Move env_ratios from BufferConfig to per-env ratio field on EnvConfig - Fix merge artifacts: train_env_names -> train_envs.names, env_cfg.server.address -> env_cfg.address - Add validator ensuring env ratios are all-or-nothing across envs - Update tests and example configs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Extract spawn/connect/shutdown into Envs class with atexit cleanup - Auto-detect group rubric envs and use run_group instead of deferred scoring - Remove deferred_group_scoring_tasks, max_concurrent from EnvConfig - Fix stale eval references (eval_env_set, eval_env_names, config.eval.num_examples) - Use Sequence[EnvConfig] for covariant type compatibility Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Env wraps vf.Environment + config with per-env state (group scoring, max_retries, ratio, spawn/connect/shutdown, generate_rollout/generate_group) - EvalEnv extends Env with num_examples, rollouts_per_example, and evaluate() - Envs is now a thin container over Env instances - Scheduler uses env.uses_group_scoring and env.generate_* directly instead of maintaining parallel dicts/sets - Buffer iterates Env instances directly for datasets and ratios - evaluate_and_log replaces evaluate_env, taking an EvalEnv directly - Tests use make_env/make_envs helpers for cleaner Env construction Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- sampling_args set once at env init via Envs.set_sampling_args() - Remove per-step compute_temperature + set_sampling_args from training loop - get_sampling_args no longer takes temperature param - Add TrainEnv/EvalEnv subclasses with distinct get_dataset behavior - EvalEnv.get_dataset calls get_eval_dataset, base Env uses get_dataset - Buffer no longer needs eval flag — uses env.get_dataset() polymorphically - Raise on connect if address not configured Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- TrainEnvs takes SamplingConfig and computes train sampling args once - EvalEnvs takes EvalSamplingConfig and computes eval sampling args once - Remove set_sampling_args, get_eval_sampling_args calls from orchestrator loop - evaluate_and_log uses eval_env.sampling_args directly - Access max_retries and ratio through config instead of duplicated attrs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Inline run_rollout/run_group logic into Env.generate_rollout/generate_group - Make uses_group_scoring a property on Env (reads rubric directly) - Rename get_sampling_args -> get_train_sampling_args - Move get_eval_sampling_args from eval_utils to utils (next to train variant) - Rename vf_env -> _env Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…_group - Remove sampling config from TrainEnv/EvalEnv constructors - Orchestrator computes sampling args and sets via Envs.set_sampling_args() - Rename generate_rollout -> run_rollout, generate_group -> run_group Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove dead vf_utils functions: run_rollout, spawn_env_server, setup_env_client, wait_for_env_servers, task_uses_group_scoring - Inline spawn logic (ZMQEnvServer.run_server) directly into Env.spawn() - Inline connect logic (ZMQEnvClient) directly into Env.connect() - Convert name and uses_group_scoring from properties to attributes - Clean up unused imports from vf_utils Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- EvalEnv.evaluate() now contains the full eval pipeline (get inputs, round-robin clients, gather groups, collect results) - Remove run_group, generate, evaluate functions from vf_utils - Remove dead constants (DEFAULT_RETRIES, DEFAULT_STATE_COLUMNS, REQUIRED_STATE_COLUMNS) from vf_utils Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- EvalEnv.evaluate() now handles rollout generation, metrics, and logging - Support group scoring in evaluate() via requires_group_scoring branch - Rename uses_group_scoring -> requires_group_scoring - Remove semaphore (set_semaphore, get_semaphore) and max_concurrent config - Remove log_eval_results, evaluate_and_log — eval_utils down to ckpt_step + pass@k - Remove strip_env_version, add EnvConfig.stripped_id property - Use model_name consistently Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Resolve train env num_workers in OrchestratorConfig from max_inflight_rollouts - Resolve eval env num_workers in EvalEnvConfig from num_examples * rollouts_per_example - Remove resolve_num_workers from vf_utils, max_concurrent from Env.spawn() - Rename by_task -> by_env in scheduler - Add EnvConfig.stripped_id property, remove strip_env_version function - Use debug-level logs for spawn/connect - Use prime-rl logger in process utils instead of loguru - Fix eval_envs unbound error when no eval config - Add multi_reverse_text test config without external server Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…nv_name - Remove [orchestrator.val] sections from all config files (ValConfig removed) - Fix test_scheduler: task= -> env_name= in InflightRolloutInfo - Warn when eval env defaults to 4 workers due to num_examples=-1 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Env.run() dispatches to run_rollout or run_group based on requires_group_scoring and rollouts_per_example, always returns list - Scheduler and EvalEnv.evaluate() both use env.run() — no more branching - Unified result handling in scheduler (single path for both modes) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ainEnvs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…internal Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove example["task"] = env.name override in buffer - Change default hash_keys from ["task", "prompt"] to ["env_name", "prompt"] - Add env_name column to wandb and prime monitor logging - task field now purely vf-internal (set by verifiers, not overridden) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… cleanups - Restore `if eval_ckpt_step is not None` guard so evals run at interval steps only - Use `self.verification.enabled and env.score_rollouts` so global flag is respected - Fix stray `)` in spawn log message - Fix `update_pools` return type annotation (str, not list[str]) - Use `get_logger().warning()` instead of `warnings.warn()` in EvalEnvConfig Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove unused parse_num_completion_tokens and parse_is_truncated_completions - Fix ratio description: ratios are relative weights, not required to sum to 1 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Delete utils/envs.py and trainer/envs.py (97 lines of env-var parsing ceremony) - Inline os.environ.get() calls directly in World.__init__ - Remove dead parse helpers and fix ratio field description Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Delete TemperatureSchedulerConfig and temp_scheduling module - Make SamplingConfig.temperature a plain float (default 1.0) - Use tuple(dict) + random.choice for buffer sampling instead of materializing a list of values on every call Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…spawn - Env.start() spawns (if needed) and connects in one call - Envs.start() does the same for all envs in parallel - Add requires_env_server_spawn property for readability - Move spawn logic to private _spawn() Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Env.run_rollout() returns a single RolloutOutput - Env.run_group() returns list[RolloutOutput] for group-scoring envs - EvalEnv.evaluate() dispatches correctly: n*k individual run_rollout calls for non-group envs, n run_group(k) calls for group-scoring envs - Replace _get_eval_inputs with own logic (get_dataset + flat repeat) - Scheduler normalizes single/list results via isinstance check - Progress bar tracks n*k rollouts in both eval modes Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for all 3 issues found in the latest run.
- ✅ Fixed: Scheduler passes raw dict instead of
vf.RolloutInput- Scheduler now converts examples to vf.RolloutInput (stripping env_name) before calling env.run_rollout/run_group.
- ✅ Fixed: Missing CHANGELOG entry for new
score_rolloutsconfig field- Added CHANGELOG entry documenting the new orchestrator.env.score_rollouts field and default.
- ✅ Fixed: Group-scoring envs re-schedule individual rollouts incorrectly
- Group-scoring now discards partial groups and re-runs the full group if any rollout fails, avoiding mixed scoring contexts.
Or push these changes by commenting:
@cursor push c42003423a
Preview (c42003423a)
diff --git a/CHANGELOG.md b/CHANGELOG.md
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@
- **`orchestrator.max_concurrent` removed**: Concurrency limiting via `max_concurrent` and the global semaphore have been removed. Existing configs must delete this field. (2026-04-05)
- **`orchestrator.buffer.hash_keys` default changed**: Default changed from `["task", "prompt"]` to `["env_name", "prompt"]`. The `task` field is no longer overridden by the orchestrator for env identification; `env_name` is used instead. Buffer checkpoints using the old default may not resume correctly. (2026-04-05)
- **`orchestrator.eval.env[].num_examples` / `rollouts_per_example` no longer fall through**: `num_examples` and `rollouts_per_example` are now required per eval env and no longer inherit from the top-level `orchestrator.eval` section. (2026-04-05)
+- **`orchestrator.env.score_rollouts` (NEW)**: Added per-env boolean to control rubric scoring on the env server (default: `true`). Also gated by `orchestrator.verification.enabled`. (2026-04-05)
- **`orchestrator.eval.env[].failed_rollouts` metric is now a ratio**: The `eval/{name}/failed_rollouts` metric now reports a ratio (0.0–1.0) instead of a raw count. Dashboards keying on this metric should be updated. (2026-04-05)
- **`orchestrator.sampling.temp_scheduler` removed**: Temperature scheduling (`TemperatureSchedulerConfig`) has been removed. `sampling.temperature` is now a required `float` (default `1.0`). Existing configs using `temp_scheduler` must replace it with a fixed `temperature` value. (2026-04-05)
- **`log.file` and `log.env_worker_logs` removed**: Removed `log.file` (from `LogConfig` and `SharedLogConfig`) and `log.env_worker_logs` (from `LogConfig`). Python file logging is replaced by deployment-level capture. Existing configs using these fields must delete them. Log paths unified: `.stdout` files renamed to `.log`, SLURM logs moved from `slurm/` to `logs/`. (2026-03-31)
diff --git a/src/prime_rl/orchestrator/scheduler.py b/src/prime_rl/orchestrator/scheduler.py
--- a/src/prime_rl/orchestrator/scheduler.py
+++ b/src/prime_rl/orchestrator/scheduler.py
@@ -196,20 +196,25 @@
if env.requires_group_scoring:
rollouts_per_example = group.rollouts_to_schedule
group.rollouts_to_schedule = 0
+ # Convert example dict to vf.RolloutInput, stripping scheduler-internal keys
+ clean_example_dict = {k: v for k, v in group.example.items() if k != "env_name"}
+ rollout_input = vf.RolloutInput(**clean_example_dict)
task = asyncio.create_task(
env.run_group(
client=client_config,
- example=group.example,
+ example=rollout_input,
model_name=self.model_name,
rollouts_per_example=rollouts_per_example,
)
)
else:
group.rollouts_to_schedule -= 1
+ clean_example_dict = {k: v for k, v in group.example.items() if k != "env_name"}
+ rollout_input = vf.RolloutInput(**clean_example_dict)
task = asyncio.create_task(
env.run_rollout(
client=client_config,
- example=group.example,
+ example=rollout_input,
model_name=self.model_name,
)
)
@@ -410,17 +415,23 @@
# Check for empty/errored rollouts and reschedule
valid_rollouts = []
+ had_invalid = False
+ requires_group_scoring = self.envs.get(env_name).requires_group_scoring
for rollout in rollouts:
if len(rollout["trajectory"]) == 0:
self.empty_rollouts_by_env[env_name] += 1
- group.rollouts_to_schedule += 1
+ had_invalid = True
+ if not requires_group_scoring:
+ group.rollouts_to_schedule += 1
self.logger.warning(
f"Empty trajectory in group {group_id} ({env_name}), re-scheduling "
f"({len(group.completed_rollouts)}/{self.rollouts_per_example} complete)"
)
elif rollout["error"] is not None:
self.errored_rollouts_by_env[env_name] += 1
- group.rollouts_to_schedule += 1
+ had_invalid = True
+ if not requires_group_scoring:
+ group.rollouts_to_schedule += 1
self.logger.warning(
f"Rollout error in group {group_id} ({env_name}), re-scheduling "
f"({len(group.completed_rollouts)}/{self.rollouts_per_example} complete): "
@@ -430,6 +441,13 @@
rollout["env_name"] = env_name
valid_rollouts.append(rollout)
+ # For group-scoring envs, treat each run_group attempt atomically:
+ # if any rollout in the group fails, discard the entire attempt and re-run full group.
+ if requires_group_scoring and had_invalid:
+ group.completed_rollouts.clear()
+ group.rollouts_to_schedule = self.rollouts_per_example
+ continue
+
group.completed_rollouts.extend(valid_rollouts)
if len(group.completed_rollouts) < self.rollouts_per_example:
continueThis Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
…dule - Remove score_rollouts field from EnvConfig (was internal, not user-facing) - Fix group-scoring envs: discard entire group on partial failure and reschedule from scratch (group rewards require all rollouts together) - Fix "Eval environments ready" -> "Eval environment(s) ready" Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Delete VerificationConfig class and verification field from OrchestratorConfig - Remove validate_verification_config validator - Stop setting score_rollouts in extra_env_kwargs (defaults to True in verifiers) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run_rollout returns a single RolloutOutput, not a list. Without wrapping, iterating over the dict yields keys instead of the rollout. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Convert from NamedTuple to dataclass for direct mutation of off_policy_steps, and rename to better reflect what it represents. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
inflight_rollout_count, cancel_inflight_rollouts, and drop_group all counted tasks (1 per group request) instead of actual rollouts. This caused the capacity check to allow up to rollouts_per_example × more concurrent rollouts than max_inflight_rollouts intended. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 6b01f9d. Configure here.
| max_retries=self.config.max_retries, | ||
| state_columns=REQUIRED_STATE_COLUMNS, | ||
| env_client=self.env_client, | ||
| ) |
There was a problem hiding this comment.
Shared mutable dict passed to concurrent rollouts
Medium Severity
Env.run_group builds the input list as [example for _ in range(rollouts_per_example)], passing N references to the same mutable dict. Similarly, Env.run_rollout passes group.example directly from the scheduler. The old wrappers in vf_utils.py created a fresh shallow copy via vf.RolloutInput(**example) for each rollout/group element. If vf.Environment.run_rollout or run_group mutates the input dict (e.g. setting task), all concurrent rollouts in a group share and clobber the same object. The eval path correctly creates copies with vf.RolloutInput(**example), making this inconsistency more evident.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 6b01f9d. Configure here.
| def inflight_sample_count(self) -> int: | ||
| return self.inflight_rollout_count + sum(g.rollouts_to_schedule for g in self.groups.values()) | ||
| pending = sum(g.rollouts_to_schedule for g in self.groups.values()) | ||
| return self.inflight_rollout_count + pending |
There was a problem hiding this comment.
Group scheduling can overshoot max inflight rollouts limit
Low Severity
_schedule_next_request checks remaining_capacity > 0 before calling schedule_rollout, but for group-scoring envs, schedule_rollout consumes all rollouts_to_schedule at once (setting rollout_count = group.rollouts_to_schedule). When remaining capacity is less than rollouts_per_example, this overshoots max_inflight_rollouts by up to rollouts_per_example - 1. The old code scheduled one rollout per task, respecting the capacity precisely.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 6b01f9d. Configure here.



Motivation
EnvGroupconcatenated all env datasets at init, requiring matching column names across envs — can easily break with heterogeneous env mixesThis PR deprecates
EnvGroupin favor of per-envEnvwrapper classes that own their state and lifecycle, and flattens the config so each environment is configured independently.Summary
Env,TrainEnv,EvalEnvwrapper classes that encapsulate per-env state (config, spawn/connect/shutdown, rollout generation, sampling args, group scoring detection)TrainEnvs/EvalEnvsare typed generic containers with bulk spawn/connect/shutdown and deprecatevf.EnvGroup. This also means we stop hijacking vftaskfield but use a resolved env name as primary env identifierbuffer.env_ratiosto per-envratiofield onEnvConfigrun_groupinstead of deferred group scoring -- this resolves the issueValConfigand validation loop — users can register train envs as eval envs to get the same functionalityutils/envs.pyandtrainer/envs.pyenv-var modules — inlinesos.environreads intoWorld(mainly to free upenvs.pymodule on orchestrator but also removes some unnecessary bloat)parse_num_completion_tokensandparse_is_truncated_completionshelpers)Breaking changes
[orchestrator.val]config section removedbuffer.env_ratiosconfig field removed — use per-envratiofield insteadmax_concurrentconfig field removedhash_keysdefault changed from["task", "prompt"]to["env_name", "prompt"]🤖 Generated with Claude Code
Note
High Risk
Large refactor of the RL orchestrator’s core env lifecycle, sampling/buffer semantics, and scheduler request handling, plus multiple breaking config removals; regressions could affect rollout generation, checkpoint resume, and eval metrics.
Overview
Refactors the orchestrator to stop using
vf.EnvGroupand instead manage per-environment wrapper objects (Env/TrainEnv/EvalEnv) with container classes (TrainEnvs/EvalEnvs) that handle spawn/connect/shutdown, sampling args, and group-scoring detection.Updates config and runtime behavior to match the new model: removes
[orchestrator.val],orchestrator.max_concurrent,orchestrator.verification, and temperature scheduling; movesbuffer.env_ratiosto per-envratio; requires per-eval-envnum_examples/rollouts_per_example; changes buffer checkpoint hashing default to['env_name','prompt']; and changeseval/*/failed_rolloutsto log a ratio.Reworks buffer and scheduler logic around
env_name(no longer overloadingtask) with per-env datasets/pools, ratio-aware sampling, updated checkpoint save/load, and new group-scoring execution viarun_group; updates monitors/W&B tables to includeenv_name, and removes the env-var helper modules in favor of directos.environreads inWorld.Reviewed by Cursor Bugbot for commit 6b01f9d. Bugbot is set up for automated code reviews on this repo. Configure here.