diff --git a/openseek/competition/pz/UCI001/gsm8k.py b/openseek/competition/pz/UCI001/gsm8k.py new file mode 100644 index 0000000..0b8cb63 --- /dev/null +++ b/openseek/competition/pz/UCI001/gsm8k.py @@ -0,0 +1,97 @@ +# Copyright 2024 Bytedance Ltd. and/or its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re + + +def extract_solution(solution_str, method="strict"): + assert method in ["strict", "flexible"] + + if method == "strict": + # Strict: extract content of the last \boxed{...} (align with evaluator's parser) + if "boxed" not in solution_str: + final_answer = None + else: + ans = solution_str.split("boxed")[-1] + if len(ans) == 0: + final_answer = None + elif ans[0] == "{": + stack = 1 + a = "" + for c in ans[1:]: + if c == "{": + stack += 1 + a += c + elif c == "}": + stack -= 1 + if stack == 0: + break + a += c + else: + a += c + final_answer = a.replace(",", "").replace("$", "") + else: + a = ans.split("$")[0].strip() + final_answer = a.replace(",", "").replace("$", "") + elif method == "flexible": + answer = re.findall("(\\-?[0-9\\.\\,]+)", solution_str) + final_answer = None + if len(answer) == 0: + # no reward is there is no answer + pass + else: + invalid_str = ["", "."] + # find the last number that is not '.' + for final_answer in reversed(answer): + if final_answer not in invalid_str: + break + return final_answer + + +def compute_score(solution_str, ground_truth, method="strict", format_score=0.2, score=1.0): + """GSM8K reward with \\boxed{} strict parsing and numeric equivalence. + + Scoring policy: + - Strict format (\\boxed{...}) and numerically correct -> score (default 1.0) + - Else if strict format present (even if wrong) -> format_score (default 0.2) + - Else -> 0.0 + """ + # Extract strict (format-aware) and flexible answers + answer_strict = extract_solution(solution_str=solution_str, method="strict") + has_strict_format = answer_strict is not None + + # Numeric equivalence helper + def _to_float(x): + try: + return float(str(x).replace(",", "").replace("$", "").strip()) + except Exception: + return None + + def _num_equal(a, b): + av = _to_float(a) + bv = _to_float(b) + if av is not None and bv is not None: + return av == bv + return str(a).strip() == str(b).strip() + + # 1) Strict-correct → full score + if answer_strict is not None and _num_equal(answer_strict, ground_truth): + return float(score) + + # 2) Format-only bonus if strict format present + if has_strict_format: + return float(0.2) + + # 3) No reward + return 0.0 diff --git a/openseek/competition/pz/UCI001/gsm8k_lxm2_newprompt_trainval.py b/openseek/competition/pz/UCI001/gsm8k_lxm2_newprompt_trainval.py new file mode 100644 index 0000000..d47752b --- /dev/null +++ b/openseek/competition/pz/UCI001/gsm8k_lxm2_newprompt_trainval.py @@ -0,0 +1,168 @@ +# Copyright 2024 Bytedance Ltd. and/or its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Preprocess the GSM8k dataset to parquet format +""" + +import argparse +import os +import re + +import datasets +from glob import glob + +# from verl.utils.hdfs_io import copy, makedirs + + +def extract_solution(solution_str): + solution = re.search("#### (\\-?[0-9\\.\\,]+)", solution_str) + assert solution is not None + final_solution = solution.group(0) + final_solution = final_solution.split("#### ")[1].replace(",", "") + return final_solution + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--local_dir", default="/aipilot/ai-platform/datasets/openseek_data/sft_data/Big-Math-RL-Verified-Processed_pri-mid") + parser.add_argument("--ms_base_dir", default="/aipilot/ai-platform/datasets/openseek_data/sft_data/Big-Math-RL-Verified-Processed") + parser.add_argument("--hdfs_dir", default=None) + parser.add_argument("--val_ratio", type=float, default=0.05, help="Validation split ratio (0-1)") + parser.add_argument("--seed", type=int, default=42, help="Random seed for shuffling/splitting") + + args = parser.parse_args() + + ms_base_dir = args.ms_base_dir + + train_candidates = glob(os.path.join(ms_base_dir, "**", "big-math-rl-verified-processed-train.arrow"), recursive=True) + # test_candidates = glob(os.path.join(ms_base_dir, "**", "gsm8k-test.arrow"), recursive=True) + + assert len(train_candidates) > 0, f"未在 {ms_base_dir} 下找到 gsm8k-train.arrow" + # assert len(test_candidates) > 0, f"未在 {ms_base_dir} 下找到 gsm8k-test.arrow" + + train_data_source = max(train_candidates, key=os.path.getmtime) + # test_data_source = max(test_candidates, key=os.path.getmtime) + + print(f"[Info] train arrow: {train_data_source}") + # print(f"[Info] test arrow: {test_data_source}") + + from datasets import Dataset + train_dataset = Dataset.from_file(train_data_source) + # test_dataset = Dataset.from_file(test_data_source) + + print(f"[Info] train rows: {train_dataset.num_rows}") + # print(f"[Info] test rows: {test_dataset.num_rows}") + + assert train_dataset.num_rows > 0, "train 数据为空" + # assert test_dataset.num_rows > 0, "test 数据为空" + + # train_dataset = dataset["train"] + # test_dataset = dataset["test"] + + # instruction_following = 'Let\'s think step by step and output the final answer after "####".' + # instruction_following = 'Please reason step by step.\nIn the last line, write the answer after "The answer is:" and don\'t include any other text.' + instruction_following = 'Please reason step by step, and put your final answer within \\boxed{}.\nQuestion:\n' + + # 先进行过滤,避免在 map 中返回 None 导致错误 + allowed_sources = {"orca_math", "cn_k12", "gsm8k"} + + def _filter_source(ex): + return ex.get("source") in allowed_sources + + def _is_float_convertible_solution(ex): + val = ex.get("solution") + if val is None: + return False + try: + float(str(val).strip().replace(",", "")) + return True + except Exception: + return False + + print(f"[Info] before filter rows: {train_dataset.num_rows}") + train_dataset = train_dataset.filter(_filter_source) + print(f"[Info] after source filter (source in {sorted(list(allowed_sources))}) rows: {train_dataset.num_rows}") + train_dataset = train_dataset.filter(_is_float_convertible_solution) + print(f"[Info] after float-convertible solution filter rows: {train_dataset.num_rows}") + + # add a row to each data item that represents a unique id + def make_map_fn(split): + def process_fn(example, idx): + # print(example) + question_raw = example.pop("prompt") + + question = instruction_following + question_raw + + answer_raw = example.pop("solution") + solution = answer_raw + data = { + # "data_source": data_source, + "data_source": "openai/gsm8k", + "prompt": [ + { + "role": "user", + "content": question, + } + ], + "ability": "math", + "reward_model": {"style": "rule", "ground_truth": solution}, + "extra_info": { + "split": split, + "index": idx, + "answer": answer_raw, + "question": question_raw, + }, + } + return data + + return process_fn + + # 首先执行统一的 map 处理,随后再进行随机划分 + processed_dataset = train_dataset.map(function=make_map_fn("trainval"), with_indices=True) + print(f"[Info] processed rows: {processed_dataset.num_rows}") + + # 随机划分 train/validation(默认 10% 为验证集),保证可复现性 + split_result = processed_dataset.train_test_split(test_size=args.val_ratio, seed=args.seed, shuffle=True) + train_dataset = split_result["train"] + val_dataset = split_result["test"] + + # 将 extra_info.split 字段分别标注为 train/validation + def _set_split_field(split_name): + def _fn(ex): + extra = dict(ex.get("extra_info", {})) + extra["split"] = split_name + return {"extra_info": extra} + return _fn + + train_dataset = train_dataset.map(_set_split_field("train")) + val_dataset = val_dataset.map(_set_split_field("test")) + print(f"[Info] split rows -> train: {train_dataset.num_rows}, val: {val_dataset.num_rows}") + + local_dir = args.local_dir + hdfs_dir = args.hdfs_dir + + os.makedirs(local_dir, exist_ok=True) + + # 分别输出 train/validation 两个 parquet 文件 + base_name = "big-math-rl-verified-processed_orca_cnk12_gsm8k_newprompt_2" + train_output = os.path.join(local_dir, f"{base_name}_train.parquet") + val_output = os.path.join(local_dir, f"{base_name}_val.parquet") + train_dataset.to_parquet(train_output) + val_dataset.to_parquet(val_output) + print(f"[Info] saved -> train: {train_output}\n[Info] saved -> val : {val_output}") + + if hdfs_dir is not None: + makedirs(hdfs_dir) + + copy(src=local_dir, dst=hdfs_dir) diff --git a/openseek/competition/pz/UCI001/ppo_trainer.yaml b/openseek/competition/pz/UCI001/ppo_trainer.yaml new file mode 100644 index 0000000..6b2cf24 --- /dev/null +++ b/openseek/competition/pz/UCI001/ppo_trainer.yaml @@ -0,0 +1,1051 @@ +# Format checks enforced on CI: +# 1. Comments must appear above each field. +# 2. There must be a blank line between each field. +# 3. Inline comments (after a field on the same line) are not allowed. +# 4. Indentation level is respected for nested fields. + +# dataset config +data: + + # Tokenizer class or path. If null, it will be inferred from the model. + tokenizer: null + + # Whether to use shared memory for data loading. + use_shm: False + + # Training set parquet. Can be a list or a single file. + # The program will read all files into memory, so it can't be too large (< 100GB). + # The path can be either a local path or an HDFS path. + # For HDFS path, we provide utils to download it to DRAM and convert it to a local path. + train_files: ~/data/rlhf/gsm8k/train.parquet + + # Validation parquet. Can be a list or a single file. + val_files: ~/data/rlhf/gsm8k/test.parquet + + # The field in the dataset where the prompt is located. Default is 'prompt'. + prompt_key: prompt + + # The field used to select the reward function (if using different ones per example). + reward_fn_key: data_source + + # Maximum prompt length. All prompts will be left-padded to this length. + # An error will be reported if the length is too long. + max_prompt_length: 512 + + # Maximum response length. Rollout in RL algorithms (e.g. PPO) generates up to this length. + max_response_length: 512 + + # Batch size sampled for one training iteration of different RL algorithms. + train_batch_size: 1024 + + # Batch size used during validation. Can be null. + val_batch_size: null + + # Whether to return the original input_ids without adding chat template. + # This is used when the reward model's chat template differs from the policy. + # If using a model-based RM with different templates, this should be True. + return_raw_input_ids: False + + # Whether to return the original chat (prompt) without applying chat template. + return_raw_chat: False + + # Whether to return the full prompt with chat template. + return_full_prompt: False + + # Whether to shuffle the data in the dataloader. + shuffle: True + + # Whether to shuffle the validation set. + validation_shuffle: False + + # Whether to filter overlong prompts. + filter_overlong_prompts: False + + # Number of workers for filtering overlong prompts. + # For large-scale datasets, filtering can be time-consuming. + # Use multiprocessing to speed up. Default is 1. + filter_overlong_prompts_workers: 1 + + # Truncate the input_ids or prompt if they exceed max_prompt_length. + # Options: 'error', 'left', or 'right'. Default is 'error'. + truncation: error + + # The field in the multi-modal dataset where the image is located. Default is 'images'. + image_key: images + + # The field in the multi-modal dataset where the video is located. + video_key: videos + + # If the remote tokenizer has a Python file, this flag determines whether to allow using it. + trust_remote_code: False + + # Optional: specify a custom dataset class path and name if overriding default loading behavior. + custom_cls: + + # The path to the file containing your customized dataset class. If not specified, pre-implemented dataset will be used. + path: null + + # The name of the dataset class within the specified file. + name: null + +# config for actor, rollout and reference model +actor_rollout_ref: + + # Whether it's a hybrid engine, currently only supports hybrid engine + hybrid_engine: true + + # common configs for the model + model: + + # Huggingface model path. This can be either local path or HDFS path. + path: ~/models/deepseek-llm-7b-chat + + # Custom chat template for the model. + custom_chat_template: null + + # Whether to use shared memory (SHM) for accelerating the loading of model weights + use_shm: false + + # Additional Python packages to register huggingface models/tokenizers. + external_lib: null + + # Used to override model's original configurations, mainly dropout + override_config: {} + + # Enable gradient checkpointing for actor + enable_gradient_checkpointing: true + + # Enable activation offloading for actor + enable_activation_offload: false + + # Whether to remove padding tokens in inputs during training + use_remove_padding: false + + # Set to positive value to enable LoRA (e.g., 32) + lora_rank: 0 + + # LoRA scaling factor + lora_alpha: 16 + + # Target modules to apply LoRA. Options: "all-linear" (not recommended for VLMs) or + # [q_proj,k_proj,v_proj,o_proj,gate_proj,up_proj,down_proj] + target_modules: all-linear + + # Exclude modules from applying Lora. Similar usage to target_modules and Peft. + # Example: '.*visual.*' for excluding the ViT in Qwen2.5-VL, as currently vllm does not support ViT Lora. + exclude_modules: null + + # Whether to use Liger for linear layer fusion + use_liger: false + + # Whether to use custom fused kernels (e.g., FlashAttention, fused MLP) + use_fused_kernels: false + + # Options for fused kernels. If use_fused_kernels is true, this will be used. + fused_kernel_options: + + # Implementation backend for fused kernels. Options: "triton" or "torch". + impl_backend: torch + + # Whether to enable loading a remote code model + trust_remote_code: false + + # actor configs + actor: + + # fsdp, fsdp2 or megatron. fsdp backend used here. + strategy: fsdp + + # Split each sample into sub-batches of this size for PPO + ppo_mini_batch_size: 256 + + # [Deprecated] Global micro batch size + ppo_micro_batch_size: null + + # Local per-GPU micro batch size + ppo_micro_batch_size_per_gpu: null + + # Whether to automatically adjust batch size at runtime + use_dynamic_bsz: false + + # Max tokens per GPU in one PPO batch; affects gradient accumulation + # Typically it should be: n * ${data.max_prompt_length} + ${data.max_response_length} + ppo_max_token_len_per_gpu: 16384 + + # Gradient clipping for actor updates + grad_clip: 1.0 + + # PPO clip ratio + clip_ratio: 0.2 + + # Lower bound for asymmetric clipping (used in dual-clip PPO) + clip_ratio_low: 0.2 + + # Upper bound for asymmetric clipping (used in dual-clip PPO) + clip_ratio_high: 0.2 + + # policy loss config + policy_loss: + + # Loss function mode: vanilla / clip-cov / kl-cov from https://arxiv.org/abs/2505.22617 + loss_mode: "vanilla" + + # Ratio of tokens to be clipped for clip-cov loss + clip_cov_ratio: 0.0002 + + # Lower bound for clip-cov loss + clip_cov_lb: 1.0 + + # Upper bound for clip-cov loss + clip_cov_ub: 5.0 + + # Ratio of tokens to be applied kl penalty for kl-cov loss + kl_cov_ratio: 0.0002 + + # KL divergence penalty coefficient + ppo_kl_coef: 0.1 + + # Constant C in Dual-clip PPO; clips when advantage < 0 and ratio > C + clip_ratio_c: 3.0 + + # Loss aggregation mode: "token-mean", "seq-mean-token-sum", or "seq-mean-token-mean" + loss_agg_mode: token-mean + + # Entropy regularization coefficient in PPO loss + entropy_coeff: 0 + + # Whether to use KL loss instead of KL reward penalty. True for GRPO + use_kl_loss: false + + # Whether to use torch.compile() + use_torch_compile: true + + # KL loss coefficient when use_kl_loss is enabled. For GRPO + kl_loss_coef: 0.001 + + # Type of KL divergence loss. Options: "kl"(k1), "abs", "mse"(k2), "low_var_kl"(k3), "full" + kl_loss_type: low_var_kl + + # Number of PPO epochs per batch + ppo_epochs: 1 + + # Shuffle training data across PPO epochs + shuffle: false + + # Sequence parallelism size for Ulysses-style model parallelism + ulysses_sequence_parallel_size: 1 + + # calculate entropy with chunking to reduce memory peak + entropy_from_logits_with_chunking: False + + # recompute entropy + entropy_checkpointing: False + + # checkpoint configs + checkpoint: + + # What to include in saved checkpoints + # with 'hf_model' you can save whole model as hf format, now only use sharded model checkpoint to save space + save_contents: ['model', 'optimizer', 'extra'] + + # For more flexibility, you can specify the contents to load from the checkpoint. + load_contents: ${actor_rollout_ref.actor.checkpoint.save_contents} + + # optimizer configs + optim: + + # Learning rate + lr: 1e-6 + + # Warmup steps; negative value delegates to lr_warmup_steps_ratio + lr_warmup_steps: -1 + + # Warmup steps ratio (used if lr_warmup_steps is negative) + lr_warmup_steps_ratio: 0.0 + + # Minimum LR ratio for cosine schedule + min_lr_ratio: 0.0 + + # Number of cosine cycles in LR schedule + num_cycles: 0.5 + + # LR warmup style: "constant" or "cosine" + warmup_style: constant + + # Total training steps (must be overridden at runtime) + total_training_steps: -1 + + # Weight decay + weight_decay: 0.01 + + # configs for FSDP + fsdp_config: + ########################################################## + model_dtype: fp32 + ########################################################## + + # policy for wrapping the model + wrap_policy: + + # Minimum number of parameters to trigger wrapping a layer with FSDP + min_num_params: 0 + + # Whether to offload model parameters to CPU (trades speed for memory) + param_offload: false + + # Whether to offload optimizer state to CPU + optimizer_offload: false + + # Only for FSDP2: offload param/grad/optimizer during train + offload_policy: false + + # Only for FSDP2: Reshard after forward pass to reduce memory footprint + reshard_after_forward: true + + # Number of GPUs in each FSDP shard group; -1 means auto + fsdp_size: -1 + + # Only for FSDP1: FSDP1 configuration, prefetch the next forward-pass all-gather + # before the current forward computation. + forward_prefetch: False + + # profiler configs + profiler: + + # True for each task has its own database, False for all tasks in one training step share one database. + discrete: False + + # Whether to profile all ranks. + all_ranks: False + + # The ranks that will be profiled. null or [0,1,...] + ranks: null + + # Reference model config. + # Reference model will be enabled when actor.use_kl_loss or/and algorithm.use_kl_in_reward is/are True. + ref: + + # actor_rollout_ref.ref: FSDP config same as actor. For models larger than 7B, it’s recommended to turn on offload for ref by default + strategy: ${actor_rollout_ref.actor.strategy} + + # config for FSDP strategy + fsdp_config: + + # whether to offload parameters in FSDP + param_offload: False + + # whether to perform reshard after model forward to save memory. + # only for fsdp2, [True, False, int between 1 and fsdp_size] + reshard_after_forward: True + + # Only for FSDP1: FSDP1 configuration, prefetch the next forward-pass all-gather + # before the current forward computation. + forward_prefetch: False + + # the wrap policy for FSDP model + wrap_policy: + + # minimum number of params in a wrapped module + min_num_params: 0 + + # whether to enable torch.compile + use_torch_compile: ${actor_rollout_ref.actor.use_torch_compile} + + # [Will be deprecated, use log_prob_micro_batch_size_per_gpu] + # The batch size for one forward pass in the computation of log_prob. Global batch size. + log_prob_micro_batch_size: null + + # The batch size for one forward pass in the computation of log_prob. Local batch size per GPU. + log_prob_micro_batch_size_per_gpu: null + + # enable dynamic batch size (sequence packing) for log_prob computation + log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} + + # the max token length per GPU + log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu} + + # sequence parallel size + ulysses_sequence_parallel_size: ${actor_rollout_ref.actor.ulysses_sequence_parallel_size} + + # calculate entropy with chunking to reduce memory peak + entropy_from_logits_with_chunking: False + + # recompute entropy + entropy_checkpointing: False + + # profiler configs + profiler: + + # True for each task has its own database, False for all tasks in one training step share one database. + discrete: False + + # Whether to profile all ranks. + all_ranks: False + + # The ranks that will be profiled. null or [0,1,...] + ranks: null + + # Rollout model config. + rollout: + + # actor_rollout_ref.rollout.name: hf/vllm/sglang. + name: vllm + + # sync: LLM, async: AsyncLLM + mode: sync + + # Sampling temperature for rollout. + temperature: 1.0 + + # Top-k sampling parameter. -1 for vLLM rollout, 0 for HF rollout. + top_k: -1 + + # Top-p sampling parameter. Default 1.0. + top_p: 1 + + + # typically the same as data max prompt length + prompt_length: ${data.max_prompt_length} + + # typically the same as data max response length + response_length: ${data.max_response_length} + + # for vllm rollout + # Rollout model parameters type. Align with actor model's FSDP/Megatron type. + dtype: bfloat16 + + # Fraction of GPU memory used by vLLM/SGLang for KV cache. + gpu_memory_utilization: 0.5 + + # Whether to ignore EOS and continue generating after EOS is hit. + ignore_eos: False + + # Whether to disable CUDA graph. Default True to allow cache freeing. + enforce_eager: True + + # Whether to free engine KVCache after generation. Set enforce_eager=True when enabled. + free_cache_engine: True + + # Which loader to use for rollout model weights: dummy_dtensor, hf, megatron, etc. + # safetensors (for huge model, and set use_shm=True); dummy_dtensor: randomly init model weight + load_format: dummy_dtensor + + # for huge model, layered summon can save memory (prevent OOM) but make it slower + layered_summon: False + + # TP size for rollout. Only effective for vLLM. + tensor_model_parallel_size: 2 + + # max number of tokens in a batch + max_num_batched_tokens: 8192 + + # max length for rollout + max_model_len: null + + # max length of sequences + max_num_seqs: 1024 + + # [Will be deprecated, use log_prob_micro_batch_size_per_gpu] The batch size for one forward pass in the computation of log_prob. Global batch size. + log_prob_micro_batch_size: null + + # The batch size for one forward pass in the computation of log_prob. Local batch size per GPU. + log_prob_micro_batch_size_per_gpu: null + + # enable dynamic batch size (sequence packing) for log_prob computation + log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} + + # max token length for log_prob computation + log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu} + + # disable logging statistics + disable_log_stats: True + + # may get higher throughput when set to True. When activated, Please increase max_num_batched_tokens or decrease max_model_len. + enable_chunked_prefill: True + + # for hf rollout + # Whether to sample during training rollout. False uses greedy sampling. + do_sample: True + + # number of responses (i.e. num sample times). > 1 for grpo + n: 1 + + # Whether to wake up inference engine in multi-stage. (Wake up model weights first, then resume kv cache) + multi_stage_wake_up: false + + # Extra inference engine arguments (vllm, sglang). + engine_kwargs: + + # for vllm + vllm: + + # Swap space (in GB) used by inference engine. null uses default (e.g., 4 GB). + swap_space: null + + # Whether to disable the preprocessor cache for multimodel models. + disable_mm_preprocessor_cache: False + + # for sglang + sglang: + + # The attention backend for sglang engine. Options: flashinfer, triton, flashmla, null for default. + attention_backend: null + + # Sampling parameters used during validation. + val_kwargs: + + # sampling parameters for validation + # Top-k sampling parameter. -1 for vLLM rollout, 0 for HF rollout. + top_k: -1 + + # Top-p sampling parameter. Default 1.0. + top_p: 1.0 + + # Sampling temperature for rollout. + temperature: 0 + + # whether to repeat n times for validation + n: 1 + + # Whether to sample during training rollout. False uses greedy sampling. + do_sample: False + + # Multi-turn interaction config for tools or chat. + multi_turn: + + # set to True for multi-turn tool interaction tasks; should set rollout.name to sglang as well + enable: False + + # null for no limit (default max_length // 3) + max_assistant_turns: null + + # null for no tool + tool_config_path: null + + # null for no limit (default max_length // 3) + max_user_turns: null + + # max parallel call for tools in single turn + max_parallel_calls: 1 + + # max length of tool response + max_tool_response_length: 256 + + # truncate side of tool response: left, middle, right + tool_response_truncate_side: middle + + # null for no interaction + interaction_config_path: null + + # null for default callback + completion_callback: null + + # - When set to True, the model's default chat template is used for multi-turn rollout, which typically matches production behavior. + # - When set to False, the token ids recorded for training are used instead; unlike the default chat template, these always include the model's full output, + # which may contain additional content such as reasoning content. This maintains the consistency between training and rollout, but it will lead to longer prompts. + use_inference_chat_template: False + + # Tokenization is performed turn by turn and the resulting token ids are concatenated to form the full conversation. + # To ensure this matches the result of tokenizing the entire conversation at once, a sanity check is run at the end of each multi-turn rollout to compare the two sets of token ids. + # Some models are known to produce different tokenization results when tokenizing turn by turn vs. all at once. aThis behavior has already been validated for them. + # To reduce excessive warnings, you can turn off the sanity check for these models if you are using their default chat template: + # Qwen/QwQ-32B, Qwen/Qwen3-xxB + # - off: disable tokenization sanity check + # - strict: enable strict tokenization sanity check (default) + # - ignore_strippable: ignore strippable tokens when checking tokenization sanity + tokenization_sanity_check_mode: strict + + # Format of the multi-turn interaction. Options: hermes, llama3_json, ... + format: hermes + + # support logging rollout prob for debugging purpose + calculate_log_probs: False + + # profiler configs + profiler: + + # True for each task has its own database, False for all tasks in one training step share one database. + discrete: False + + # Whether to profile all ranks. + all_ranks: False + + # The ranks that will be profiled. null or [0,1,...] + ranks: null + + # [Experimental] agent loop based rollout configs + agent: + + # Number of agent loop workers + num_workers: 8 + +# configs for the critic +critic: + + # Number of rollouts per update (mirrors actor rollout_n) + rollout_n: ${actor_rollout_ref.rollout.n} + + # fsdp or fsdp2 strategy used for critic model training + strategy: ${actor_rollout_ref.actor.strategy} + + # optimizer configs + optim: + + # Learning rate + lr: 1e-5 + + # Warmup steps ratio; total steps will be injected at runtime + lr_warmup_steps_ratio: 0. + + # Minimum LR ratio for cosine schedule + min_lr_ratio: null + + # LR warmup style: "constant" or "cosine" + warmup_style: constant + + # Total training steps (must be overridden at runtime) + total_training_steps: -1 + + # Weight decay + weight_decay: 0.01 + + # model config for the critic + model: + + # Path to pretrained model weights + path: ~/models/deepseek-llm-7b-chat + + # Whether to use shared memory for loading the model + use_shm: False + + # Tokenizer path (defaults to actor's model path) + tokenizer_path: ${actor_rollout_ref.model.path} + + # Hugging Face config override + override_config: { } + + # External model implementation (optional) + external_lib: ${actor_rollout_ref.model.external_lib} + + # Enable gradient checkpointing to save memory + enable_gradient_checkpointing: True + + # Offload activations to CPU to reduce GPU memory usage + enable_activation_offload: False + + # Use remove padding optimization (saves compute) + use_remove_padding: False + + # Whether to trust remote code from Hugging Face models + trust_remote_code: ${actor_rollout_ref.model.trust_remote_code} + + # FSDP-specific config + fsdp_config: + + ########################################################## + model_dtype: fp32 + ########################################################## + + # Whether to offload model parameters to CPU + param_offload: False + + # Whether to offload optimizer state to CPU + optimizer_offload: False + + # Only for FSDP2: offload param/grad/optimizer during train + offload_policy: False + + # Only for FSDP2: Reshard after forward pass to reduce memory footprint + reshard_after_forward: True + + # Policy for wrapping layers with FSDP + wrap_policy: + + # Minimum number of parameters to trigger wrapping + min_num_params: 0 + + # Number of GPUs in each FSDP shard group; -1 means auto + fsdp_size: -1 + + # Only for FSDP1: FSDP1 configuration, prefetch the next forward-pass all-gather + # before the current forward computation. + forward_prefetch: False + + # Set to positive value to enable LoRA (e.g., 32) + lora_rank: 0 + + # LoRA scaling factor + lora_alpha: 16 + + # LoRA target modules: "all-linear" or list of linear projection layers + target_modules: all-linear + + # PPO mini-batch size per update + ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size} + + # [Deprecated] Global micro batch size + ppo_micro_batch_size: null + + # Local per-GPU micro batch size + ppo_micro_batch_size_per_gpu: null + + # Forward-only batch size (global) + forward_micro_batch_size: ${critic.ppo_micro_batch_size} + + # Forward-only batch size (per GPU) + forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu} + + # Whether to automatically adjust batch size at runtime + use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} + + # Max tokens per GPU in one PPO batch (doubled for critic) + ppo_max_token_len_per_gpu: 32768 + + # Max token length per GPU in forward pass + forward_max_token_len_per_gpu: ${critic.ppo_max_token_len_per_gpu} + + # Sequence parallelism size for Ulysses-style model parallelism + ulysses_sequence_parallel_size: 1 + + # Number of PPO epochs per batch + ppo_epochs: ${actor_rollout_ref.actor.ppo_epochs} + + # Shuffle training data across PPO epochs + shuffle: ${actor_rollout_ref.actor.shuffle} + + # Gradient clipping for critic updates + grad_clip: 1.0 + + # PPO value function clipping range + cliprange_value: 0.5 + + # Loss aggregation mode: "token-mean", "seq-mean-token-sum", or "seq-mean-token-mean" + loss_agg_mode: ${actor_rollout_ref.actor.loss_agg_mode} + + # checkpoint configs + checkpoint: + + # What to include in saved checkpoints + # with 'hf_model' you can save whole model as hf format, now only use sharded model checkpoint to save space + save_contents: ['model', 'optimizer', 'extra'] + + # What to include when loading checkpoints + load_contents: ${critic.checkpoint.save_contents} + + # profiler configs + # the corresponding dataclass is verl.utils.debug.ProfilerConfig. + profiler: + + # True for each task has its own database, False for all tasks in one training step share one database. + discrete: False + + # Whether to profile all ranks. + all_ranks: False + + # The ranks that will be profiled. null or [0,1,...] + ranks: null + +# configs for the reward model +reward_model: + + # Whether to enable reward model. If False, we compute the reward only with the user-defined reward functions. + # In GSM8K and Math examples, we disable reward model. + # For RLHF alignment example using full_hh_rlhf, we utilize reward model to assess the responses. + # If False, the following parameters are not effective + enable: False + + # FSDP strategy: "fsdp" or "fsdp2" + strategy: ${actor_rollout_ref.actor.strategy} + + # model config for reward scoring + model: + + # Input tokenizer. If the reward model’s chat template is inconsistent with the policy, + # we need to first decode to plaintext, then apply the rm’s chat_template. + # Then score with RM. If chat_templates are consistent, it can be set to null. + input_tokenizer: ${actor_rollout_ref.model.path} + + # RM’s HDFS path or local path. Note that RM only supports AutoModelForSequenceClassification. + # Other model types need to define their own RewardModelWorker and pass it from the code. + path: ~/models/FsfairX-LLaMA3-RM-v0.1 + + # Whether to use shared memory for loading the model + use_shm: False + + # External model implementation (optional) + external_lib: ${actor_rollout_ref.model.external_lib} + + # Use remove padding optimization (saves compute) + use_remove_padding: False + + # Whether to use fused reward kernels for speedup + use_fused_kernels: ${actor_rollout_ref.model.use_fused_kernels} + + # Whether to enable loading a remote code model, default to False + trust_remote_code: False + + # FSDP-specific config + fsdp_config: + + # Policy for wrapping layers with FSDP + wrap_policy: + + # Minimum number of parameters to trigger wrapping + min_num_params: 0 + + # Whether to offload model parameters to CPU + param_offload: False + + # Only for FSDP2: Reshard after forward pass to reduce memory footprint + reshard_after_forward: True + + # Number of GPUs in each FSDP shard group; -1 means auto + fsdp_size: -1 + + # Only for FSDP1: FSDP1 configuration, prefetch the next forward-pass all-gather + # before the current forward computation. + forward_prefetch: False + + # [Deprecated] Global micro batch size + micro_batch_size: null + + # Local per-GPU micro batch size + micro_batch_size_per_gpu: null + + # Maximum sequence length to process for scoring + max_length: null + + # Sequence parallelism size for Ulysses-style model parallelism + ulysses_sequence_parallel_size: 1 + + # Whether to dynamically adjust batch size at runtime + use_dynamic_bsz: ${critic.use_dynamic_bsz} + + # Maximum number of tokens per GPU in one forward pass + forward_max_token_len_per_gpu: ${critic.forward_max_token_len_per_gpu} + + # Reward Manager. This defines the mechanism of computing rule-based reward and handling different reward sources. + # Default is naive. If all verification functions are multiprocessing-safe, + # the reward manager can be set to prime for parallel verification. + reward_manager: naive + + # Whether to launch custom reward function asynchronously during log_prob + launch_reward_fn_async: False + + # Cloud/local sandbox fusion configuration for custom reward logic + sandbox_fusion: + + # Cloud/local function URL for sandbox execution + url: null + + # Max concurrent requests allowed to sandbox + max_concurrent: 64 + + # Max memory limit for each sandbox process in MB + memory_limit_mb: 1024 + + # profiler configs + profiler: + + # True for each task has its own database, False for all tasks in one training step share one database. + discrete: False + + # Whether to profile all ranks. + all_ranks: False + + # The ranks that will be profiled. null or [0,1,...] + ranks: null + +# custom reward function definition +custom_reward_function: + + # The path to the file containing your customized reward function. + # If not specified, pre-implemented reward functions will be used. + path: null + + # The name of the reward function within the specified file. Default is 'compute_score'. + name: compute_score + +# config for the algorithm +algorithm: + + # Discount factor for future rewards + gamma: 1.0 + + # Trade-off between bias and variance in the GAE estimator + lam: 1.0 + + # Advantage estimator type: "gae", "grpo", "reinforce_plus_plus", etc. + adv_estimator: gae + + # Whether to normalize advantages by std (specific to GRPO) + norm_adv_by_std_in_grpo: True + + # Whether to enable in-reward KL penalty + use_kl_in_reward: False + + # How to estimate KL divergence: "kl", "abs", "mse", "low_var_kl", or "full" + kl_penalty: kl + + # KL control configuration + kl_ctrl: + + # KL control type: "fixed" or "adaptive" + type: fixed + + # Initial coefficient for KL penalty + kl_coef: 0.001 + + # Horizon value for adaptive controller (if enabled) + horizon: 10000 + + # Target KL divergence (used for adaptive controller) + target_kl: 0.1 + + # Whether to enable preference feedback PPO + use_pf_ppo: False + + # Preference feedback PPO settings + pf_ppo: + + # Method for reweighting samples: "pow", "max_min", or "max_random" + reweight_method: pow + + # Power used for weight scaling in "pow" method + weight_pow: 2.0 + +# config for the trainer +trainer: + + # Whether to balance batch sizes across distributed workers + balance_batch: True + + # Number of epochs in training + total_epochs: 30 + + # Total training steps (can be set explicitly or derived from epochs) + total_training_steps: null + + # The steps that will be profiled. null means no profiling. null or [1,2,5,...] + profile_steps: null + + # controller Nvidia Nsight Systems Options. Must set when profile_steps is not None. + ## reference https://docs.nvidia.com/nsight-systems/UserGuide/index.html + ## reference https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html + controller_nsight_options: + + # Select the API(s) to be traced. + trace: "cuda,nvtx,cublas,ucx" + + # Track the GPU memory usage by CUDA kernels. Must be string type "true" or "false". + cuda-memory-usage: "true" + + # CUDA graphs will be traced as a whole + cuda-graph-trace: "graph" + + # worker Nvidia Nsight Systems Options. Must set when profile_steps is not None. + worker_nsight_options: + + # Select the API(s) to be traced. + trace: "cuda,nvtx,cublas,ucx" + + # Track the GPU memory usage by CUDA kernels. Must be string type "true" or "false". + cuda-memory-usage: "true" + + # CUDA graphs will be traced as a whole + cuda-graph-trace: "graph" + + # Profiling only in a range of torch.cuda.profiler.start and stop. Do not change this config. + capture-range: "cudaProfilerApi" + + # Specify the desired behavior when a capture range ends. + # In verl we need the orch.cuda.profiler.start/stop pair to repeats n times. + # valid values are "repeat-shutdown:n" or null. + # For normal whole step profiling, n = len(profile_steps); + # but for discrete profiling, n = len(profile_steps) * Number(subtasks). + # Or you can just leave it null and the program will use n = len(profile_steps) * 6; + capture-range-end: null + + # Send signal to the target application's process group. We let the program to exit by itself. + kill: none + + # Project name for experiment tracking (e.g., wandb) + project_name: verl_examples + + # Experiment name for run identification in tracking tools + experiment_name: gsm8k + + # Logging backends to use: "console", "wandb", etc. + logger: [ 'console', 'wandb' ] + + # Number of generations to log during validation + log_val_generations: 0 + + # Directory for logging rollout data; no dump if null + rollout_data_dir: null + + # Directory for logging validation data; no dump if null + validation_data_dir: null + + # Number of nodes used in the training + nnodes: 1 + + # Number of GPUs per node + n_gpus_per_node: 8 + + # Save frequency (by iteration) for model checkpoints + save_freq: -1 + + # ESI redundant time (in seconds) for model checkpointsAdd commentMore actions + esi_redundant_time: 0 + + # Resume mode: "auto", "disable", or "resume_path" + # "auto": resume from last checkpoint if available + # "disable": start from scratch + # "resume_path": resume from a user-defined path + resume_mode: auto + + # Path to resume training from (only used when resume_mode is "resume_path") + resume_from_path: null + + # Whether to run validation before training begins + val_before_train: True + + # Whether to run validation only + val_only: False + + # Validation frequency (in training iterations) + test_freq: -1 + + # Number of iterations to warm up the critic before updating policy + critic_warmup: 0 + + # Default path to distributed filesystem for saving checkpoints + default_hdfs_dir: null + + # Whether to delete local checkpoints after loading + del_local_ckpt_after_load: False + + # Default local directory for saving checkpoints + default_local_dir: checkpoints/${trainer.project_name}/${trainer.experiment_name} + + # Maximum number of actor checkpoints to keep + max_actor_ckpt_to_keep: null + + # Maximum number of critic checkpoints to keep + max_critic_ckpt_to_keep: null + + # Timeout (in seconds) for Ray worker to wait for registration + ray_wait_register_center_timeout: 300 + + # Device to run training on (e.g., "cuda", "cpu") + device: cuda + +# configs related to ray initialization +ray_init: + + # Number of CPUs for Ray. Use a fixed number instead of null when using SLURM. + num_cpus: null + + # Path to save Ray timeline JSON for performance profiling + timeline_json_file: null diff --git a/openseek/competition/pz/UCI001/ray_trainer.py b/openseek/competition/pz/UCI001/ray_trainer.py new file mode 100644 index 0000000..6b53aaa --- /dev/null +++ b/openseek/competition/pz/UCI001/ray_trainer.py @@ -0,0 +1,1307 @@ +# Copyright 2024 Bytedance Ltd. and/or its affiliates +# Copyright 2023-2024 SGLang Team +# Copyright 2025 ModelBest Inc. and/or its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +FSDP PPO Trainer with Ray-based single controller. +This trainer supports model-agonistic model initialization with huggingface +""" + +import json +import os +import uuid +from collections import defaultdict +from copy import deepcopy +from dataclasses import dataclass, field +from enum import Enum +from pprint import pprint +from typing import Optional, Type + +import numpy as np +import ray +import torch +from omegaconf import OmegaConf, open_dict +from torch.utils.data import Dataset, Sampler +from torchdata.stateful_dataloader import StatefulDataLoader +from tqdm import tqdm + +from verl import DataProto +from verl.protocol import pad_dataproto_to_divisor, unpad_dataproto +from verl.single_controller.base import Worker +from verl.single_controller.ray import RayClassWithInitArgs, RayResourcePool, RayWorkerGroup +from verl.single_controller.ray.base import create_colocated_worker_cls +from verl.trainer.ppo import core_algos +from verl.trainer.ppo.core_algos import AdvantageEstimator, agg_loss +from verl.trainer.ppo.metric_utils import ( + compute_data_metrics, + compute_throughout_metrics, + compute_timing_metrics, + process_validation_metrics, +) +from verl.trainer.ppo.reward import compute_reward, compute_reward_async +from verl.utils.checkpoint.checkpoint_manager import find_latest_ckpt_path, should_save_ckpt_esi +from verl.utils.debug import marked_timer +from verl.utils.metric import ( + reduce_metrics, +) +from verl.utils.seqlen_balancing import get_seqlen_balanced_partitions, log_seqlen_unbalance +from verl.utils.torch_functional import masked_mean +from verl.utils.tracking import ValidationGenerationsLogger + +WorkerType = Type[Worker] + + +class Role(Enum): + """ + To create more roles dynamically, you can subclass Role and add new members + """ + + Actor = 0 + Rollout = 1 + ActorRollout = 2 + Critic = 3 + RefPolicy = 4 + RewardModel = 5 + ActorRolloutRef = 6 + + +@dataclass +class ResourcePoolManager: + """ + Define a resource pool specification. Resource pool will be initialized first. + """ + + resource_pool_spec: dict[str, list[int]] + mapping: dict[Role, str] + resource_pool_dict: dict[str, RayResourcePool] = field(default_factory=dict) + + def create_resource_pool(self): + for resource_pool_name, process_on_nodes in self.resource_pool_spec.items(): + # max_colocate_count means the number of WorkerGroups (i.e. processes) in each RayResourcePool + # For FSDP backend, we recommend using max_colocate_count=1 that merge all WorkerGroups into one. + # For Megatron backend, we recommend using max_colocate_count>1 + # that can utilize different WorkerGroup for differnt models + resource_pool = RayResourcePool(process_on_nodes=process_on_nodes, use_gpu=True, max_colocate_count=1, name_prefix=resource_pool_name) + self.resource_pool_dict[resource_pool_name] = resource_pool + + self._check_resource_available() + + def get_resource_pool(self, role: Role) -> RayResourcePool: + """Get the resource pool of the worker_cls""" + return self.resource_pool_dict[self.mapping[role]] + + def get_n_gpus(self) -> int: + """Get the number of gpus in this cluster.""" + return sum([n_gpus for process_on_nodes in self.resource_pool_spec.values() for n_gpus in process_on_nodes]) + + def _check_resource_available(self): + """Check if the resource pool can be satisfied in this ray cluster.""" + node_available_resources = ray.state.available_resources_per_node() + node_available_gpus = {node: node_info.get("GPU", 0) if "GPU" in node_info else node_info.get("NPU", 0) for node, node_info in node_available_resources.items()} + + # check total required gpus can be satisfied + total_available_gpus = sum(node_available_gpus.values()) + total_required_gpus = sum([n_gpus for process_on_nodes in self.resource_pool_spec.values() for n_gpus in process_on_nodes]) + if total_available_gpus < total_required_gpus: + raise ValueError(f"Total available GPUs {total_available_gpus} is less than total desired GPUs {total_required_gpus}") + + # check each resource pool can be satisfied, O(#resource_pools * #nodes) + for resource_pool_name, process_on_nodes in self.resource_pool_spec.items(): + num_gpus, num_nodes = process_on_nodes[0], len(process_on_nodes) + for node, available_gpus in node_available_gpus.items(): + if available_gpus >= num_gpus: + node_available_gpus[node] -= num_gpus + num_nodes -= 1 + if num_nodes == 0: + break + if num_nodes > 0: + raise ValueError(f"Resource pool {resource_pool_name}: {num_gpus}*{num_nodes}" + "cannot be satisfied in this ray cluster") + + +def apply_kl_penalty(data: DataProto, kl_ctrl: core_algos.AdaptiveKLController, kl_penalty="kl", multi_turn=False): + """Apply KL penalty to the token-level rewards. + + This function computes the KL divergence between the reference policy and current policy, + then applies a penalty to the token-level rewards based on this divergence. + + Args: + data (DataProto): The data containing batched model outputs and inputs. + kl_ctrl (core_algos.AdaptiveKLController): Controller for adaptive KL penalty. + kl_penalty (str, optional): Type of KL penalty to apply. Defaults to "kl". + multi_turn (bool, optional): Whether the data is from a multi-turn conversation. Defaults to False. + + Returns: + tuple: A tuple containing: + - The updated data with token-level rewards adjusted by KL penalty + - A dictionary of metrics related to the KL penalty + """ + responses = data.batch["responses"] + response_length = responses.size(1) + token_level_scores = data.batch["token_level_scores"] + batch_size = data.batch.batch_size[0] + + if multi_turn: + loss_mask = data.batch["loss_mask"] + response_mask = loss_mask[:, -response_length:] + else: + attention_mask = data.batch["attention_mask"] + response_mask = attention_mask[:, -response_length:] + + # compute kl between ref_policy and current policy + # When apply_kl_penalty, algorithm.use_kl_in_reward=True, so the reference model has been enabled. + kld = core_algos.kl_penalty(data.batch["old_log_probs"], data.batch["ref_log_prob"], kl_penalty=kl_penalty) # (batch_size, response_length) + kld = kld * response_mask + beta = kl_ctrl.value + + token_level_rewards = token_level_scores - beta * kld + + current_kl = masked_mean(kld, mask=response_mask, axis=-1) # average over sequence + current_kl = torch.mean(current_kl, dim=0).item() + + # according to https://github.com/huggingface/trl/blob/951ca1841f29114b969b57b26c7d3e80a39f75a0/trl/trainer/ppo_trainer.py#L837 + kl_ctrl.update(current_kl=current_kl, n_steps=batch_size) + data.batch["token_level_rewards"] = token_level_rewards + + metrics = {"actor/reward_kl_penalty": current_kl, "actor/reward_kl_penalty_coeff": beta} + + return data, metrics + + +def compute_response_mask(data: DataProto): + """Compute the attention mask for the response part of the sequence. + + This function extracts the portion of the attention mask that corresponds to the model's response, + which is used for masking computations that should only apply to response tokens. + + Args: + data (DataProto): The data containing batched model outputs and inputs. + + Returns: + torch.Tensor: The attention mask for the response tokens. + """ + responses = data.batch["responses"] + response_length = responses.size(1) + attention_mask = data.batch["attention_mask"] + return attention_mask[:, -response_length:] + + +def compute_advantage(data: DataProto, adv_estimator, gamma=1.0, lam=1.0, num_repeat=1, multi_turn=False, norm_adv_by_std_in_grpo=True, config=None): + """Compute advantage estimates for policy optimization. + + This function computes advantage estimates using various estimators like GAE, GRPO, REINFORCE++, etc. + The advantage estimates are used to guide policy optimization in RL algorithms. + + Args: + data (DataProto): The data containing batched model outputs and inputs. + adv_estimator: The advantage estimator to use (e.g., GAE, GRPO, REINFORCE++). + gamma (float, optional): Discount factor for future rewards. Defaults to 1.0. + lam (float, optional): Lambda parameter for GAE. Defaults to 1.0. + num_repeat (int, optional): Number of times to repeat the computation. Defaults to 1. + multi_turn (bool, optional): Whether the data is from a multi-turn conversation. Defaults to False. + norm_adv_by_std_in_grpo (bool, optional): Whether to normalize advantages by standard deviation in GRPO. Defaults to True. + config (dict, optional): Configuration dictionary for algorithm settings. Defaults to None. + + Returns: + DataProto: The updated data with computed advantages and returns. + """ + # Back-compatible with trainers that do not compute response mask in fit + if "response_mask" not in data.batch.keys(): + data.batch["response_mask"] = compute_response_mask(data) + # prepare response group + if adv_estimator == AdvantageEstimator.GAE: + # Compute advantages and returns using Generalized Advantage Estimation (GAE) + advantages, returns = core_algos.compute_gae_advantage_return( + token_level_rewards=data.batch["token_level_rewards"], + values=data.batch["values"], + response_mask=data.batch["response_mask"], + gamma=gamma, + lam=lam, + ) + data.batch["advantages"] = advantages + data.batch["returns"] = returns + if config.get("use_pf_ppo", False): + data = core_algos.compute_pf_ppo_reweight_data( + data, + config.get("pf_ppo_reweight_method", "pow"), + config.get("pf_ppo_weight_pow", 2.0), + ) + elif adv_estimator == AdvantageEstimator.GRPO: + # Initialize the mask for GRPO calculation + grpo_calculation_mask = data.batch["response_mask"] + if multi_turn: + # If multi-turn, replace the mask with the relevant part of loss_mask + # Get length from the initial response mask + response_length = grpo_calculation_mask.size(1) + # This mask is the one intended for GRPO + grpo_calculation_mask = data.batch["loss_mask"][:, -response_length:] + # Call compute_grpo_outcome_advantage with parameters matching its definition + advantages, returns = core_algos.compute_grpo_outcome_advantage( + token_level_rewards=data.batch["token_level_rewards"], + response_mask=grpo_calculation_mask, + index=data.non_tensor_batch["uid"], + norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo, + ) + data.batch["advantages"] = advantages + data.batch["returns"] = returns + else: + # handle all other adv estimator type other than GAE and GRPO + adv_estimator_fn = core_algos.get_adv_estimator_fn(adv_estimator) + adv_kwargs = { + "token_level_rewards": data.batch["token_level_rewards"], + "response_mask": data.batch["response_mask"], + "config": config, + } + if "uid" in data.non_tensor_batch: # optional + adv_kwargs["index"] = data.non_tensor_batch["uid"] + if "reward_baselines" in data.batch: # optional + adv_kwargs["reward_baselines"] = data.batch["reward_baselines"] + + # calculate advantage estimator + advantages, returns = adv_estimator_fn(**adv_kwargs) + data.batch["advantages"] = advantages + data.batch["returns"] = returns + return data + + +class RayPPOTrainer: + # TODO: support each role have individual ray_worker_group_cls, + # i.e., support different backend of different role + def __init__( + self, + config, + tokenizer, + role_worker_mapping: dict[Role, WorkerType], + resource_pool_manager: ResourcePoolManager, + ray_worker_group_cls: RayWorkerGroup = RayWorkerGroup, + processor=None, + reward_fn=None, + val_reward_fn=None, + train_dataset: Optional[Dataset] = None, + val_dataset: Optional[Dataset] = None, + collate_fn=None, + train_sampler: Optional[Sampler] = None, + device_name="cuda", + ): + """ + Initialize distributed PPO trainer with Ray backend. + Note that this trainer runs on the driver process on a single CPU/GPU node. + + Args: + config: Configuration object containing training parameters. + tokenizer: Tokenizer used for encoding and decoding text. + role_worker_mapping (dict[Role, WorkerType]): Mapping from roles to worker classes. + resource_pool_manager (ResourcePoolManager): Manager for Ray resource pools. + ray_worker_group_cls (RayWorkerGroup, optional): Class for Ray worker groups. Defaults to RayWorkerGroup. + processor: Optional data processor, used for multimodal data + reward_fn: Function for computing rewards during training. + val_reward_fn: Function for computing rewards during validation. + train_dataset (Optional[Dataset], optional): Training dataset. Defaults to None. + val_dataset (Optional[Dataset], optional): Validation dataset. Defaults to None. + collate_fn: Function to collate data samples into batches. + train_sampler (Optional[Sampler], optional): Sampler for the training dataset. Defaults to None. + device_name (str, optional): Device name for training (e.g., "cuda", "cpu"). Defaults to "cuda". + """ + + # Store the tokenizer for text processing + self.tokenizer = tokenizer + self.processor = processor + self.config = config + self.reward_fn = reward_fn + self.val_reward_fn = val_reward_fn + + self.hybrid_engine = config.actor_rollout_ref.hybrid_engine + assert self.hybrid_engine, "Currently, only support hybrid engine" + + if self.hybrid_engine: + assert Role.ActorRollout in role_worker_mapping, f"{role_worker_mapping.keys()=}" + + self.role_worker_mapping = role_worker_mapping + self.resource_pool_manager = resource_pool_manager + self.use_reference_policy = Role.RefPolicy in role_worker_mapping + self.use_rm = Role.RewardModel in role_worker_mapping + self.ray_worker_group_cls = ray_worker_group_cls + self.device_name = device_name + self.validation_generations_logger = ValidationGenerationsLogger() + + # if ref_in_actor is True, the reference policy will be actor without lora applied + self.ref_in_actor = config.actor_rollout_ref.model.get("lora_rank", 0) > 0 + + # define in-reward KL control + # kl loss control currently not suppoorted + if config.algorithm.use_kl_in_reward: + self.kl_ctrl_in_reward = core_algos.get_kl_controller(config.algorithm.kl_ctrl) + + if self.config.algorithm.adv_estimator == AdvantageEstimator.GAE: + self.use_critic = True + elif self.config.algorithm.adv_estimator in [ + AdvantageEstimator.GRPO, + AdvantageEstimator.GRPO_PASSK, + AdvantageEstimator.REINFORCE_PLUS_PLUS, + AdvantageEstimator.REMAX, + AdvantageEstimator.RLOO, + AdvantageEstimator.OPO, + AdvantageEstimator.REINFORCE_PLUS_PLUS_BASELINE, + ]: + self.use_critic = False + else: + raise NotImplementedError + + self._validate_config() + self._create_dataloader(train_dataset, val_dataset, collate_fn, train_sampler) + + def _validate_config(self): + config = self.config + # number of GPUs total + n_gpus = config.trainer.n_gpus_per_node * config.trainer.nnodes + if config.actor_rollout_ref.actor.strategy == "megatron": + model_parallel_size = config.actor_rollout_ref.actor.megatron.tensor_model_parallel_size * config.actor_rollout_ref.actor.megatron.pipeline_model_parallel_size + assert n_gpus % (model_parallel_size * config.actor_rollout_ref.actor.megatron.context_parallel_size) == 0, f"n_gpus ({n_gpus}) must be divisible by model_parallel_size ({model_parallel_size}) times context_parallel_size ({config.actor_rollout_ref.actor.megatron.context_parallel_size})" + megatron_dp = n_gpus // (model_parallel_size * config.actor_rollout_ref.actor.megatron.context_parallel_size) + minimal_bsz = megatron_dp * config.actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu + else: + minimal_bsz = n_gpus + + # 1. Check total batch size for data correctness + real_train_batch_size = config.data.train_batch_size * config.actor_rollout_ref.rollout.n + assert real_train_batch_size % minimal_bsz == 0, f"real_train_batch_size ({real_train_batch_size}) must be divisible by minimal possible batch size ({minimal_bsz})" + + # A helper function to check "micro_batch_size" vs "micro_batch_size_per_gpu" + # We throw an error if the user sets both. The new convention is "..._micro_batch_size_per_gpu". + def check_mutually_exclusive(mbs, mbs_per_gpu, name: str): + settings = { + "actor_rollout_ref.actor": "micro_batch_size", + "critic": "micro_batch_size", + "reward_model": "micro_batch_size", + "actor_rollout_ref.ref": "log_prob_micro_batch_size", + "actor_rollout_ref.rollout": "log_prob_micro_batch_size", + } + + if name in settings: + param = settings[name] + param_per_gpu = f"{param}_per_gpu" + + if mbs is None and mbs_per_gpu is None: + raise ValueError(f"[{name}] Please set at least one of '{name}.{param}' or '{name}.{param_per_gpu}'.") + + if mbs is not None and mbs_per_gpu is not None: + raise ValueError(f"[{name}] You have set both '{name}.{param}' AND '{name}.{param_per_gpu}'. Please remove '{name}.{param}' because only '*_{param_per_gpu}'" + "is supported (the former is deprecated).") + + if not config.actor_rollout_ref.actor.use_dynamic_bsz: + # actor: ppo_micro_batch_size vs. ppo_micro_batch_size_per_gpu + check_mutually_exclusive( + config.actor_rollout_ref.actor.ppo_micro_batch_size, + config.actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu, + "actor_rollout_ref.actor", + ) + + if self.use_reference_policy: + # reference: log_prob_micro_batch_size vs. log_prob_micro_batch_size_per_gpu + check_mutually_exclusive( + config.actor_rollout_ref.ref.log_prob_micro_batch_size, + config.actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu, + "actor_rollout_ref.ref", + ) + + # The rollout section also has log_prob_micro_batch_size vs. log_prob_micro_batch_size_per_gpu + check_mutually_exclusive( + config.actor_rollout_ref.rollout.log_prob_micro_batch_size, + config.actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu, + "actor_rollout_ref.rollout", + ) + + if self.use_critic and not config.critic.use_dynamic_bsz: + # Check for critic micro-batch size conflicts + check_mutually_exclusive(config.critic.ppo_micro_batch_size, config.critic.ppo_micro_batch_size_per_gpu, "critic") + + # Check for reward model micro-batch size conflicts + if config.reward_model.enable and not config.reward_model.use_dynamic_bsz: + check_mutually_exclusive(config.reward_model.micro_batch_size, config.reward_model.micro_batch_size_per_gpu, "reward_model") + + # Actor + # check if train_batch_size is larger than ppo_mini_batch_size + # if NOT dynamic_bsz, we must ensure: + # ppo_mini_batch_size is divisible by ppo_micro_batch_size + # ppo_micro_batch_size * sequence_parallel_size >= n_gpus + if not config.actor_rollout_ref.actor.use_dynamic_bsz: + assert config.data.train_batch_size >= config.actor_rollout_ref.actor.ppo_mini_batch_size + sp_size = config.actor_rollout_ref.actor.get("ulysses_sequence_parallel_size", 1) + if config.actor_rollout_ref.actor.ppo_micro_batch_size is not None: + assert config.actor_rollout_ref.actor.ppo_mini_batch_size % config.actor_rollout_ref.actor.ppo_micro_batch_size == 0 + assert config.actor_rollout_ref.actor.ppo_micro_batch_size * sp_size >= n_gpus + + assert config.actor_rollout_ref.actor.loss_agg_mode in [ + "token-mean", + "seq-mean-token-sum", + "seq-mean-token-mean", + "seq-mean-token-sum-norm", + ], f"Invalid loss_agg_mode: {config.actor_rollout_ref.actor.loss_agg_mode}" + + if config.algorithm.use_kl_in_reward and config.actor_rollout_ref.actor.use_kl_loss: + print("NOTICE: You have both enabled in-reward kl and kl loss.") + + # critic + if self.use_critic and not config.critic.use_dynamic_bsz: + assert config.data.train_batch_size >= config.critic.ppo_mini_batch_size + sp_size = config.critic.get("ulysses_sequence_parallel_size", 1) + if config.critic.ppo_micro_batch_size is not None: + assert config.critic.ppo_mini_batch_size % config.critic.ppo_micro_batch_size == 0 + assert config.critic.ppo_micro_batch_size * sp_size >= n_gpus + + # Check if use_remove_padding is enabled when using sequence parallelism for fsdp + if config.actor_rollout_ref.actor.strategy == "fsdp" and (config.actor_rollout_ref.actor.get("ulysses_sequence_parallel_size", 1) > 1 or config.actor_rollout_ref.ref.get("ulysses_sequence_parallel_size", 1) > 1): + assert config.actor_rollout_ref.model.use_remove_padding, "When using sequence parallelism for actor/ref policy, you must enable `use_remove_padding`." + + if self.use_critic and config.critic.strategy == "fsdp": + if config.critic.get("ulysses_sequence_parallel_size", 1) > 1: + assert config.critic.model.use_remove_padding, "When using sequence parallelism for critic, you must enable `use_remove_padding`." + + if config.data.get("val_batch_size", None) is not None: + print("WARNING: val_batch_size is deprecated." + " Validation datasets are sent to inference engines as a whole batch," + " which will schedule the memory themselves.") + + # check eval config + if config.actor_rollout_ref.rollout.val_kwargs.do_sample: + assert config.actor_rollout_ref.rollout.temperature > 0, "validation gen temperature should be greater than 0 when enabling do_sample" + + # check multi_turn with tool config + if config.actor_rollout_ref.rollout.multi_turn.enable: + assert config.actor_rollout_ref.rollout.multi_turn.tool_config_path is not None or config.actor_rollout_ref.rollout.multi_turn.interaction_config_path is not None, "tool_config_path or interaction_config_path must be set when enabling multi_turn with tool, due to no role-playing support" + assert config.algorithm.adv_estimator in [AdvantageEstimator.GRPO], "only GRPO is tested for multi-turn with tool" + + print("[validate_config] All configuration checks passed successfully!") + + def _create_dataloader(self, train_dataset, val_dataset, collate_fn, train_sampler): + """ + Creates the train and validation dataloaders. + """ + # TODO: we have to make sure the batch size is divisible by the dp size + from verl.trainer.main_ppo import create_rl_dataset, create_rl_sampler + + if train_dataset is None: + train_dataset = create_rl_dataset(self.config.data.train_files, self.config.data, self.tokenizer, self.processor) + if val_dataset is None: + val_dataset = create_rl_dataset(self.config.data.val_files, self.config.data, self.tokenizer, self.processor) + self.train_dataset, self.val_dataset = train_dataset, val_dataset + + if train_sampler is None: + train_sampler = create_rl_sampler(self.config.data, self.train_dataset) + if collate_fn is None: + from verl.utils.dataset.rl_dataset import collate_fn as default_collate_fn + + collate_fn = default_collate_fn + + self.train_dataloader = StatefulDataLoader( + dataset=self.train_dataset, + batch_size=self.config.data.get("gen_batch_size", self.config.data.train_batch_size), + num_workers=self.config.data.get("dataloader_num_workers", 8), + drop_last=True, + collate_fn=collate_fn, + sampler=train_sampler, + ) + + val_batch_size = self.config.data.val_batch_size # Prefer config value if set + if val_batch_size is None: + val_batch_size = len(self.val_dataset) + + self.val_dataloader = StatefulDataLoader( + dataset=self.val_dataset, + batch_size=val_batch_size, + num_workers=self.config.data.get("dataloader_num_workers", 8), + shuffle=self.config.data.get("validation_shuffle", True), + drop_last=False, + collate_fn=collate_fn, + ) + + assert len(self.train_dataloader) >= 1, "Train dataloader is empty!" + assert len(self.val_dataloader) >= 1, "Validation dataloader is empty!" + + print(f"Size of train dataloader: {len(self.train_dataloader)}, Size of val dataloader: {len(self.val_dataloader)}") + + total_training_steps = len(self.train_dataloader) * self.config.trainer.total_epochs + + if self.config.trainer.total_training_steps is not None: + total_training_steps = self.config.trainer.total_training_steps + + self.total_training_steps = total_training_steps + print(f"Total training steps: {self.total_training_steps}") + + try: + OmegaConf.set_struct(self.config, True) + with open_dict(self.config): + if OmegaConf.select(self.config, "actor_rollout_ref.actor.optim"): + self.config.actor_rollout_ref.actor.optim.total_training_steps = total_training_steps + if OmegaConf.select(self.config, "critic.optim"): + self.config.critic.optim.total_training_steps = total_training_steps + except Exception as e: + print(f"Warning: Could not set total_training_steps in config. Structure missing? Error: {e}") + + def _dump_generations(self, inputs, outputs, scores, reward_extra_infos_dict, dump_path): + """Dump rollout/validation samples as JSONL.""" + os.makedirs(dump_path, exist_ok=True) + filename = os.path.join(dump_path, f"{self.global_steps}.jsonl") + + n = len(inputs) + base_data = { + "input": inputs, + "output": outputs, + "score": scores, + "step": [self.global_steps] * n, + } + + for k, v in reward_extra_infos_dict.items(): + if len(v) == n: + base_data[k] = v + + lines = [] + for i in range(n): + entry = {k: v[i] for k, v in base_data.items()} + lines.append(json.dumps(entry, ensure_ascii=False)) + + with open(filename, "w") as f: + f.write("\n".join(lines) + "\n") + + print(f"Dumped generations to {filename}") + + def _maybe_log_val_generations(self, inputs, outputs, scores): + """Log a table of validation samples to the configured logger (wandb or swanlab)""" + + generations_to_log = self.config.trainer.log_val_generations + + if generations_to_log == 0: + return + + import numpy as np + + # Create tuples of (input, output, score) and sort by input text + samples = list(zip(inputs, outputs, scores)) + samples.sort(key=lambda x: x[0]) # Sort by input text + + # Use fixed random seed for deterministic shuffling + rng = np.random.RandomState(42) + rng.shuffle(samples) + + # Take first N samples after shuffling + samples = samples[:generations_to_log] + + # Log to each configured logger + self.validation_generations_logger.log(self.config.trainer.logger, samples, self.global_steps) + + def _validate(self): + data_source_lst = [] + reward_extra_infos_dict: dict[str, list] = defaultdict(list) + + # Lists to collect samples for the table + sample_inputs = [] + sample_outputs = [] + sample_scores = [] + + for test_data in self.val_dataloader: + test_batch = DataProto.from_single_dict(test_data) + + # repeat test batch + test_batch = test_batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.val_kwargs.n, interleave=True) + + # we only do validation on rule-based rm + if self.config.reward_model.enable and test_batch[0].non_tensor_batch["reward_model"]["style"] == "model": + return {} + + # Store original inputs + input_ids = test_batch.batch["input_ids"] + # TODO: Can we keep special tokens except for padding tokens? + input_texts = [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in input_ids] + sample_inputs.extend(input_texts) + + batch_keys_to_pop = ["input_ids", "attention_mask", "position_ids"] + non_tensor_batch_keys_to_pop = ["raw_prompt_ids"] + if "multi_modal_data" in test_batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("multi_modal_data") + if "raw_prompt" in test_batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("raw_prompt") + if "tools_kwargs" in test_batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("tools_kwargs") + if "interaction_kwargs" in test_batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("interaction_kwargs") + test_gen_batch = test_batch.pop( + batch_keys=batch_keys_to_pop, + non_tensor_batch_keys=non_tensor_batch_keys_to_pop, + ) + + test_gen_batch.meta_info = { + "eos_token_id": self.tokenizer.eos_token_id, + "pad_token_id": self.tokenizer.pad_token_id, + "recompute_log_prob": False, + "do_sample": self.config.actor_rollout_ref.rollout.val_kwargs.do_sample, + "validate": True, + } + print(f"test_gen_batch meta info: {test_gen_batch.meta_info}", flush=True) + + # pad to be divisible by dp_size + size_divisor = self.actor_rollout_wg.world_size if not self.async_rollout_mode else self.config.actor_rollout_ref.rollout.agent.num_workers + test_gen_batch_padded, pad_size = pad_dataproto_to_divisor(test_gen_batch, size_divisor) + if not self.async_rollout_mode: + test_output_gen_batch_padded = self.actor_rollout_wg.generate_sequences(test_gen_batch_padded) + else: + test_output_gen_batch_padded = self.async_rollout_manager.generate_sequences(test_gen_batch_padded) + + # unpad + test_output_gen_batch = unpad_dataproto(test_output_gen_batch_padded, pad_size=pad_size) + print("validation generation end", flush=True) + + # Store generated outputs + output_ids = test_output_gen_batch.batch["responses"] + output_texts = [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in output_ids] + sample_outputs.extend(output_texts) + + test_batch = test_batch.union(test_output_gen_batch) + + # evaluate using reward_function + print("[validate] Calling val_reward_fn ...", flush=True) + result = self.val_reward_fn(test_batch, return_dict=True) + reward_tensor = result["reward_tensor"] + scores = reward_tensor.sum(-1).cpu().tolist() + sample_scores.extend(scores) + + # Print a brief preview of inputs/outputs and scores for sanity check + try: + n_preview = min(3, len(output_texts)) + io_preview = [ + { + "input": sample_inputs[i][:200], + "output": output_texts[i][:200], + "score": float(scores[i]) if i < len(scores) else None, + } + for i in range(n_preview) + ] + print(f"[validate] Preview (first {n_preview}): {io_preview}", flush=True) + print(f"[validate] Batch reward summary -> mean={float(np.mean(scores)) if len(scores)>0 else 'nan':}, min={float(np.min(scores)) if len(scores)>0 else 'nan':}, max={float(np.max(scores)) if len(scores)>0 else 'nan':}, n={len(scores)}", flush=True) + except Exception: + pass + + reward_extra_infos_dict["reward"].extend(scores) + print(f"len reward_extra_infos_dict['reward']: {len(reward_extra_infos_dict['reward'])}") + if "reward_extra_info" in result: + for key, lst in result["reward_extra_info"].items(): + reward_extra_infos_dict[key].extend(lst) + print(f"len reward_extra_infos_dict['{key}']: {len(reward_extra_infos_dict[key])}") + + data_source_lst.append(test_batch.non_tensor_batch.get("data_source", ["unknown"] * reward_tensor.shape[0])) + + self._maybe_log_val_generations(inputs=sample_inputs, outputs=sample_outputs, scores=sample_scores) + try: + if len(sample_scores) > 0: + print(f"[validate] Aggregated val scores: mean={float(np.mean(sample_scores))}, std={float(np.std(sample_scores))}, n={len(sample_scores)}", flush=True) + except Exception: + pass + + # dump generations + val_data_dir = self.config.trainer.get("validation_data_dir", None) + if val_data_dir: + self._dump_generations( + inputs=sample_inputs, + outputs=sample_outputs, + scores=sample_scores, + reward_extra_infos_dict=reward_extra_infos_dict, + dump_path=val_data_dir, + ) + + for key_info, lst in reward_extra_infos_dict.items(): + assert len(lst) == 0 or len(lst) == len(sample_scores), f"{key_info}: {len(lst)=}, {len(sample_scores)=}" + + data_sources = np.concatenate(data_source_lst, axis=0) + + data_src2var2metric2val = process_validation_metrics(data_sources, sample_inputs, reward_extra_infos_dict) + metric_dict = {} + for data_source, var2metric2val in data_src2var2metric2val.items(): + core_var = "acc" if "acc" in var2metric2val else "reward" + for var_name, metric2val in var2metric2val.items(): + n_max = max([int(name.split("@")[-1].split("/")[0]) for name in metric2val.keys()]) + for metric_name, metric_val in metric2val.items(): + if (var_name == core_var) and any(metric_name.startswith(pfx) for pfx in ["mean", "maj", "best"]) and (f"@{n_max}" in metric_name): + metric_sec = "val-core" + else: + metric_sec = "val-aux" + pfx = f"{metric_sec}/{data_source}/{var_name}/{metric_name}" + metric_dict[pfx] = metric_val + + return metric_dict + + def init_workers(self): + """Initialize distributed training workers using Ray backend. + + Creates: + 1. Ray resource pools from configuration + 2. Worker groups for each role (actor, critic, etc.) + """ + self.resource_pool_manager.create_resource_pool() + + self.resource_pool_to_cls = {pool: {} for pool in self.resource_pool_manager.resource_pool_dict.values()} + + # create actor and rollout + if self.hybrid_engine: + resource_pool = self.resource_pool_manager.get_resource_pool(Role.ActorRollout) + actor_rollout_cls = RayClassWithInitArgs( + cls=self.role_worker_mapping[Role.ActorRollout], + config=self.config.actor_rollout_ref, + role="actor_rollout", + ) + self.resource_pool_to_cls[resource_pool]["actor_rollout"] = actor_rollout_cls + else: + raise NotImplementedError + + # create critic + if self.use_critic: + resource_pool = self.resource_pool_manager.get_resource_pool(Role.Critic) + critic_cls = RayClassWithInitArgs(cls=self.role_worker_mapping[Role.Critic], config=self.config.critic) + self.resource_pool_to_cls[resource_pool]["critic"] = critic_cls + + # create reference policy if needed + if self.use_reference_policy: + resource_pool = self.resource_pool_manager.get_resource_pool(Role.RefPolicy) + ref_policy_cls = RayClassWithInitArgs(self.role_worker_mapping[Role.RefPolicy], config=self.config.actor_rollout_ref, role="ref") + self.resource_pool_to_cls[resource_pool]["ref"] = ref_policy_cls + + # create a reward model if reward_fn is None + if self.use_rm: + # we create a RM here + resource_pool = self.resource_pool_manager.get_resource_pool(Role.RewardModel) + rm_cls = RayClassWithInitArgs(self.role_worker_mapping[Role.RewardModel], config=self.config.reward_model) + self.resource_pool_to_cls[resource_pool]["rm"] = rm_cls + + # initialize WorkerGroup + # NOTE: if you want to use a different resource pool for each role, which can support different parallel size, + # you should not use `create_colocated_worker_cls`. + # Instead, directly pass different resource pool to different worker groups. + # See https://github.com/volcengine/verl/blob/master/examples/ray/tutorial.ipynb for more information. + all_wg = {} + wg_kwargs = {} # Setting up kwargs for RayWorkerGroup + if OmegaConf.select(self.config.trainer, "ray_wait_register_center_timeout") is not None: + wg_kwargs["ray_wait_register_center_timeout"] = self.config.trainer.ray_wait_register_center_timeout + if OmegaConf.select(self.config.trainer, "profile_steps") is not None: + wg_kwargs["profile_steps"] = OmegaConf.select(self.config.trainer, "profile_steps") + assert OmegaConf.select(self.config.trainer, "worker_nsight_options") is not None, "worker_nsight_options must be set when profile_steps is set" + wg_kwargs["worker_nsight_options"] = OmegaConf.to_container(OmegaConf.select(self.config.trainer, "worker_nsight_options")) + + for resource_pool, class_dict in self.resource_pool_to_cls.items(): + worker_dict_cls = create_colocated_worker_cls(class_dict=class_dict) + wg_dict = self.ray_worker_group_cls(resource_pool=resource_pool, ray_cls_with_init=worker_dict_cls, device_name=self.device_name, **wg_kwargs) + spawn_wg = wg_dict.spawn(prefix_set=class_dict.keys()) + all_wg.update(spawn_wg) + + if self.use_critic: + self.critic_wg = all_wg["critic"] + self.critic_wg.init_model() + + if self.use_reference_policy and not self.ref_in_actor: + self.ref_policy_wg = all_wg["ref"] + self.ref_policy_wg.init_model() + + if self.use_rm: + self.rm_wg = all_wg["rm"] + self.rm_wg.init_model() + + # we should create rollout at the end so that vllm can have a better estimation of kv cache memory + self.actor_rollout_wg = all_wg["actor_rollout"] + self.actor_rollout_wg.init_model() + + # create async rollout manager and request scheduler + self.async_rollout_mode = False + if self.config.actor_rollout_ref.rollout.mode == "async": + from verl.experimental.agent_loop import AgentLoopManager + + self.async_rollout_mode = True + self.async_rollout_manager = AgentLoopManager( + config=self.config, + worker_group=self.actor_rollout_wg, + ) + + def _save_checkpoint(self): + from verl.utils.fs import local_mkdir_safe + + # path: given_path + `/global_step_{global_steps}` + `/actor` + local_global_step_folder = os.path.join(self.config.trainer.default_local_dir, f"global_step_{self.global_steps}") + + print(f"local_global_step_folder: {local_global_step_folder}") + actor_local_path = os.path.join(local_global_step_folder, "actor") + + actor_remote_path = None if self.config.trainer.default_hdfs_dir is None else os.path.join(self.config.trainer.default_hdfs_dir, f"global_step_{self.global_steps}", "actor") + + remove_previous_ckpt_in_save = self.config.trainer.get("remove_previous_ckpt_in_save", False) + if remove_previous_ckpt_in_save: + print("Warning: remove_previous_ckpt_in_save is deprecated," + " set max_actor_ckpt_to_keep=1 and max_critic_ckpt_to_keep=1 instead") + max_actor_ckpt_to_keep = self.config.trainer.get("max_actor_ckpt_to_keep", None) if not remove_previous_ckpt_in_save else 1 + max_critic_ckpt_to_keep = self.config.trainer.get("max_critic_ckpt_to_keep", None) if not remove_previous_ckpt_in_save else 1 + + self.actor_rollout_wg.save_checkpoint(actor_local_path, actor_remote_path, self.global_steps, max_ckpt_to_keep=max_actor_ckpt_to_keep) + + if self.use_critic: + critic_local_path = os.path.join(local_global_step_folder, "critic") + critic_remote_path = None if self.config.trainer.default_hdfs_dir is None else os.path.join(self.config.trainer.default_hdfs_dir, f"global_step_{self.global_steps}", "critic") + self.critic_wg.save_checkpoint(critic_local_path, critic_remote_path, self.global_steps, max_ckpt_to_keep=max_critic_ckpt_to_keep) + + # save dataloader + local_mkdir_safe(local_global_step_folder) + dataloader_local_path = os.path.join(local_global_step_folder, "data.pt") + dataloader_state_dict = self.train_dataloader.state_dict() + torch.save(dataloader_state_dict, dataloader_local_path) + + # latest checkpointed iteration tracker (for atomic usage) + local_latest_checkpointed_iteration = os.path.join(self.config.trainer.default_local_dir, "latest_checkpointed_iteration.txt") + with open(local_latest_checkpointed_iteration, "w") as f: + f.write(str(self.global_steps)) + + def _load_checkpoint(self): + if self.config.trainer.resume_mode == "disable": + return 0 + + # load from hdfs + if self.config.trainer.default_hdfs_dir is not None: + raise NotImplementedError("load from hdfs is not implemented yet") + else: + checkpoint_folder = self.config.trainer.default_local_dir # TODO: check path + if not os.path.isabs(checkpoint_folder): + working_dir = os.getcwd() + checkpoint_folder = os.path.join(working_dir, checkpoint_folder) + global_step_folder = find_latest_ckpt_path(checkpoint_folder) # None if no latest + + # find global_step_folder + if self.config.trainer.resume_mode == "auto": + if global_step_folder is None: + print("Training from scratch") + return 0 + else: + if self.config.trainer.resume_mode == "resume_path": + assert isinstance(self.config.trainer.resume_from_path, str), "resume ckpt must be str type" + assert "global_step_" in self.config.trainer.resume_from_path, "resume ckpt must specify the global_steps" + global_step_folder = self.config.trainer.resume_from_path + if not os.path.isabs(global_step_folder): + working_dir = os.getcwd() + global_step_folder = os.path.join(working_dir, global_step_folder) + print(f"Load from checkpoint folder: {global_step_folder}") + # set global step + self.global_steps = int(global_step_folder.split("global_step_")[-1]) + + print(f"Setting global step to {self.global_steps}") + print(f"Resuming from {global_step_folder}") + + actor_path = os.path.join(global_step_folder, "actor") + critic_path = os.path.join(global_step_folder, "critic") + # load actor + self.actor_rollout_wg.load_checkpoint(actor_path, del_local_after_load=self.config.trainer.del_local_ckpt_after_load) + # load critic + if self.use_critic: + self.critic_wg.load_checkpoint(critic_path, del_local_after_load=self.config.trainer.del_local_ckpt_after_load) + + # load dataloader, + # TODO: from remote not implemented yet + dataloader_local_path = os.path.join(global_step_folder, "data.pt") + if os.path.exists(dataloader_local_path): + dataloader_state_dict = torch.load(dataloader_local_path, weights_only=False) + self.train_dataloader.load_state_dict(dataloader_state_dict) + else: + print(f"Warning: No dataloader state found at {dataloader_local_path}, will start from scratch") + + def _balance_batch(self, batch: DataProto, metrics, logging_prefix="global_seqlen"): + """Reorder the data on single controller such that each dp rank gets similar total tokens""" + attention_mask = batch.batch["attention_mask"] + batch_size = attention_mask.shape[0] + global_seqlen_lst = batch.batch["attention_mask"].view(batch_size, -1).sum(-1).tolist() # (train_batch_size,) + world_size = self.actor_rollout_wg.world_size + global_partition_lst = get_seqlen_balanced_partitions(global_seqlen_lst, k_partitions=world_size, equal_size=True) + # reorder based on index. The data will be automatically equally partitioned by dispatch function + global_idx = torch.tensor([j for partition in global_partition_lst for j in partition]) + batch.reorder(global_idx) + global_balance_stats = log_seqlen_unbalance(seqlen_list=global_seqlen_lst, partitions=global_partition_lst, prefix=logging_prefix) + metrics.update(global_balance_stats) + + def fit(self): + """ + The training loop of PPO. + The driver process only need to call the compute functions of the worker group through RPC + to construct the PPO dataflow. + The light-weight advantage computation is done on the driver process. + """ + from omegaconf import OmegaConf + + from verl.utils.tracking import Tracking + + logger = Tracking( + project_name=self.config.trainer.project_name, + experiment_name=self.config.trainer.experiment_name, + default_backend=self.config.trainer.logger, + config=OmegaConf.to_container(self.config, resolve=True), + ) + + self.global_steps = 0 + + # load checkpoint before doing anything + self._load_checkpoint() + + # perform validation before training + # currently, we only support validation using the reward_function. + # Print validation setup for visibility + try: + print( + f"[validate] Setup: test_freq={self.config.trainer.test_freq}, " + f"val_before_train={self.config.trainer.get('val_before_train', True)}, " + f"has_val_reward_fn={self.val_reward_fn is not None}", + flush=True, + ) + except Exception: + pass + + if self.val_reward_fn is not None and self.config.trainer.get("val_before_train", True): + print(f"[validate] Running initial validation at global_step={self.global_steps} (val_before_train)", flush=True) + val_metrics = self._validate() + assert val_metrics, f"{val_metrics=}" + # Print a concise summary for realtime logs + try: + core_items = {k: v for k, v in val_metrics.items() if any(x in k for x in ("val-core/", "val-aux/"))} + preview = dict(list(core_items.items())[:8]) if len(core_items) > 8 else core_items + print(f"[validate] Initial metrics summary: {preview}", flush=True) + except Exception: + pass + pprint(f"Initial validation metrics: {val_metrics}") + logger.log(data=val_metrics, step=self.global_steps) + if self.config.trainer.get("val_only", False): + return + + # add tqdm + progress_bar = tqdm(total=self.total_training_steps, initial=self.global_steps, desc="Training Progress") + + # we start from step 1 + self.global_steps += 1 + last_val_metrics = None + self.max_steps_duration = 0 + + for epoch in range(self.config.trainer.total_epochs): + for batch_dict in self.train_dataloader: + do_profile = self.global_steps in self.config.trainer.profile_steps if self.config.trainer.profile_steps is not None else False + if do_profile: + self.actor_rollout_wg.start_profile() + if self.use_reference_policy: + self.ref_policy_wg.start_profile() + if self.use_critic: + self.critic_wg.start_profile() + if self.use_rm: + self.rm_wg.start_profile() + + metrics = {} + timing_raw = {} + batch: DataProto = DataProto.from_single_dict(batch_dict) + + # pop those keys for generation + batch_keys_to_pop = ["input_ids", "attention_mask", "position_ids"] + non_tensor_batch_keys_to_pop = ["raw_prompt_ids"] + if "multi_modal_data" in batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("multi_modal_data") + if "raw_prompt" in batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("raw_prompt") + if "tools_kwargs" in batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("tools_kwargs") + if "interaction_kwargs" in batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("interaction_kwargs") + gen_batch = batch.pop( + batch_keys=batch_keys_to_pop, + non_tensor_batch_keys=non_tensor_batch_keys_to_pop, + ) + + is_last_step = self.global_steps >= self.total_training_steps + + with marked_timer("step", timing_raw): + # generate a batch + with marked_timer("gen", timing_raw, color="red"): + if not self.async_rollout_mode: + gen_batch_output = self.actor_rollout_wg.generate_sequences(gen_batch) + else: + gen_batch_output = self.async_rollout_manager.generate_sequences(gen_batch) + timing_raw.update(gen_batch_output.meta_info["timing"]) + gen_batch_output.meta_info.pop("timing", None) + + if self.config.algorithm.adv_estimator == AdvantageEstimator.REMAX: + with marked_timer("gen_max", timing_raw, color="purple"): + gen_baseline_batch = deepcopy(gen_batch) + gen_baseline_batch.meta_info["do_sample"] = False + gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch) + + batch = batch.union(gen_baseline_output) + reward_baseline_tensor = self.reward_fn(batch) + reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1) + + batch.pop(batch_keys=list(gen_baseline_output.batch.keys())) + + batch.batch["reward_baselines"] = reward_baseline_tensor + + del gen_baseline_batch, gen_baseline_output + + batch.non_tensor_batch["uid"] = np.array([str(uuid.uuid4()) for _ in range(len(batch.batch))], dtype=object) + # repeat to align with repeated responses in rollout + batch = batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True) + batch = batch.union(gen_batch_output) + + batch.batch["response_mask"] = compute_response_mask(batch) + # Balance the number of valid tokens across DP ranks. + # NOTE: This usually changes the order of data in the `batch`, + # which won't affect the advantage calculation (since it's based on uid), + # but might affect the loss calculation (due to the change of mini-batching). + # TODO: Decouple the DP balancing and mini-batching. + if self.config.trainer.balance_batch: + self._balance_batch(batch, metrics=metrics) + + # compute global_valid tokens + batch.meta_info["global_token_num"] = torch.sum(batch.batch["attention_mask"], dim=-1).tolist() + + with marked_timer("reward", timing_raw, color="yellow"): + # compute reward model score + if self.use_rm: + reward_tensor = self.rm_wg.compute_rm_score(batch) + batch = batch.union(reward_tensor) + + if self.config.reward_model.launch_reward_fn_async: + future_reward = compute_reward_async.remote(batch, self.config, self.tokenizer) + else: + reward_tensor, reward_extra_infos_dict = compute_reward(batch, self.reward_fn) + + # Aggregate and log reward statistics per step for visibility + try: + # reward per sample (sum over response_length) + reward_per_sample = reward_tensor.sum(dim=-1) + metrics.update( + { + "reward/mean": reward_per_sample.mean().item(), + "reward/std": reward_per_sample.float().std(unbiased=False).item(), + "reward/min": reward_per_sample.min().item(), + "reward/max": reward_per_sample.max().item(), + "reward/token_mean": reward_tensor.float().mean().item(), + } + ) + except Exception as _: + # best-effort logging; do not break training on stats + pass + + # recompute old_log_probs + with marked_timer("old_log_prob", timing_raw, color="blue"): + old_log_prob = self.actor_rollout_wg.compute_log_prob(batch) + entropys = old_log_prob.batch["entropys"] + response_masks = batch.batch["response_mask"] + loss_agg_mode = self.config.actor_rollout_ref.actor.loss_agg_mode + entropy_agg = agg_loss(loss_mat=entropys, loss_mask=response_masks, loss_agg_mode=loss_agg_mode) + old_log_prob_metrics = {"actor/entropy": entropy_agg.detach().item()} + metrics.update(old_log_prob_metrics) + old_log_prob.batch.pop("entropys") + batch = batch.union(old_log_prob) + + if "rollout_log_probs" in batch.batch.keys(): + # TODO: we may want to add diff of probs too. + rollout_old_log_probs = batch.batch["rollout_log_probs"] + actor_old_log_probs = batch.batch["old_log_probs"] + attention_mask = batch.batch["attention_mask"] + responses = batch.batch["responses"] + response_length = responses.size(1) + response_mask = attention_mask[:, -response_length:] + + rollout_probs = torch.exp(rollout_old_log_probs) + actor_probs = torch.exp(actor_old_log_probs) + rollout_probs_diff = torch.abs(rollout_probs - actor_probs) + rollout_probs_diff = torch.masked_select(rollout_probs_diff, response_mask.bool()) + rollout_probs_diff_max = torch.max(rollout_probs_diff) + rollout_probs_diff_mean = torch.mean(rollout_probs_diff) + rollout_probs_diff_std = torch.std(rollout_probs_diff) + metrics.update( + { + "training/rollout_probs_diff_max": rollout_probs_diff_max.detach().item(), + "training/rollout_probs_diff_mean": rollout_probs_diff_mean.detach().item(), + "training/rollout_probs_diff_std": rollout_probs_diff_std.detach().item(), + } + ) + + if self.use_reference_policy: + # compute reference log_prob + with marked_timer("ref", timing_raw, color="olive"): + if not self.ref_in_actor: + ref_log_prob = self.ref_policy_wg.compute_ref_log_prob(batch) + else: + ref_log_prob = self.actor_rollout_wg.compute_ref_log_prob(batch) + batch = batch.union(ref_log_prob) + + # compute values + if self.use_critic: + with marked_timer("values", timing_raw, color="cyan"): + values = self.critic_wg.compute_values(batch) + batch = batch.union(values) + + with marked_timer("adv", timing_raw, color="brown"): + # we combine with rule-based rm + reward_extra_infos_dict: dict[str, list] + if self.config.reward_model.launch_reward_fn_async: + reward_tensor, reward_extra_infos_dict = ray.get(future_reward) + batch.batch["token_level_scores"] = reward_tensor + + if reward_extra_infos_dict: + batch.non_tensor_batch.update({k: np.array(v) for k, v in reward_extra_infos_dict.items()}) + + # compute rewards. apply_kl_penalty if available + if self.config.algorithm.use_kl_in_reward: + batch, kl_metrics = apply_kl_penalty(batch, kl_ctrl=self.kl_ctrl_in_reward, kl_penalty=self.config.algorithm.kl_penalty) + metrics.update(kl_metrics) + else: + batch.batch["token_level_rewards"] = batch.batch["token_level_scores"] + + # Print within-group reward mean per step (for GRPO-style multi-sample groups) + try: + import os + enable_print = os.getenv("VERL_PRINT_GROUP_REWARDS", "1") + if str(enable_print).lower() in ("1", "true", "yes", "on"): + # Per-sample scalar reward (sum over tokens) + per_sample_scores = batch.batch["token_level_rewards"].sum(-1).detach().cpu().tolist() + uids = batch.non_tensor_batch.get("uid", None) + if uids is not None: + from collections import defaultdict + group_scores = defaultdict(list) + for i, uid in enumerate(uids): + group_scores[uid].append(per_sample_scores[i]) + + # Limit how many groups to print to avoid flooding logs + max_groups_env = os.getenv("VERL_PRINT_GROUP_REWARDS_MAX_GROUPS", "3") + try: + max_groups = int(max_groups_env) + except Exception: + max_groups = 3 + printed = 0 + for uid, scores in group_scores.items(): + # Only print groups that actually have multiple samples + if len(scores) > 1: + try: + mean_score = float(sum(scores) / len(scores)) + except Exception: + mean_score = float('nan') + print(f"[step {self.global_steps}] group uid={uid} mean_reward={mean_score:.4f} n={len(scores)}") + printed += 1 + if max_groups > 0 and printed >= max_groups: + break + else: + # Fallback: print batch mean if uid not available + if len(per_sample_scores) > 0: + try: + batch_mean = float(sum(per_sample_scores) / len(per_sample_scores)) + except Exception: + batch_mean = float('nan') + print(f"[step {self.global_steps}] batch mean_reward={batch_mean:.4f} n={len(per_sample_scores)}") + except Exception as _e: + # Be conservative: never break training on logging + pass + + # compute advantages, executed on the driver process + + norm_adv_by_std_in_grpo = self.config.algorithm.get("norm_adv_by_std_in_grpo", True) # GRPO adv normalization factor + + batch = compute_advantage( + batch, + adv_estimator=self.config.algorithm.adv_estimator, + gamma=self.config.algorithm.gamma, + lam=self.config.algorithm.lam, + num_repeat=self.config.actor_rollout_ref.rollout.n, + norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo, + multi_turn=self.config.actor_rollout_ref.rollout.multi_turn.enable, + config=self.config.algorithm, + ) + + # update critic + if self.use_critic: + with marked_timer("update_critic", timing_raw, color="pink"): + critic_output = self.critic_wg.update_critic(batch) + critic_output_metrics = reduce_metrics(critic_output.meta_info["metrics"]) + metrics.update(critic_output_metrics) + + # implement critic warmup + if self.config.trainer.critic_warmup <= self.global_steps: + # update actor + with marked_timer("update_actor", timing_raw, color="red"): + batch.meta_info["multi_turn"] = self.config.actor_rollout_ref.rollout.multi_turn.enable + actor_output = self.actor_rollout_wg.update_actor(batch) + actor_output_metrics = reduce_metrics(actor_output.meta_info["metrics"]) + metrics.update(actor_output_metrics) + + # Log rollout generations if enabled + rollout_data_dir = self.config.trainer.get("rollout_data_dir", None) + if rollout_data_dir: + with marked_timer("dump_rollout_generations", timing_raw, color="green"): + print(batch.batch.keys()) + inputs = self.tokenizer.batch_decode(batch.batch["prompts"], skip_special_tokens=True) + outputs = self.tokenizer.batch_decode(batch.batch["responses"], skip_special_tokens=True) + scores = batch.batch["token_level_scores"].sum(-1).cpu().tolist() + self._dump_generations( + inputs=inputs, + outputs=outputs, + scores=scores, + reward_extra_infos_dict=reward_extra_infos_dict, + dump_path=rollout_data_dir, + ) + + # validate + if self.val_reward_fn is not None and self.config.trainer.test_freq > 0 and (is_last_step or self.global_steps % self.config.trainer.test_freq == 0): + print(f"[validate] Triggered at global_step={self.global_steps} (is_last_step={is_last_step}) with test_freq={self.config.trainer.test_freq}", flush=True) + with marked_timer("testing", timing_raw, color="green"): + val_metrics: dict = self._validate() + # Print a concise summary for realtime logs + try: + core_items = {k: v for k, v in val_metrics.items() if any(x in k for x in ("val-core/", "val-aux/"))} + preview = dict(list(core_items.items())[:8]) if len(core_items) > 8 else core_items + print(f"[validate] Metrics summary at step {self.global_steps}: {preview}", flush=True) + except Exception: + pass + if is_last_step: + last_val_metrics = val_metrics + metrics.update(val_metrics) + + esi_close_to_expiration = should_save_ckpt_esi(max_steps_duration=self.max_steps_duration, redundant_time=self.config.trainer.esi_redundant_time) + if self.config.trainer.save_freq > 0 and (is_last_step or self.global_steps % self.config.trainer.save_freq == 0 or esi_close_to_expiration): + if esi_close_to_expiration: + print("Force saving checkpoint: ESI instance expiration approaching.") + with marked_timer("save_checkpoint", timing_raw, color="green"): + self._save_checkpoint() + + steps_duration = timing_raw["step"] + self.max_steps_duration = max(self.max_steps_duration, steps_duration) + # training metrics + metrics.update( + { + "training/global_step": self.global_steps, + "training/epoch": epoch, + } + ) + # collect metrics + metrics.update(compute_data_metrics(batch=batch, use_critic=self.use_critic)) + metrics.update(compute_timing_metrics(batch=batch, timing_raw=timing_raw)) + # TODO: implement actual tflpo and theoretical tflpo + n_gpus = self.resource_pool_manager.get_n_gpus() + metrics.update(compute_throughout_metrics(batch=batch, timing_raw=timing_raw, n_gpus=n_gpus)) + + # TODO: make a canonical logger that supports various backend + logger.log(data=metrics, step=self.global_steps) + + progress_bar.update(1) + self.global_steps += 1 + + if do_profile: + self.actor_rollout_wg.stop_profile() + if self.use_reference_policy: + self.ref_policy_wg.stop_profile() + if self.use_critic: + self.critic_wg.stop_profile() + if self.use_rm: + self.rm_wg.stop_profile() + + if is_last_step: + pprint(f"Final validation metrics: {last_val_metrics}") + progress_bar.close() + return diff --git "a/openseek/competition/pz/UCI001/\345\206\263\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212--UCI001.pdf" "b/openseek/competition/pz/UCI001/\345\206\263\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212--UCI001.pdf" new file mode 100644 index 0000000..2160da7 Binary files /dev/null and "b/openseek/competition/pz/UCI001/\345\206\263\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212--UCI001.pdf" differ diff --git "a/openseek/competition/pz/UCI001/\345\206\263\350\265\233\350\256\255\347\273\203\345\221\275\344\273\244/run.sh" "b/openseek/competition/pz/UCI001/\345\206\263\350\265\233\350\256\255\347\273\203\345\221\275\344\273\244/run.sh" new file mode 100644 index 0000000..056d6c1 --- /dev/null +++ "b/openseek/competition/pz/UCI001/\345\206\263\350\265\233\350\256\255\347\273\203\345\221\275\344\273\244/run.sh" @@ -0,0 +1,31 @@ +PYTHONUNBUFFERED=1 python3 -m verl.trainer.main_ppo \ +data.train_files=/home/openseek_data/sft_data/Big-Math-RL-Verified-Processed_pri-mid/big-math-rl-verified-processed_orca_cnk12_gsm8k_newprompt_2_train.parquet \ +data.val_files=/home/openseek_data/sft_data/Big-Math-RL-Verified-Processed_pri-mid/big-math-rl-verified-processed_orca_cnk12_gsm8k_newprompt_2_val.parquet \ +data.train_batch_size=128 \ +data.max_prompt_length=1300 \ +data.max_response_length=2700 \ +data.trust_remote_code=True \ + actor_rollout_ref.model.path=/home/openseek_data/OpenSeek-Small-v1-SFT/BAAI/OpenSeek-Small-v1-SFT \ + actor_rollout_ref.model.trust_remote_code=True \ + actor_rollout_ref.actor.optim.lr=1e-6 \ + actor_rollout_ref.actor.ppo_mini_batch_size=64 \ + actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=8 \ + actor_rollout_ref.rollout.name=vllm \ + actor_rollout_ref.rollout.n=16 \ + actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \ + actor_rollout_ref.rollout.tensor_model_parallel_size=1 \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \ + actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \ + critic.optim.lr=1e-5 \ + critic.model.path=/home/openseek_data/OpenSeek-Small-v1-SFT/BAAI/OpenSeek-Small-v1-SFT \ + critic.model.trust_remote_code=True \ + critic.ppo_micro_batch_size_per_gpu=4 \ + algorithm.adv_estimator=grpo \ + algorithm.kl_ctrl.kl_coef=0.02 \ + trainer.logger=[console,tensorboard] \ + trainer.val_before_train=False \ + trainer.n_gpus_per_node=8 \ + trainer.nnodes=1 \ + trainer.save_freq=200 \ + trainer.test_freq=50 \ + trainer.total_epochs=3 2>&1 | tee verl_demo2.log diff --git "a/openseek/competition/pz/UCI001/\345\210\235\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212--UCI001.pdf" "b/openseek/competition/pz/UCI001/\345\210\235\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212--UCI001.pdf" new file mode 100644 index 0000000..687ddee Binary files /dev/null and "b/openseek/competition/pz/UCI001/\345\210\235\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212--UCI001.pdf" differ diff --git "a/openseek/competition/pz/UCI001/\345\210\235\350\265\233\351\205\215\347\275\256\346\226\207\344\273\266/train_deepseek_v3_1_4b.yaml" "b/openseek/competition/pz/UCI001/\345\210\235\350\265\233\351\205\215\347\275\256\346\226\207\344\273\266/train_deepseek_v3_1_4b.yaml" new file mode 100644 index 0000000..bcb7fb3 --- /dev/null +++ "b/openseek/competition/pz/UCI001/\345\210\235\350\265\233\351\205\215\347\275\256\346\226\207\344\273\266/train_deepseek_v3_1_4b.yaml" @@ -0,0 +1,144 @@ +system: + recompute_method: "uniform" + recompute_granularity: "full" + recompute_num_layers: 6 + moe_router_dtype: fp32 + no_shared_fs: ${experiment.runner.no_shared_fs} + num_workers: 4 + tensor_model_parallel_size: 1 + pipeline_model_parallel_size: 1 + expert_model_parallel_size: 1 + context_parallel_size: 1 + disable_bias_linear: true + reset_position_ids: True + reset_attention_mask: True + qk_layernorm: true + sequence_parallel: true + use_distributed_optimizer: true + overlap_grad_reduce: true + overlap_param_gather: true + finetune: true + precision: + bf16: true + attention_softmax_in_fp32: true + accumulate_allreduce_grads_in_fp32: true + logging: + log_interval: 1 + tensorboard_log_interval: 1 + wandb_project: ${experiment.exp_name} + wandb_exp_name: ${experiment.exp_name} + log_timers_to_tensorboard: true + log_validation_ppl_to_tensorboard: true + log_throughput: true + log_params_norm: true + log_num_zeros_in_grad: true + log_memory_to_tensorboard: true + checkpoint: + save_interval: ${experiment.save_steps} + load: ${experiment.load} + ckpt_format: ${experiment.ckpt_format} + +model: + transformer_impl: transformer_engine + attention_backend: unfused + ## 12 -> 6 for lower mem usage + num_layers: 6 + hidden_size: 1280 + num_attention_heads: 10 + group_query_attention: false + num_query_groups: 10 # num_key_value_heads + seq_length: 4096 + max_position_embeddings: 4096 + norm_epsilon: 1e-6 + use_rotary_position_embeddings: true + rotary_base: 1000000 + swiglu: true + normalization: RMSNorm + init_method_std: 6e-3 + attention_dropout: 0.0 + hidden_dropout: 0.0 + clip_grad: 1.0 + position_embedding_type: rope + untie_embeddings_and_output_weights: false + no_position_embedding: true + no_rope_fusion: true + + # mla args ================== + multi_latent_attention: true + kv_lora_rank: 512 + qk_head_dim: 128 + qk_pos_emb_head_dim: 64 + v_head_dim: 128 + + # moe args =================== + ffn_hidden_size: 7168 + moe_ffn_hidden_size: 896 + moe_grouped_gemm: true + moe_shared_expert_intermediate_size: 1792 + num_experts: 64 + moe_router_load_balancing_type: "seq_aux_loss" + moe_router_score_function: sigmoid + moe_router_enable_expert_bias: true + moe_router_bias_update_rate: 0.001 + moe_aux_loss_coeff: 0.0001 + ## 12 -> 6 for lower mem usage + moe_layer_freq: "[0]+[1]*5" + # moe_layer_freq: "[0]+[1]*11" + # node limited routing + moe_router_num_groups: 1 + moe_router_group_topk: 1 + moe_router_topk: 6 + moe_router_topk_scaling_factor: 2.446 + moe_token_dispatcher_type: "alltoall" + # moe_permute_fusion: true + + # moe args =================== + # num_mtp_predictor: 1 + # mtp_loss_coeff: 0.3 + + # training + seed: ${experiment.seed} + micro_batch_size: 4 + global_batch_size: 1024 + eval_iters: 0 + # train_samples: 24576000 #100B tokens + train_samples: 1228800 #5B #7372800 #30B #6144000 #25B #4882812 #20B + + optimizer: + weight_decay: 0.1 + adam_beta1: 0.9 + adam_beta2: 0.95 + lr_scheduler: + lr: 1.0e-5 + min_lr: 5.0e-6 + lr_warmup_samples: 200 + lr_decay_style: cosine + + +data: + # exp: baseline + data_path: + - 0.3755 + - ${experiment.dataset_base_dir}/cot_synthesis2_CC-high/23_text_document + - 1.3135 + - ${experiment.dataset_base_dir}/cot_synthesis2_math-high/12_text_document + - 0.2573 + - ${experiment.dataset_base_dir}/cot_synthesis2_OpenSource-high/1_text_document + - 0.6314 + - ${experiment.dataset_base_dir}/cot_synthesis2_wiki-high/5_text_document + - 0.5074 + - ${experiment.dataset_base_dir}/cot_synthesis_math-high/11_text_document + - 0.4081 + - ${experiment.dataset_base_dir}/cot_synthesis_OpenSource-high/4_text_document + - 0.5397 + - ${experiment.dataset_base_dir}/Nemotron-CC-high-synthetic-diverse_qa_pairs-high/part_244_text_document + - 0.4616 + - ${experiment.dataset_base_dir}/Nemotron-CC-high-synthetic-extract_knowledge-high/part_498_text_document + + split: 1 + no_mmap_bin_files: true + tokenizer: + tokenizer_type: QwenTokenizerFS + tokenizer_path: ../hf_openseek/tokenizer + vocab_size: 151851 + make_vocab_size_divisible_by: 64 diff --git "a/openseek/competition/pz/UCI001/\345\210\235\350\265\233\351\205\215\347\275\256\346\226\207\344\273\266/train_deepseek_v3_1_4b_test6.yaml" "b/openseek/competition/pz/UCI001/\345\210\235\350\265\233\351\205\215\347\275\256\346\226\207\344\273\266/train_deepseek_v3_1_4b_test6.yaml" new file mode 100644 index 0000000..a41cb4d --- /dev/null +++ "b/openseek/competition/pz/UCI001/\345\210\235\350\265\233\351\205\215\347\275\256\346\226\207\344\273\266/train_deepseek_v3_1_4b_test6.yaml" @@ -0,0 +1,158 @@ +system: + recompute_method: "uniform" + recompute_granularity: "full" + recompute_num_layers: 6 + moe_router_dtype: fp32 + no_shared_fs: ${experiment.runner.no_shared_fs} + num_workers: 4 + tensor_model_parallel_size: 1 + pipeline_model_parallel_size: 1 + expert_model_parallel_size: 1 + context_parallel_size: 1 + disable_bias_linear: true + reset_position_ids: True + reset_attention_mask: True + qk_layernorm: true + sequence_parallel: true + use_distributed_optimizer: true + overlap_grad_reduce: true + overlap_param_gather: true + finetune: true + precision: + bf16: true + attention_softmax_in_fp32: true + accumulate_allreduce_grads_in_fp32: true + logging: + log_interval: 1 + tensorboard_log_interval: 1 + wandb_project: ${experiment.exp_name} + wandb_exp_name: ${experiment.exp_name} + log_timers_to_tensorboard: true + log_validation_ppl_to_tensorboard: true + log_throughput: true + log_params_norm: true + log_num_zeros_in_grad: true + log_memory_to_tensorboard: true + checkpoint: + save_interval: ${experiment.save_steps} + load: ${experiment.load} + ckpt_format: ${experiment.ckpt_format} + +model: + transformer_impl: transformer_engine + attention_backend: unfused + ## 12 -> 6 for lower mem usage + num_layers: 6 + hidden_size: 1280 + num_attention_heads: 10 + group_query_attention: false + num_query_groups: 10 # num_key_value_heads + seq_length: 4096 + max_position_embeddings: 4096 + norm_epsilon: 1e-6 + use_rotary_position_embeddings: true + rotary_base: 1000000 + swiglu: true + normalization: RMSNorm + init_method_std: 6e-3 + attention_dropout: 0.0 + hidden_dropout: 0.0 + clip_grad: 1.0 + position_embedding_type: rope + untie_embeddings_and_output_weights: false + no_position_embedding: true + no_rope_fusion: true + + # mla args ================== + multi_latent_attention: true + kv_lora_rank: 512 + qk_head_dim: 128 + qk_pos_emb_head_dim: 64 + v_head_dim: 128 + + # moe args =================== + ffn_hidden_size: 7168 + moe_ffn_hidden_size: 896 + moe_grouped_gemm: true + moe_shared_expert_intermediate_size: 1792 + num_experts: 64 + moe_router_load_balancing_type: "seq_aux_loss" + moe_router_score_function: sigmoid + moe_router_enable_expert_bias: true + moe_router_bias_update_rate: 0.001 + moe_aux_loss_coeff: 0.0001 + ## 12 -> 6 for lower mem usage + moe_layer_freq: "[0]+[1]*5" + # moe_layer_freq: "[0]+[1]*11" + # node limited routing + moe_router_num_groups: 1 + moe_router_group_topk: 1 + moe_router_topk: 6 + moe_router_topk_scaling_factor: 2.446 + moe_token_dispatcher_type: "alltoall" + # moe_permute_fusion: true + + # moe args =================== + # num_mtp_predictor: 1 + # mtp_loss_coeff: 0.3 + + # training + seed: ${experiment.seed} + micro_batch_size: 4 + global_batch_size: 1024 + eval_iters: 0 + # train_samples: 24576000 #100B tokens + train_samples: 6144000 #25B #7372800 #30B #6144000 #25B #4882812 #20B + + optimizer: + weight_decay: 0.1 + adam_beta1: 0.9 + adam_beta2: 0.95 + lr_scheduler: + lr: 3.0e-5 + min_lr: 5.0e-6 + lr_warmup_samples: 200 + lr_decay_style: cosine + + +data: + # exp: baseline + data_path: + - 1.8171 + - ${experiment.dataset_base_dir}/zh_cc-high-loss0/part_28_text_document + - 0.6414 + - ${experiment.dataset_base_dir}/arxiv/007_00000_text_document + - 6.0237 + - ${experiment.dataset_base_dir}/cot_synthesis2_arxiv-high/2_text_document + - 0.3755 + - ${experiment.dataset_base_dir}/cot_synthesis2_CC-high/23_text_document + - 1.3135 + - ${experiment.dataset_base_dir}/cot_synthesis2_math-high/12_text_document + - 0.2573 + - ${experiment.dataset_base_dir}/cot_synthesis2_OpenSource-high/1_text_document + - 0.6314 + - ${experiment.dataset_base_dir}/cot_synthesis2_wiki-high/5_text_document + - 0.2225 + - ${experiment.dataset_base_dir}/cot_synthesis_CC-high/74_text_document + - 0.5074 + - ${experiment.dataset_base_dir}/cot_synthesis_math-high/11_text_document + - 0.4081 + - ${experiment.dataset_base_dir}/cot_synthesis_OpenSource-high/4_text_document + - 0.4000 + - ${experiment.dataset_base_dir}/cot_synthesis_wiki-high/4_text_document + - 1.8165 + - ${experiment.dataset_base_dir}/math-high/part_04_text_document + - 0.5397 + - ${experiment.dataset_base_dir}/Nemotron-CC-high-synthetic-diverse_qa_pairs-high/part_244_text_document + - 0.4616 + - ${experiment.dataset_base_dir}/Nemotron-CC-high-synthetic-extract_knowledge-high/part_498_text_document + # - 6.1982 + # - ${experiment.dataset_base_dir}/pes2o/pubmedcentral_3_text_document + + split: 1 + no_mmap_bin_files: true + tokenizer: + tokenizer_type: QwenTokenizerFS + tokenizer_path: ../hf_openseek/tokenizer + vocab_size: 151851 + make_vocab_size_divisible_by: 64