diff --git a/openseek/competition/pz/yuanboyang/README.md b/openseek/competition/pz/yuanboyang/README.md new file mode 100644 index 0000000..6d1b9a5 --- /dev/null +++ b/openseek/competition/pz/yuanboyang/README.md @@ -0,0 +1,151 @@ +# 决赛代码(可完整运行的代码库) + +## 文件结构 +project_root/ +├── README.md # 使用说明(本文件) +├── requirementsverl.txt # verl 训练环境依赖 +├── requirementstest.txt # 测试/评测环境依赖 +├── download.py # 训练集下载处理 +├── verl/ # 修改过的 verl 源码 +│ ├── verl/utils/reward_score/geo3k.py # reward 函数修改 +│ └── examples/data_preprocess/gsm8k.py # 验证集下载处理 + +## 1. 数据下载与处理 +- 训练集下载处理:`download.py` +- 验证集下载处理:`verl/examples/data_preprocess/gsm8k.py` + +## 2. 代码修改说明 +### 基于 [verl](https://github.com/volcengine/verl) 源码的修改 +- 主要修改点: + - 对于数据源和prompt的修改: + - examples/data_preprocess/gsm8k.py: + - 将 + ```python + import datasets + ... + data_source = "openai/gsm8k" + dataset = datasets.load_dataset(data_source, "main") + train_dataset = dataset["train"] + test_dataset = dataset["test"] + ``` + - 修改为 + ```python + from modelscope.msdatasets import MsDataset + ... + data_source = "hiyouga/geometry3k" # 注意:data_source 只是写入数据的来源标签(download.py 也使用同一标签,应是为了让 verl 调用修改后的 geo3k reward 函数,请对照 reward 分发逻辑确认);实际加载的数据集是 modelscope/gsm8k + train_dataset = MsDataset.load('modelscope/gsm8k', subset_name='main', split='train', trust_remote_code=True) + test_dataset = MsDataset.load('modelscope/gsm8k', subset_name='main', split='test', trust_remote_code=True) + ``` + - 将 + ```python + instruction_following = 'Let\'s think step by step and output the final answer after "####".' 
+ question = question_raw + " " + instruction_following + ``` + - 修改为 + ```python + instruction_following = instruction = r'Please reason step by step,and must put your final answer within \boxed{}.Question:' + question = instruction + " " + question_raw + ``` + - 对于trust_remote_code=True的修改: + - verl/model_merger/base_model_merger.py: + - 将 + ```python + with init_empty_weights(): + model = auto_model_class.from_config( + self.model_config, torch_dtype=torch.bfloat16, trust_remote_code=self.config.trust_remote_code + ) + ``` + - 修改为 + ```python + with init_empty_weights(): + model = auto_model_class.from_config( + self.model_config, torch_dtype=torch.bfloat16, trust_remote_code=True + ) + ``` + - verl/trainer/main_ppo.py: + - 将 + ```python + trust_remote_code = config.data.get("trust_remote_code", False) + ``` + - 修改为 + ```python + trust_remote_code = True + ``` + - verl/workers/fsdp_workers.py: + - 将 + ```python + trust_remote_code=trust_remote_code + ``` + - 修改为 + ```python + trust_remote_code=True + ``` + + - 修改了 `verl/utils/reward_score/geo3k.py` 中的 reward 函数: + - verl/utils/reward_score/geo3k.py: + - 将 + ```python + pattern = re.compile(r".*.*\\boxed\{.*\}.*", re.DOTALL) + ``` + - 修改为 + ```python + pattern = re.compile(r".*\\boxed\{.*\}.*", re.DOTALL) + ``` + +### 基于 [transformers](https://github.com/huggingface/transformers) 源码的修改 +- 修改文件: + - `/root/miniconda3/envs/verl/lib/python3.10/site-packages/transformers/configuration_utils.py` +- 修改内容: + - 将第 917 行改为: + ```python + json.dumps(config_dict, indent=2, sort_keys=False) + "\n" + ``` + +## 3. 环境依赖 +```bash +# verl 环境 +pip install -r requirementsverl.txt + +# 测试环境 +pip install -r requirementstest.txt +``` +## 4. 
运行指令(运行前请将 data.train_files 和 actor_rollout_ref.model.path 改为自己的路径) +```bash +nohup env PYTHONUNBUFFERED=1 python3 -m verl.trainer.main_ppo \ + algorithm.adv_estimator=grpo \ + data.train_files=/usr/train3.parquet \ + data.train_batch_size=264 \ + data.max_prompt_length=2048 \ + data.max_response_length=512 \ + actor_rollout_ref.model.path=/root/.cache/modelscope/hub/models/BAAI/OpenSeek-Small-v1-SFT \ + actor_rollout_ref.actor.optim.lr=1e-5 \ + actor_rollout_ref.actor.ppo_mini_batch_size=72 \ + actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \ + actor_rollout_ref.rollout.name=vllm \ + +actor_rollout_ref.actor.fsdp_config.model_dtype=bf16 \ + actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=4 \ + actor_rollout_ref.rollout.tensor_model_parallel_size=1 \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.5 \ + actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \ + trainer.logger=tensorboard \ + trainer.val_before_train=True \ + trainer.n_gpus_per_node=6 \ + trainer.nnodes=1 \ + trainer.save_freq=200 \ + trainer.test_freq=10 \ + trainer.total_epochs=15 \ + data.val_files=$HOME/data/gsm8k/test.parquet \ + actor_rollout_ref.rollout.n=6 \ + > train.log 2>&1 & +``` +## 5. 
模型融合及评测 +### 模型融合 +```bash +python3 -m verl.model_merger merge \ + --backend fsdp \ + --local_dir /usr/checkpoints/verl_examples/gsm8k/global_step_8000/actor \ + --target_dir /usr/checkpoints/verl_examples/gsm8k/global_step_8000/actor/huggingface +``` +### 评测 +- 使用官方代码'/OpenSeek/evaluation/qwen_eval/sh/run_evaluate.sh' +- 以上均需要自行修改模型位置 diff --git a/openseek/competition/pz/yuanboyang/download.py b/openseek/competition/pz/yuanboyang/download.py new file mode 100644 index 0000000..f33a838 --- /dev/null +++ b/openseek/competition/pz/yuanboyang/download.py @@ -0,0 +1,89 @@ +import argparse +import os +from modelscope.msdatasets import MsDataset + +def main(): + """ + 主函数,从 ModelScope 加载数据集,进行处理,并保存为 Parquet 文件。 + """ + parser = argparse.ArgumentParser(description="Convert Big-Math dataset from ModelScope to a verl-compatible PARQUET format.") + # 我们仍然保留 output_file 参数,以便您可以指定输出路径 + parser.add_argument("--output_file", type=str, required=True, help="Path for the output PARQUET file (e.g., train.parquet).") + args = parser.parse_args() + + # 数据集信息 + dataset_name = 'open-r1/Big-Math-RL-Verified-Processed' + subset_name = 'all' + split = 'train' + data_source_name = "Big-Math" # 用于在数据中标记来源 + + print(f"Loading dataset '{dataset_name}' from ModelScope...") + + # 1. 使用 MsDataset.load 直接加载数据集 + # 这一步就已经得到了一个结构化的数据集对象 + dataset = MsDataset.load(dataset_name, subset_name=subset_name, split=split) + + print(f"Loaded {len(dataset)} records. Starting preprocessing...") + + # 2. 
定义处理函数,将原始数据格式映射到目标格式 + # 这个函数会被 .map() 方法应用到每一条记录上 + def process_fn(example, idx): + # 从原始记录中提取需要的字段 + # 注意:这里的键名 ('prompt', 'solution' 等) 需要根据您数据集的实际列名来定 + # 请根据 'open-r1/Big-Math-RL-Verified-Processed' 数据集的实际情况调整 + problem_raw = example.get("prompt", "") + answer_clean = example.get("solution", "") + domain = example.get("domain", []) + solve_rate = example.get("llama8b_solve_rate", None) + + # 构建 prompt 内容 + instruction = r'Please reason step by step,and must put your final answer within \boxed{}.Question:' + prompt_content = instruction+ " " + problem_raw + + # 构建 reward_model 字段 + reward_model_data = { + "style": "rule", + "ground_truth": str(answer_clean) # 确保是字符串 + } + + # 组装成最终的数据结构 + processed_data = { + "data_source": 'hiyouga/geometry3k', + "prompt": [ + { + "role": "user", + "content": prompt_content, + } + ], + "ability": "math", + "reward_model": reward_model_data, + "extra_info": { + "index": idx, + "original_problem": problem_raw, + "domain": domain, + "llama8b_solve_rate": solve_rate, + }, + } + return processed_data + + # 3. 使用 .map() 方法应用处理函数 + # MsDataset 的 .map() 实现通常非常稳健 + processed_dataset = dataset.map(function=process_fn, with_indices=True) + + print("Preprocessing complete.") + + # 确保输出目录存在 + output_dir = os.path.dirname(args.output_file) + if output_dir: + os.makedirs(output_dir, exist_ok=True) + + # 4. 
将处理好的数据集直接保存为 Parquet 文件 + print(f"Saving output to '{args.output_file}'...") + processed_dataset.to_parquet(args.output_file) + # processed_dataset.to_json(args.output_file, lines=True, force_ascii=False) + + print("Conversion finished successfully!") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/openseek/competition/pz/yuanboyang/my_actual_changes.diff b/openseek/competition/pz/yuanboyang/my_actual_changes.diff new file mode 100644 index 0000000..d864d5d --- /dev/null +++ b/openseek/competition/pz/yuanboyang/my_actual_changes.diff @@ -0,0 +1,220 @@ +diff --git a/examples/data_preprocess/gsm8k.py b/examples/data_preprocess/gsm8k.py +index f39c4f09..a3bbdc44 100644 +--- a/examples/data_preprocess/gsm8k.py ++++ b/examples/data_preprocess/gsm8k.py +@@ -22,7 +22,7 @@ import re + import datasets + + from verl.utils.hdfs_io import copy, makedirs +- ++from modelscope.msdatasets import MsDataset + + def extract_solution(solution_str): + solution = re.search("#### (\\-?[0-9\\.\\,]+)", solution_str) +@@ -39,21 +39,19 @@ if __name__ == "__main__": + + args = parser.parse_args() + +- data_source = "openai/gsm8k" +- +- dataset = datasets.load_dataset(data_source, "main") ++ data_source = "hiyouga/geometry3k" + +- train_dataset = dataset["train"] +- test_dataset = dataset["test"] ++ train_dataset = MsDataset.load('modelscope/gsm8k', subset_name='main', split='train',trust_remote_code=True) ++ test_dataset = MsDataset.load('modelscope/gsm8k', subset_name='main', split='test',trust_remote_code=True) + +- instruction_following = 'Let\'s think step by step and output the final answer after "####".' 
++ instruction_following = instruction = r'Please reason step by step,and must put your final answer within \boxed{}.Question:' + + # add a row to each data item that represents a unique id + def make_map_fn(split): + def process_fn(example, idx): + question_raw = example.pop("question") +- +- question = question_raw + " " + instruction_following ++ # 使用新的 prompt 模板 ++ question = instruction+ " " + question_raw + + answer_raw = example.pop("answer") + solution = extract_solution(answer_raw) +diff --git a/scripts/install_vllm_sglang_mcore.sh b/scripts/install_vllm_sglang_mcore.sh +index 0e305c5d..59000579 100755 +--- a/scripts/install_vllm_sglang_mcore.sh ++++ b/scripts/install_vllm_sglang_mcore.sh +@@ -9,7 +9,7 @@ echo "1. install inference frameworks and pytorch they need" + if [ $USE_SGLANG -eq 1 ]; then + pip install "sglang[all]==0.4.6.post1" --no-cache-dir --find-links https://flashinfer.ai/whl/cu124/torch2.6/flashinfer-python && pip install torch-memory-saver --no-cache-dir + fi +-pip install --no-cache-dir "vllm==0.8.5.post1" "torch==2.6.0" "torchvision==0.21.0" "torchaudio==2.6.0" "tensordict==0.6.2" torchdata ++pip install --no-cache-dir "vllm==0.8.2" "torch==2.6.0" "torchvision==0.21.0" "torchaudio==2.6.0" "tensordict==0.6.2" torchdata + + echo "2. 
install basic packages" + pip install "transformers[hf_xet]>=4.51.0" accelerate datasets peft hf-transfer \ +diff --git a/verl/model_merger/base_model_merger.py b/verl/model_merger/base_model_merger.py +index b46f40f8..6d081dc8 100644 +--- a/verl/model_merger/base_model_merger.py ++++ b/verl/model_merger/base_model_merger.py +@@ -293,7 +293,7 @@ class BaseModelMerger(ABC): + auto_model_class = self.get_transformers_auto_model_class() + with init_empty_weights(): + model = auto_model_class.from_config( +- self.model_config, torch_dtype=torch.bfloat16, trust_remote_code=self.config.trust_remote_code ++ self.model_config, torch_dtype=torch.bfloat16, trust_remote_code=True + ) + model.to_empty(device="cpu") + model = self.patch_model_generation_config(model) +diff --git a/verl/trainer/config/_generated_ppo_megatron_trainer.yaml b/verl/trainer/config/_generated_ppo_megatron_trainer.yaml +index 03d4d5ca..eb282768 100644 +--- a/verl/trainer/config/_generated_ppo_megatron_trainer.yaml ++++ b/verl/trainer/config/_generated_ppo_megatron_trainer.yaml +@@ -251,7 +251,7 @@ actor_rollout_ref: + moe_config: + freeze_moe_router: false + use_fused_kernels: false +- trust_remote_code: false ++ trust_remote_code: True + data: + tokenizer: null + use_shm: false +@@ -274,7 +274,7 @@ data: + truncation: error + image_key: images + video_key: videos +- trust_remote_code: false ++ trust_remote_code: True + custom_cls: + path: null + name: null +@@ -391,7 +391,7 @@ reward_model: + input_tokenizer: ${actor_rollout_ref.model.path} + path: ~/models/FsfairX-LLaMA3-RM-v0.1 + external_lib: ${actor_rollout_ref.model.external_lib} +- trust_remote_code: false ++ trust_remote_code: True + micro_batch_size: null + micro_batch_size_per_gpu: null + max_length: null +diff --git a/verl/trainer/config/_generated_ppo_trainer.yaml b/verl/trainer/config/_generated_ppo_trainer.yaml +index 3c7a73f7..8554e613 100644 +--- a/verl/trainer/config/_generated_ppo_trainer.yaml ++++ 
b/verl/trainer/config/_generated_ppo_trainer.yaml +@@ -232,7 +232,7 @@ actor_rollout_ref: + use_fused_kernels: false + fused_kernel_options: + impl_backend: torch +- trust_remote_code: false ++ trust_remote_code: True + data: + tokenizer: null + use_shm: false +@@ -255,7 +255,7 @@ data: + truncation: error + image_key: images + video_key: videos +- trust_remote_code: false ++ trust_remote_code: True + custom_cls: + path: null + name: null +@@ -359,7 +359,7 @@ reward_model: + input_tokenizer: ${actor_rollout_ref.model.path} + path: ~/models/FsfairX-LLaMA3-RM-v0.1 + external_lib: ${actor_rollout_ref.model.external_lib} +- trust_remote_code: false ++ trust_remote_code: True + use_shm: false + use_remove_padding: false + use_fused_kernels: ${actor_rollout_ref.model.use_fused_kernels} +diff --git a/verl/trainer/config/critic/critic.yaml b/verl/trainer/config/critic/critic.yaml +index f201a34b..b4efa215 100644 +--- a/verl/trainer/config/critic/critic.yaml ++++ b/verl/trainer/config/critic/critic.yaml +@@ -47,7 +47,7 @@ model: + external_lib: ${oc.select:actor_rollout_ref.model.external_lib,null} + + # Whether to trust remote code from Hugging Face models +- trust_remote_code: ${oc.select:actor_rollout_ref.model.trust_remote_code,false} ++ trust_remote_code: True + + # PPO mini-batch size per update + ppo_mini_batch_size: ${oc.select:actor_rollout_ref.actor.ppo_mini_batch_size,256} +diff --git a/verl/trainer/config/data/legacy_data.yaml b/verl/trainer/config/data/legacy_data.yaml +index 028405b4..f73d0d82 100644 +--- a/verl/trainer/config/data/legacy_data.yaml ++++ b/verl/trainer/config/data/legacy_data.yaml +@@ -73,7 +73,7 @@ image_key: images + video_key: videos + + # If the remote tokenizer has a Python file, this flag determines whether to allow using it. +-trust_remote_code: False ++trust_remote_code: True + + # Optional: specify a custom dataset class path and name if overriding default loading behavior. 
+ custom_cls: +diff --git a/verl/trainer/config/reward_model/reward_model.yaml b/verl/trainer/config/reward_model/reward_model.yaml +index 08ae37ac..1947fc90 100644 +--- a/verl/trainer/config/reward_model/reward_model.yaml ++++ b/verl/trainer/config/reward_model/reward_model.yaml +@@ -26,7 +26,7 @@ model: + external_lib: ${actor_rollout_ref.model.external_lib} + + # Whether to enable loading a remote code model, default to False +- trust_remote_code: False ++ trust_remote_code: True + + # [Deprecated] Global micro batch size + # will be deprecated, use micro_batch_size_per_gpu +diff --git a/verl/trainer/config/rollout/rollout.yaml b/verl/trainer/config/rollout/rollout.yaml +index 8622cb68..b32c99f0 100644 +--- a/verl/trainer/config/rollout/rollout.yaml ++++ b/verl/trainer/config/rollout/rollout.yaml +@@ -80,7 +80,7 @@ disable_log_stats: True + do_sample: True + + # number of responses (i.e. num sample times). > 1 for grpo +-n: 1 ++n: 8 + + # The over_sample_rate parameter controls the early termination threshold for training rollouts, + # where the system will abort remaining requests when (1 - over_sample_rate) * total_requests completions are reached. +diff --git a/verl/trainer/main_ppo.py b/verl/trainer/main_ppo.py +index 7ab01b45..1a67ea8e 100644 +--- a/verl/trainer/main_ppo.py ++++ b/verl/trainer/main_ppo.py +@@ -251,7 +251,7 @@ class TaskRunner: + # Instantiate the tokenizer and processor. 
+ from verl.utils import hf_processor, hf_tokenizer + +- trust_remote_code = config.data.get("trust_remote_code", False) ++ trust_remote_code = True + tokenizer = hf_tokenizer(local_path, trust_remote_code=trust_remote_code) + # Used for multimodal LLM, could be None + processor = hf_processor(local_path, trust_remote_code=trust_remote_code, use_fast=True) +diff --git a/verl/utils/reward_score/geo3k.py b/verl/utils/reward_score/geo3k.py +index 8a850875..c687aff7 100644 +--- a/verl/utils/reward_score/geo3k.py ++++ b/verl/utils/reward_score/geo3k.py +@@ -17,7 +17,7 @@ from mathruler.grader import extract_boxed_content, grade_answer + + + def format_reward(predict_str: str) -> float: +- pattern = re.compile(r".*.*\\boxed\{.*\}.*", re.DOTALL) ++ pattern = re.compile(r".*\\boxed\{.*\}.*", re.DOTALL) + match_result = re.fullmatch(pattern, predict_str) + return 1.0 if match_result else 0.0 + +diff --git a/verl/workers/fsdp_workers.py b/verl/workers/fsdp_workers.py +index ce6f6ad6..7f33ad11 100644 +--- a/verl/workers/fsdp_workers.py ++++ b/verl/workers/fsdp_workers.py +@@ -343,7 +343,7 @@ class ActorRolloutRefWorker(Worker, DistProfilerExtension): + pretrained_model_name_or_path=local_path, + torch_dtype=torch_dtype, + config=actor_model_config, +- trust_remote_code=trust_remote_code, ++ trust_remote_code=True, + ) + + # Apply Liger kernel to the model if use_liger is set to True diff --git a/openseek/competition/pz/yuanboyang/requirementstest.txt b/openseek/competition/pz/yuanboyang/requirementstest.txt new file mode 100644 index 0000000..eed93ca --- /dev/null +++ b/openseek/competition/pz/yuanboyang/requirementstest.txt @@ -0,0 +1,161 @@ +aiohappyeyeballs==2.6.1 +aiohttp==3.12.15 +aiosignal==1.4.0 +airportsdata==20250811 +annotated-types==0.7.0 +anthropic==0.65.0 +antlr4-python3-runtime==4.11.1 +anyio==4.10.0 +asttokens==3.0.0 +async-timeout==5.0.1 +attrs==25.3.0 +blobfile==3.0.0 +build==1.3.0 +certifi==2025.8.3 +cffi==1.17.1 +charset-normalizer==3.4.3 +click==8.2.1 
+cloudpickle==3.1.1 +compressed-tensors==0.11.0 +cuda-bindings==12.9.2 +cuda-pathfinder==1.2.1 +cuda-python==12.9.0 +datasets==4.0.0 +decorator==5.2.1 +decord==0.6.0 +dill==0.3.8 +diskcache==5.6.3 +distro==1.9.0 +einops==0.8.1 +exceptiongroup==1.3.0 +executing==2.2.1 +fastapi==0.116.1 +filelock==3.19.1 +flashinfer-python==0.2.14.post1 +frozendict==2.4.6 +frozenlist==1.7.0 +fsspec==2025.3.0 +h11==0.16.0 +hf-xet==1.1.9 +hf_transfer==0.1.9 +httpcore==1.0.9 +httpx==0.28.1 +huggingface-hub==0.34.4 +idna==3.10 +interegular==0.3.3 +ipython==8.37.0 +jedi==0.19.2 +Jinja2==3.1.6 +jiter==0.10.0 +joblib==1.5.2 +jsonschema==4.25.1 +jsonschema-specifications==2025.4.1 +lark==1.2.2 +-e git+https://github.com/FlagAI-Open/OpenSeek.git@00bdb7fc9e0a347111d4061d51d6af1842810b5f#egg=latex2sympy2&subdirectory=evaluation/qwen_eval/latex2sympy +latex2sympy2_extended==1.10.2 +llguidance==0.7.30 +lxml==6.0.1 +MarkupSafe==3.0.2 +math-verify==0.8.0 +matplotlib-inline==0.1.7 +modelscope==1.29.2 +mpmath==1.3.0 +msgspec==0.19.0 +multidict==6.6.4 +multiprocess==0.70.16 +nest-asyncio==1.6.0 +networkx==3.4.2 +ninja==1.13.0 +numpy==2.2.6 +nvidia-cublas-cu12==12.8.4.1 +nvidia-cuda-cupti-cu12==12.8.90 +nvidia-cuda-nvrtc-cu12==12.8.93 +nvidia-cuda-runtime-cu12==12.8.90 +nvidia-cudnn-cu12==9.10.2.21 +nvidia-cudnn-frontend==1.14.0 +nvidia-cufft-cu12==11.3.3.83 +nvidia-cufile-cu12==1.13.1.3 +nvidia-curand-cu12==10.3.9.90 +nvidia-cusolver-cu12==11.7.3.90 +nvidia-cusparse-cu12==12.5.8.93 +nvidia-cusparselt-cu12==0.7.1 +nvidia-ml-py==12.575.51 +nvidia-nccl-cu12==2.27.3 +nvidia-nvjitlink-cu12==12.8.93 +nvidia-nvtx-cu12==12.8.90 +openai==1.99.1 +openai-harmony==0.0.4 +orjson==3.11.3 +outlines==0.1.11 +outlines_core==0.1.26 +packaging==25.0 +pandas==2.3.2 +parso==0.8.5 +partial-json-parser==0.2.1.1.post6 +Pebble==5.1.3 +pexpect==4.9.0 +pillow==11.3.0 +prometheus_client==0.22.1 +prompt_toolkit==3.0.52 +propcache==0.3.2 +psutil==7.0.0 +ptyprocess==0.7.0 +pure_eval==0.2.3 +pyarrow==21.0.0 +pybase64==1.4.2 
+pycountry==24.6.1 +pycparser==2.22 +pycryptodomex==3.23.0 +pydantic==2.11.7 +pydantic_core==2.33.2 +Pygments==2.19.2 +pynvml==12.0.0 +pyproject_hooks==1.2.0 +python-dateutil==2.9.0.post0 +python-multipart==0.0.20 +pytz==2025.2 +PyYAML==6.0.2 +pyzmq==27.0.2 +referencing==0.36.2 +regex==2025.9.1 +requests==2.32.5 +rpds-py==0.27.1 +safetensors==0.6.2 +scikit-learn==1.7.2 +scipy==1.15.3 +sentencepiece==0.2.1 +setproctitle==1.3.6 +sgl-kernel==0.3.7 +sglang==0.5.1.post3 +six==1.17.0 +sniffio==1.3.1 +soundfile==0.13.1 +stack-data==0.6.3 +starlette==0.47.3 +sympy==1.14.0 +threadpoolctl==3.6.0 +tiktoken==0.11.0 +timeout-decorator==0.5.0 +timm==1.0.16 +tokenizers==0.21.4 +tomli==2.2.1 +torch==2.8.0 +torch_memory_saver==0.0.8 +torchao==0.9.0 +torchaudio==2.8.0 +torchvision==0.23.0 +tqdm==4.67.1 +traitlets==5.14.3 +transformers==4.55.2 +triton==3.4.0 +typing-inspection==0.4.1 +typing_extensions==4.15.0 +tzdata==2025.2 +urllib3==2.5.0 +uvicorn==0.35.0 +uvloop==0.21.0 +wcwidth==0.2.13 +word2number==1.1 +xgrammar==0.1.23 +xxhash==3.5.0 +yarl==1.20.1 diff --git a/openseek/competition/pz/yuanboyang/requirementsverl.txt b/openseek/competition/pz/yuanboyang/requirementsverl.txt new file mode 100644 index 0000000..b7ef40e --- /dev/null +++ b/openseek/competition/pz/yuanboyang/requirementsverl.txt @@ -0,0 +1,278 @@ +absl-py==2.3.1 +accelerate==1.10.1 +addict==2.4.0 +aiohappyeyeballs==2.6.1 +aiohttp==3.12.15 +aiohttp-cors==0.8.1 +aiosignal==1.4.0 +airportsdata==20250811 +annotated-types==0.7.0 +anthropic==0.64.0 +antlr4-python3-runtime==4.9.3 +anyio==4.10.0 +astor==0.8.1 +asttokens==3.0.0 +async-timeout==5.0.1 +attrs==25.3.0 +autocommand==2.2.2 +av==15.1.0 +backports.tarfile==1.2.0 +blake3==1.0.5 +cachetools==5.5.2 +cbor2==5.7.0 +certifi==2025.8.3 +cffi==1.17.1 +cfgv==3.4.0 +charset-normalizer==3.4.3 +click==8.2.1 +cloudpickle==3.1.1 +codetiming==1.4.0 +colorful==0.5.7 +compressed-tensors==0.10.1 +cuda-bindings==13.0.1 +cuda-pathfinder==1.2.1 +cuda-python==13.0.1 +cupy-cuda12x==13.6.0 
+datasets==3.6.0 +decorator==5.2.1 +decord==0.6.0 +Deprecated==1.2.18 +depyf==0.18.0 +dill==0.3.8 +diskcache==5.6.3 +distlib==0.4.0 +distro==1.9.0 +dnspython==2.7.0 +einops==0.8.1 +email-validator==2.3.0 +exceptiongroup==1.3.0 +executing==2.2.0 +fastapi==0.116.1 +fastapi-cli==0.0.10 +fastapi-cloud-cli==0.1.5 +fastrlock==0.8.3 +fastuuid==0.12.0 +filelock==3.19.1 +flash_attn==2.8.3 +flashinfer-python==0.2.3+cu124torch2.6 +frozenlist==1.7.0 +fsspec==2025.3.0 +gguf==0.17.1 +gitdb==4.0.12 +GitPython==3.1.45 +google-api-core==2.25.1 +google-auth==2.40.3 +googleapis-common-protos==1.70.0 +grpcio==1.74.0 +h11==0.16.0 +hf-xet==1.1.9 +hf_transfer==0.1.9 +httpcore==1.0.9 +httptools==0.6.4 +httpx==0.28.1 +huggingface-hub==0.34.4 +hydra-core==1.3.2 +identify==2.6.13 +idna==3.10 +importlib_metadata==8.0.0 +inflect==7.3.1 +iniconfig==2.1.0 +interegular==0.3.3 +ipython==8.37.0 +jaraco.collections==5.1.0 +jaraco.context==5.3.0 +jaraco.functools==4.0.1 +jaraco.text==3.12.1 +jedi==0.19.2 +Jinja2==3.1.6 +jiter==0.10.0 +jsonschema==4.25.1 +jsonschema-specifications==2025.4.1 +lark==1.2.2 +-e git+https://github.com/FlagAI-Open/OpenSeek.git@00bdb7fc9e0a347111d4061d51d6af1842810b5f#egg=latex2sympy2&subdirectory=evaluation/qwen_eval/latex2sympy +liger_kernel==0.6.2 +litellm==1.76.1 +llguidance==0.7.30 +llvmlite==0.44.0 +lm-format-enforcer==0.10.12 +Markdown==3.8.2 +markdown-it-py==4.0.0 +MarkupSafe==3.0.2 +mathruler==0.1.0 +matplotlib-inline==0.1.7 +mdurl==0.1.2 +mistral_common==1.8.4 +modelscope==1.29.1 +more-itertools==10.3.0 +mpmath==1.3.0 +msgpack==1.1.1 +msgspec==0.19.0 +multidict==6.6.4 +multiprocess==0.70.16 +nanobind==2.8.0 +nest-asyncio==1.6.0 +networkx==3.4.2 +ninja==1.13.0 +nodeenv==1.9.1 +numba==0.61.2 +numpy==1.26.4 +nvidia-cublas-cu11==11.11.3.6 +nvidia-cublas-cu12==12.6.4.1 +nvidia-cuda-cupti-cu11==11.8.87 +nvidia-cuda-cupti-cu12==12.6.80 +nvidia-cuda-nvrtc-cu11==11.8.89 +nvidia-cuda-nvrtc-cu12==12.6.77 +nvidia-cuda-runtime-cu11==11.8.89 +nvidia-cuda-runtime-cu12==12.6.77 
+nvidia-cudnn-cu11==9.1.0.70 +nvidia-cudnn-cu12==9.5.1.17 +nvidia-cufft-cu11==10.9.0.58 +nvidia-cufft-cu12==11.3.0.4 +nvidia-cufile-cu12==1.11.1.6 +nvidia-curand-cu11==10.3.0.86 +nvidia-curand-cu12==10.3.7.77 +nvidia-cusolver-cu11==11.4.1.48 +nvidia-cusolver-cu12==11.7.1.2 +nvidia-cusparse-cu11==11.7.5.86 +nvidia-cusparse-cu12==12.5.4.2 +nvidia-cusparselt-cu12==0.6.3 +nvidia-ml-py==12.575.51 +nvidia-nccl-cu11==2.21.5 +nvidia-nccl-cu12==2.26.2 +nvidia-nvjitlink-cu12==12.6.85 +nvidia-nvtx-cu11==11.8.86 +nvidia-nvtx-cu12==12.6.77 +omegaconf==2.3.0 +openai==1.102.0 +openai-harmony==0.0.4 +opencensus==0.11.4 +opencensus-context==0.1.3 +opencv-fixer==0.2.5 +opencv-python==4.12.0.88 +opencv-python-headless==4.11.0.86 +opentelemetry-api==1.26.0 +opentelemetry-exporter-otlp==1.26.0 +opentelemetry-exporter-otlp-proto-common==1.26.0 +opentelemetry-exporter-otlp-proto-grpc==1.26.0 +opentelemetry-exporter-otlp-proto-http==1.26.0 +opentelemetry-exporter-prometheus==0.47b0 +opentelemetry-proto==1.26.0 +opentelemetry-sdk==1.26.0 +opentelemetry-semantic-conventions==0.47b0 +opentelemetry-semantic-conventions-ai==0.4.13 +optree==0.17.0 +orjson==3.11.3 +outlines==0.1.11 +outlines_core==0.1.26 +packaging==25.0 +pandas==2.3.2 +parso==0.8.5 +partial-json-parser==0.2.1.1.post6 +peft==0.17.1 +pexpect==4.9.0 +pillow==11.3.0 +platformdirs==4.4.0 +pluggy==1.6.0 +pre_commit==4.3.0 +prometheus-fastapi-instrumentator==7.1.0 +prometheus_client==0.22.1 +prompt_toolkit==3.0.52 +propcache==0.3.2 +proto-plus==1.26.1 +protobuf==4.25.8 +psutil==7.0.0 +ptyprocess==0.7.0 +pure_eval==0.2.3 +py-cpuinfo==9.0.0 +py-spy==0.4.1 +pyarrow==21.0.0 +pyasn1==0.6.1 +pyasn1_modules==0.4.2 +pybase64==1.4.2 +pybind11==3.0.1 +pycountry==24.6.1 +pycparser==2.22 +pydantic==2.11.7 +pydantic-extra-types==2.10.5 +pydantic_core==2.33.2 +pyext==0.7 +Pygments==2.19.2 +pylatexenc==2.10 +pynvml==12.0.0 +pytest==8.4.1 +python-dateutil==2.9.0.post0 +python-dotenv==1.1.1 +python-json-logger==3.3.0 +python-multipart==0.0.20 
+pytz==2025.2 +pyvers==0.1.0 +PyYAML==6.0.2 +pyzmq==27.0.2 +qwen-vl-utils==0.0.11 +ray==2.47.1 +referencing==0.36.2 +regex==2025.8.29 +requests==2.32.5 +rich==14.1.0 +rich-toolkit==0.15.0 +rignore==0.6.4 +rpds-py==0.27.1 +rsa==4.9.1 +ruff==0.12.11 +safetensors==0.6.2 +scipy==1.15.3 +sentencepiece==0.2.1 +sentry-sdk==2.35.1 +setproctitle==1.3.6 +sgl-kernel==0.1.0 +sglang==0.4.6.post1 +shellingham==1.5.4 +simplejson==3.20.1 +six==1.17.0 +smart_open==7.3.0.post1 +smmap==5.0.2 +sniffio==1.3.1 +sortedcontainers==2.4.0 +soundfile==0.13.1 +soxr==0.5.0.post1 +stack-data==0.6.3 +starlette==0.47.3 +sympy==1.14.0 +tensorboard==2.20.0 +tensorboard-data-server==0.7.2 +tensordict==0.9.1 +tiktoken==0.11.0 +tokenizers==0.21.4 +tomli==2.2.1 +torch==2.7.0 +torch_memory_saver==0.0.8 +torchao==0.12.0 +torchaudio==2.7.0 +torchdata==0.11.0 +torchvision==0.22.0 +tqdm==4.67.1 +traitlets==5.14.3 +transformers==4.52.4 +triton==3.3.0 +typeguard==4.3.0 +typer==0.17.3 +typing-inspection==0.4.1 +typing_extensions==4.15.0 +tzdata==2025.2 +urllib3==2.5.0 +uvicorn==0.35.0 +uvloop==0.21.0 +-e git+https://github.com/volcengine/verl.git@c780fc34b45e01a1538d6386947585d4f7370bef#egg=verl +virtualenv==20.34.0 +vllm==0.9.1 +wandb==0.21.3 +watchfiles==1.1.0 +wcwidth==0.2.13 +websockets==15.0.1 +Werkzeug==3.1.3 +wrapt==1.17.3 +xformers==0.0.30 +xgrammar==0.1.19 +xxhash==3.5.0 +yarl==1.20.1 +zipp==3.23.0 diff --git a/openseek/competition/pz/yuanboyang/verl/examples/data_preprocess/gsm8k.py b/openseek/competition/pz/yuanboyang/verl/examples/data_preprocess/gsm8k.py new file mode 100644 index 0000000..936cccc --- /dev/null +++ b/openseek/competition/pz/yuanboyang/verl/examples/data_preprocess/gsm8k.py @@ -0,0 +1,91 @@ +# Copyright 2024 Bytedance Ltd. and/or its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Preprocess the GSM8k dataset to parquet format +""" + +import argparse +import os +import re + +import datasets + +from verl.utils.hdfs_io import copy, makedirs +from modelscope.msdatasets import MsDataset + +def extract_solution(solution_str): + solution = re.search("#### (\\-?[0-9\\.\\,]+)", solution_str) + assert solution is not None + final_solution = solution.group(0) + final_solution = final_solution.split("#### ")[1].replace(",", "") + return final_solution + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--local_dir", default="~/data/gsm8k") + parser.add_argument("--hdfs_dir", default=None) + + args = parser.parse_args() + + data_source = "hiyouga/geometry3k" + + train_dataset = MsDataset.load('modelscope/gsm8k', subset_name='main', split='train',trust_remote_code=True) + test_dataset = MsDataset.load('modelscope/gsm8k', subset_name='main', split='test',trust_remote_code=True) + + instruction_following = instruction = r'Please reason step by step,and must put your final answer within \boxed{}.Question:' + + # add a row to each data item that represents a unique id + def make_map_fn(split): + def process_fn(example, idx): + question_raw = example.pop("question") + # 使用新的 prompt 模板 + question = instruction+ " " + question_raw + + answer_raw = example.pop("answer") + solution = extract_solution(answer_raw) + data = { + "data_source": data_source, + "prompt": [ + { + "role": "user", + "content": question, + } + ], + "ability": "math", + "reward_model": {"style": "rule", "ground_truth": solution}, + 
"extra_info": { + "split": split, + "index": idx, + "answer": answer_raw, + "question": question_raw, + }, + } + return data + + return process_fn + + train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True) + test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True) + + local_dir = args.local_dir + hdfs_dir = args.hdfs_dir + + train_dataset.to_parquet(os.path.join(local_dir, "train.parquet")) + test_dataset.to_parquet(os.path.join(local_dir, "test.parquet")) + + if hdfs_dir is not None: + makedirs(hdfs_dir) + + copy(src=local_dir, dst=hdfs_dir) diff --git a/openseek/competition/pz/yuanboyang/verl/verl/model_merger/base_model_merger.py b/openseek/competition/pz/yuanboyang/verl/verl/model_merger/base_model_merger.py new file mode 100644 index 0000000..34736ed --- /dev/null +++ b/openseek/competition/pz/yuanboyang/verl/verl/model_merger/base_model_merger.py @@ -0,0 +1,362 @@ +# Copyright 2024 Bytedance Ltd. and/or its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
def parse_args():
    """Parse the command line of the verl model merger.

    The CLI has two sub-commands, stored in ``args.operation``:

    * ``merge`` - merge checkpoints and save (optionally upload) the result.
    * ``test``  - compare a merged model against a reference Hugging Face model.

    Both sub-commands share the options defined on ``base_op_parser``.

    Returns:
        argparse.Namespace: the parsed arguments.

    Raises:
        SystemExit: raised by argparse when a required option is missing or
            an option value is invalid.
    """
    parser = argparse.ArgumentParser(description="verl model merger")
    subparsers = parser.add_subparsers(dest="operation", required=True, help="Specify 'merge' or 'test' operation.")

    # Options shared by both sub-commands; add_help=False so that the parent
    # parser does not clash with the "-h" of the sub-parsers that inherit it.
    base_op_parser = argparse.ArgumentParser(add_help=False)
    base_op_parser.add_argument(
        "--backend", type=str, required=True, choices=["fsdp", "megatron"], help="The backend of the model"
    )
    # The checkpoint directory is joined with "huggingface" when the merger
    # config is built, so a missing value can never work. Require it here so
    # the user gets a usage message instead of a TypeError from os.path.join.
    base_op_parser.add_argument(
        "--local_dir", type=str, default=None, required=True, help="Path to the saved model checkpoints."
    )
    base_op_parser.add_argument(
        "--tie-word-embedding",
        action="store_true",
        help="Whether to tie word embedding weights (currently only Megatron supported)",
    )
    base_op_parser.add_argument("--trust-remote-code", action="store_true", help="Whether to trust remote code")
    base_op_parser.add_argument(
        "--is-value-model",
        action="store_true",
        help="Whether the model is a value model (currently only Megatron supported)",
    )
    base_op_parser.add_argument(
        "--use_cpu_initialization",
        action="store_true",
        help="Whether to use CPU initialization for the model. This is useful for large models that cannot "
        "fit into GPU memory during initialization.",
    )

    merge_parser = subparsers.add_parser("merge", parents=[base_op_parser], help="Merge model checkpoints and save.")
    merge_parser.add_argument(
        "--target_dir", default="tmp", type=str, help="Directory to save the merged huggingface model"
    )
    merge_parser.add_argument(
        "--hf_upload_path", default=None, type=str, help="Hugging Face repository ID to upload the model"
    )
    merge_parser.add_argument(
        "--private", action="store_true", help="Whether to upload the model to a private Hugging Face repository"
    )

    test_parser = subparsers.add_parser(
        "test", parents=[base_op_parser], help="Test merged model against a reference Hugging Face model"
    )
    test_parser.add_argument(
        "--test_hf_dir", type=str, required=True, help="Path to the reference Hugging Face model directory for testing"
    )

    args = parser.parse_args()
    return args
+ hf_model_config_path (Optional[str]): Path to HuggingFace model configuration files. Defaults to None. + hf_upload (bool): Whether to upload to HuggingFace (computed automatically). Not for initialization. + use_cpu_initialization (bool): Whether to use CPU initialization for large models. Defaults to False. + """ + + operation: str # 'merge' or 'test' + backend: str + target_dir: Optional[str] = "tmp" + hf_upload_path: Optional[str] = None + private: bool = False + test_hf_dir: Optional[str] = None + tie_word_embedding: bool = False + trust_remote_code: bool = False + is_value_model: bool = False + local_dir: Optional[str] = None + hf_model_config_path: Optional[str] = None + hf_upload: bool = field(init=False) + use_cpu_initialization: bool = False + + def __post_init__(self): + self.hf_upload = self.operation == "merge" and bool(self.hf_upload_path) + if self.operation == "test": + self.target_dir = None + self.hf_upload_path = None + self.private = False + + +def generate_config_from_args(args: argparse.Namespace) -> ModelMergerConfig: + common_config_args = { + "operation": args.operation, + "backend": args.backend, + "tie_word_embedding": args.tie_word_embedding, + "trust_remote_code": args.trust_remote_code, + "is_value_model": args.is_value_model, + "local_dir": args.local_dir, + "hf_model_config_path": os.path.join(args.local_dir, "huggingface"), + "use_cpu_initialization": args.use_cpu_initialization, + } + + if args.operation == "merge": + config = ModelMergerConfig( + **common_config_args, + target_dir=args.target_dir, + hf_upload_path=args.hf_upload_path, + private=args.private, + test_hf_dir=None, + ) + os.makedirs(config.target_dir, exist_ok=True) + elif args.operation == "test": + config = ModelMergerConfig( + **common_config_args, + test_hf_dir=args.test_hf_dir, + # the following args are not used by test operation + target_dir=None, + hf_upload_path=None, + private=False, + ) + else: + raise NotImplementedError(f"Unknown operation: 
{args.operation}") + return config + + +class BaseModelMerger(ABC): + """ + Abstract base class for merging distributed model checkpoints into HuggingFace format. + + This class provides common functionality for converting model checkpoints from different + distributed training backends (FSDP, Megatron) into standard HuggingFace format that + can be easily loaded and used for inference or further training. + + The merger supports two main operations: + - merge: Convert and save checkpoints to HuggingFace format + - test: Validate merged checkpoints against a reference model + + Args: + config (ModelMergerConfig): Configuration object containing paths, backend type, + and operation parameters. + + Attributes: + config (ModelMergerConfig): The configuration object passed during initialization. + hf_model_config_path (str): Path to the HuggingFace model configuration files. + model_config (PretrainedConfig): Loaded HuggingFace model configuration. + """ + + def __init__(self, config: ModelMergerConfig): + self.config = config + self.hf_model_config_path = config.hf_model_config_path + self.model_config = AutoConfig.from_pretrained( + self.hf_model_config_path, trust_remote_code=self.config.trust_remote_code + ) + + def get_transformers_auto_model_class(self): + has_remote_code = hasattr(self.model_config, "auto_map") and any( + self.model_config.architectures[0] in val for val in self.model_config.auto_map.values() + ) + if has_remote_code: + auto_class = next( + k for k, v in self.model_config.auto_map.items() if self.model_config.architectures[0] in v + ) + match auto_class: + case "AutoModelForCausalLM": + return AutoModelForCausalLM + case "AutoModelForTokenClassification": + return AutoModelForTokenClassification + case "AutoModelForVision2Seq": + return AutoModelForVision2Seq + case _: + raise NotImplementedError(f"Unknown auto class {auto_class}") + else: + if "ForTokenClassification" in self.model_config.architectures[0]: + return 
AutoModelForTokenClassification + elif "ForCausalLM" in self.model_config.architectures[0]: + return AutoModelForCausalLM + elif "ForConditionalGeneration" in self.model_config.architectures[0]: + return AutoModelForVision2Seq + + raise NotImplementedError(f"Unknown architecture {self.model_config.architectures}") + + def patch_model_generation_config(self, model): + """ + The generation_config created from model config may be different to the pretrained model, + this may lead to error when generating: https://github.com/volcengine/verl/issues/1246 + + This function patch the generation_config created from model config to the pretrained model. + """ + if model.can_generate(): + try: + model.generation_config = GenerationConfig.from_pretrained(self.hf_model_config_path) + except OSError: + print( + f"Warning: Generation config file not found in {self.hf_model_config_path}, using a " + f"generation config created from the model config." + ) + return model + + def save_lora_adapter(self, state_dict: dict[str, torch.Tensor]): + """ + Save lora adapter to safetensors. + + Returns: + lora_path: str, the path to the lora adapter. None if no lora adapter found. + + Note: + This function change the 'state_dict' in place. + """ + lora_params_names = [name for name in state_dict.keys() if "lora_" in name] + + if len(lora_params_names) == 0: + return None + + import json + from typing import OrderedDict + + import peft + from safetensors.torch import save_file + + lora_params = OrderedDict() + target_modules = set() + lora_key = None + + for name in lora_params_names: + lora_key = name.replace(".default.weight", ".weight") + target_modules.add(lora_key.split(".")[-3]) + lora_params[lora_key] = state_dict.pop(name) + + lora_rank = min(lora_params[lora_key].shape[0], lora_params[lora_key].shape[1]) + peft_dict = { + "r": lora_rank, + "lora_alpha": 0, # lora_alpha is not set. An error should be raised to inform the user to set it manually. 
+ "target_modules": list(target_modules), + } + peft_config = peft.LoraConfig(**peft_dict).to_dict() + peft_config["task_type"] = peft_config["task_type"].value if peft_config["task_type"] else None + peft_config["peft_type"] = peft_config["peft_type"].value if peft_config["peft_type"] else None + peft_config["target_modules"] = list(peft_config["target_modules"]) + + lora_path = os.path.join(self.config.target_dir, "lora_adapter") + os.makedirs(lora_path, exist_ok=True) + with open(os.path.join(lora_path, "adapter_config.json"), "w", encoding="utf-8") as f: + json.dump(peft_config, f, ensure_ascii=False, indent=4) + save_file(lora_params, os.path.join(lora_path, "adapter_model.safetensors")) + + for name in list(state_dict.keys()): + key = ( + name.replace("base_model.model.", "") + .replace(".base_layer.weight", ".weight") + .replace(".base_layer.bias", ".bias") + ) + state_dict[key] = state_dict.pop(name) + + return lora_path + + def save_hf_model_and_tokenizer(self, state_dict: dict[str, torch.Tensor]): + auto_model_class = self.get_transformers_auto_model_class() + with init_empty_weights(): + model = auto_model_class.from_config( + self.model_config, torch_dtype=torch.bfloat16, trust_remote_code=True + ) + model.to_empty(device="cpu") + model = self.patch_model_generation_config(model) + + lora_path = self.save_lora_adapter(state_dict) + if lora_path: + print(f"Saving lora adapter to {lora_path}") + + print(f"Saving model to {self.config.target_dir}") + model.save_pretrained(self.config.target_dir, state_dict=state_dict) + del state_dict + del model + + processor = hf_processor(self.hf_model_config_path, trust_remote_code=self.config.trust_remote_code) + tokenizer = hf_tokenizer(self.hf_model_config_path, trust_remote_code=self.config.trust_remote_code) + if processor is not None: + print(f"Saving processor to {self.config.target_dir}") + processor.save_pretrained(self.config.target_dir) + if tokenizer is not None: + print(f"Saving tokenizer to 
{self.config.target_dir}") + tokenizer.save_pretrained(self.config.target_dir) + + def upload_to_huggingface(self): + import requests + from huggingface_hub import HfApi + from huggingface_hub.utils import HfHubHTTPError, RepositoryNotFoundError + + api = HfApi() + try: + # Attempt to create repository + api.create_repo(repo_id=self.config.hf_upload_path, private=self.config.private, exist_ok=True) + except HfHubHTTPError as e: + # Handle authentication/API errors + if e.response.status_code == 401: + raise PermissionError( + "Hugging Face authentication failed. Verify your token is valid and has write permissions." + ) from e + elif e.response.status_code == 404: + raise RepositoryNotFoundError(f"Repository path not found: {self.config.hf_upload_path}") from e + else: + raise ConnectionError(f"Failed to create repository ({e.response.status_code}): {e}") from e + except requests.exceptions.ConnectionError as e: + raise ConnectionError("Network connection failed. Check your internet connection.") from e + + try: + # Attempt folder upload + api.upload_folder(folder_path=self.config.target_dir, repo_id=self.config.hf_upload_path, repo_type="model") + except HfHubHTTPError as e: + if e.response.status_code == 401: + raise PermissionError("Authentication failed during upload. Token may have expired.") from e + else: + raise RuntimeError(f"Upload failed ({e.response.status_code}): {e}") from e + except requests.exceptions.ConnectionError as e: + raise ConnectionError("Network interruption during upload. 
Try again with stable connection.") from e + except OSError as e: + raise FileNotFoundError(f"Local folder error: {self.config.target_dir} - {str(e)}") from e + except Exception as e: + raise RuntimeError(f"Unexpected error during upload: {str(e)}") from e + + @abstractmethod + def merge_and_save(self): + raise NotImplementedError("Subclasses should implement this method") + + @abstractmethod + def cleanup(self): + raise NotImplementedError("Subclasses should implement this method to clean up resources if needed") diff --git a/openseek/competition/pz/yuanboyang/verl/verl/trainer/main_ppo.py b/openseek/competition/pz/yuanboyang/verl/verl/trainer/main_ppo.py new file mode 100644 index 0000000..632b14b --- /dev/null +++ b/openseek/competition/pz/yuanboyang/verl/verl/trainer/main_ppo.py @@ -0,0 +1,390 @@ +# Copyright 2024 Bytedance Ltd. and/or its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Note that we don't combine the main with ray_trainer as ray_trainer is used by other main. 
@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None)
def main(config):
    """Main entry point for PPO training with Hydra configuration management.

    Args:
        config: Configuration object supplied by the ``hydra.main`` decorator
            (config name ``ppo_trainer``); it is passed unchanged to ``run_ppo``.
    """
    run_ppo(config)
+ """ + # Check if Ray is not initialized + if not ray.is_initialized(): + # Initialize Ray with a local cluster configuration + # Set environment variables in the runtime environment to control tokenizer parallelism, + # NCCL debug level, VLLM logging level, and allow runtime LoRA updating + # `num_cpus` specifies the number of CPU cores Ray can use, obtained from the configuration + default_runtime_env = get_ppo_ray_runtime_env() + ray_init_kwargs = config.ray_kwargs.get("ray_init", {}) + runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {}) + runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs) + ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env}) + print(f"ray init kwargs: {ray_init_kwargs}") + ray.init(**OmegaConf.to_container(ray_init_kwargs)) + + # Create a remote instance of the TaskRunner class, and + # Execute the `run` method of the TaskRunner instance remotely and wait for it to complete + if ( + is_cuda_available + and config.global_profiler.tool == "nsys" + and config.global_profiler.get("steps") is not None + and len(config.global_profiler.get("steps", [])) > 0 + ): + from verl.utils.import_utils import is_nvtx_available + + assert is_nvtx_available(), "nvtx is not available in CUDA platform. 
@ray.remote(num_cpus=1)  # please make sure main_task is not scheduled on head
class TaskRunner:
    """Ray remote class for executing distributed PPO training tasks.

    This class encapsulates the main training logic and runs as a Ray remote actor
    to enable distributed execution across multiple nodes and GPUs.

    Attributes:
        role_worker_mapping: Dictionary mapping Role enums to Ray remote worker classes
        mapping: Dictionary mapping Role enums to resource pool IDs for GPU allocation
    """

    def __init__(self):
        self.role_worker_mapping = {}
        self.mapping = {}

    def add_actor_rollout_worker(self, config):
        """Add actor rollout worker based on the actor strategy.

        Returns:
            tuple: ``(actor_rollout_cls, ray_worker_group_cls)``.

        Raises:
            NotImplementedError: if the actor strategy is not fsdp, fsdp2 or megatron.
        """
        from verl.single_controller.ray import RayWorkerGroup

        if config.actor_rollout_ref.actor.strategy in {"fsdp", "fsdp2"}:
            from verl.workers.fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker

            actor_rollout_cls = (
                AsyncActorRolloutRefWorker
                if config.actor_rollout_ref.rollout.mode == "async"
                else ActorRolloutRefWorker
            )
            ray_worker_group_cls = RayWorkerGroup

        elif config.actor_rollout_ref.actor.strategy == "megatron":
            from verl.workers.megatron_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker

            actor_rollout_cls = (
                AsyncActorRolloutRefWorker
                if config.actor_rollout_ref.rollout.mode == "async"
                else ActorRolloutRefWorker
            )
            ray_worker_group_cls = RayWorkerGroup

        else:
            raise NotImplementedError

        from verl.trainer.ppo.ray_trainer import Role

        self.role_worker_mapping[Role.ActorRollout] = ray.remote(actor_rollout_cls)

        return actor_rollout_cls, ray_worker_group_cls

    def add_critic_worker(self, config):
        """Add critic worker to role mapping.

        Raises:
            ValueError: if ``trainer.use_legacy_worker_impl`` is not "auto", "enable" or "disable".
            NotImplementedError: if the critic strategy is not fsdp, fsdp2 or megatron.
        """
        if config.critic.strategy in {"fsdp", "fsdp2"}:
            use_legacy_worker_impl = config.trainer.get("use_legacy_worker_impl", "auto")
            if use_legacy_worker_impl in ["auto", "enable"]:
                from verl.workers.fsdp_workers import CriticWorker
            elif use_legacy_worker_impl == "disable":
                from verl.workers.roles import CriticWorker

                print("Using new worker implementation")
            else:
                raise ValueError(f"Invalid use_legacy_worker_impl: {use_legacy_worker_impl}")

        elif config.critic.strategy == "megatron":
            from verl.workers.megatron_workers import CriticWorker

        else:
            raise NotImplementedError

        from verl.trainer.ppo.ray_trainer import Role

        self.role_worker_mapping[Role.Critic] = ray.remote(CriticWorker)

    def init_resource_pool_mgr(self, config):
        """Initialize resource pool manager.

        A single pool named "global_pool" is declared with ``trainer.n_gpus_per_node``
        entries repeated ``trainer.nnodes`` times; the actor-rollout and critic roles
        are mapped to it.
        """
        from verl.trainer.ppo.ray_trainer import Role

        global_pool_id = "global_pool"
        resource_pool_spec = {
            global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
        }
        self.mapping[Role.ActorRollout] = global_pool_id
        self.mapping[Role.Critic] = global_pool_id
        from verl.trainer.ppo.ray_trainer import ResourcePoolManager

        resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=self.mapping)
        return resource_pool_manager

    def add_reward_model_worker(self, config):
        """Add reward model worker if enabled.

        Raises:
            NotImplementedError: if the reward model is enabled with an unsupported strategy.
        """
        from verl.trainer.ppo.ray_trainer import Role

        if config.reward_model.enable:
            if config.reward_model.strategy in {"fsdp", "fsdp2"}:
                from verl.workers.fsdp_workers import RewardModelWorker
            elif config.reward_model.strategy == "megatron":
                from verl.workers.megatron_workers import RewardModelWorker
            else:
                raise NotImplementedError
            self.role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
            self.mapping[Role.RewardModel] = "global_pool"

    def add_ref_policy_worker(self, config, ref_policy_cls):
        """Add reference policy worker if KL loss or KL reward is used."""
        from verl.trainer.ppo.ray_trainer import Role

        if config.algorithm.use_kl_in_reward or config.actor_rollout_ref.actor.use_kl_loss:
            self.role_worker_mapping[Role.RefPolicy] = ray.remote(ref_policy_cls)
            self.mapping[Role.RefPolicy] = "global_pool"

    def run(self, config):
        """Execute the main PPO training workflow.

        This method sets up the distributed training environment, initializes
        workers, datasets, and reward functions, then starts the training process.

        Remote code shipped with the model is only executed when the configuration
        sets ``data.trust_remote_code`` to a true value; it is off by default.

        Args:
            config: Training configuration object containing all parameters needed
                for setting up and running the PPO training process.
        """
        # Print the initial configuration. `resolve=True` will evaluate symbolic values.
        from pprint import pprint

        from omegaconf import OmegaConf

        from verl.utils.fs import copy_to_local

        print(f"TaskRunner hostname: {socket.gethostname()}, PID: {os.getpid()}")
        pprint(OmegaConf.to_container(config, resolve=True))
        OmegaConf.resolve(config)

        actor_rollout_cls, ray_worker_group_cls = self.add_actor_rollout_worker(config)
        self.add_critic_worker(config)

        # We should adopt a multi-source reward function here:
        # - for rule-based rm, we directly call a reward score
        # - for model-based rm, we call a model
        # - for code related prompt, we send to a sandbox if there are test cases
        # finally, we combine all the rewards together
        # The reward type depends on the tag of the data
        self.add_reward_model_worker(config)

        # Add a reference policy worker if KL loss or KL reward is used.
        self.add_ref_policy_worker(config, actor_rollout_cls)

        # validate config
        validate_config(
            config=config,
            use_reference_policy=need_reference_policy(self.role_worker_mapping),
            use_critic=need_critic(config),
        )

        # Download the checkpoint from HDFS to the local machine.
        # `use_shm` determines whether to use shared memory, which could lead to faster model loading if turned on
        local_path = copy_to_local(
            config.actor_rollout_ref.model.path, use_shm=config.actor_rollout_ref.model.get("use_shm", False)
        )

        # Instantiate the tokenizer and processor.
        from verl.utils import hf_processor, hf_tokenizer

        # Honour the user's setting instead of unconditionally executing code
        # downloaded with the model; enable with `data.trust_remote_code=True`.
        trust_remote_code = config.data.get("trust_remote_code", False)
        tokenizer = hf_tokenizer(local_path, trust_remote_code=trust_remote_code)
        # Used for multimodal LLM, could be None
        processor = hf_processor(local_path, trust_remote_code=trust_remote_code, use_fast=True)

        # Load the reward manager for training and validation.
        reward_fn = load_reward_manager(
            config, tokenizer, num_examine=0, **config.reward_model.get("reward_kwargs", {})
        )
        val_reward_fn = load_reward_manager(
            config, tokenizer, num_examine=1, **config.reward_model.get("reward_kwargs", {})
        )

        resource_pool_manager = self.init_resource_pool_mgr(config)

        from verl.utils.dataset.rl_dataset import collate_fn

        # Create training and validation datasets.
        train_dataset = create_rl_dataset(config.data.train_files, config.data, tokenizer, processor, is_train=True)
        val_dataset = create_rl_dataset(config.data.val_files, config.data, tokenizer, processor, is_train=False)
        train_sampler = create_rl_sampler(config.data, train_dataset)

        # Initialize the PPO trainer.
        trainer = RayPPOTrainer(
            config=config,
            tokenizer=tokenizer,
            processor=processor,
            role_worker_mapping=self.role_worker_mapping,
            resource_pool_manager=resource_pool_manager,
            ray_worker_group_cls=ray_worker_group_cls,
            reward_fn=reward_fn,
            val_reward_fn=val_reward_fn,
            train_dataset=train_dataset,
            val_dataset=val_dataset,
            collate_fn=collate_fn,
            train_sampler=train_sampler,
        )
        # Initialize the workers of the trainer.
        trainer.init_workers()
        # Start the training process.
        trainer.fit()
def create_rl_sampler(data_config, dataset):
    """Build the sampler used to iterate over ``dataset``.

    Three cases, checked in this order:

    1. ``data_config.sampler`` names a ``class_path``: that class is loaded and
       instantiated with the dataset and the data config (curriculum sampler).
    2. ``data_config.shuffle`` is truthy: a seeded ``RandomSampler``.
    3. Otherwise: a ``SequentialSampler``.

    Arguments:
        data_config: The data config.
        dataset (Dataset): The dataset.

    Returns:
        sampler (Sampler): The sampler.
    """
    import torch
    from torch.utils.data import RandomSampler, SequentialSampler

    custom_sampler_cfg = data_config.sampler
    wants_custom_sampler = custom_sampler_cfg is not None and custom_sampler_cfg.get("class_path", None) is not None

    if wants_custom_sampler:
        sampler_cls = load_extern_type(
            data_config.sampler.class_path,
            data_config.sampler.class_name,
        )
        custom_sampler = sampler_cls(
            data_source=dataset,
            data_config=data_config,
        )
        assert isinstance(custom_sampler, AbstractSampler)
        assert data_config.get("dataloader_num_workers", 8) == 0, (
            "If using curriculum, num_workers must be 0 to prevent data caching. "
            "If the dataloader caches data before the batch is done the "
            "curriculum sampler won't have the opportunity to reorder it. "
        )
        return custom_sampler

    if not data_config.shuffle:
        # No shuffling requested: walk the dataset in order.
        return SequentialSampler(data_source=dataset)

    # A dedicated, seeded generator is used so that the shuffling order is
    # reproducible, which facilitates checkpoint resumption.
    rng = torch.Generator()
    rng.manual_seed(data_config.get("seed", 1))
    return RandomSampler(data_source=dataset, generator=rng)
def format_reward(predict_str: str) -> float:
    """Return 1.0 if ``predict_str`` contains a ``\\boxed{...}`` span, else 0.0.

    The whole string must match "anything, ``\\boxed{``, anything, ``}``, anything";
    with DOTALL the wildcards also span newlines.
    """
    boxed_anywhere = re.compile(r".*\\boxed\{.*\}.*", re.DOTALL)
    if boxed_anywhere.fullmatch(predict_str) is None:
        return 0.0
    return 1.0


def acc_reward(predict_str: str, ground_truth: str, use_boxed: bool = True) -> float:
    """Return 1.0 if the answer is graded as matching ``ground_truth``, else 0.0.

    With ``use_boxed`` the answer is what ``extract_boxed_content`` returns for
    ``predict_str``; otherwise ``predict_str`` itself is graded.
    """
    candidate = extract_boxed_content(predict_str) if use_boxed else predict_str
    if grade_answer(candidate, ground_truth):
        return 1.0
    return 0.0


def compute_score(predict_str: str, ground_truth: str, use_boxed: bool = True, format_score: float = 0.1) -> float:
    """Weighted sum of the accuracy reward and the format reward.

    ``format_score`` is the weight of the format reward; the accuracy reward
    gets the remaining ``1.0 - format_score``.
    """
    accuracy = acc_reward(predict_str, ground_truth, use_boxed)
    formatting = format_reward(predict_str)
    return (1.0 - format_score) * accuracy + format_score * formatting
+""" +The main entry point to run the PPO algorithm +""" + +import datetime +import json +import logging +import os +import warnings +from dataclasses import asdict +from typing import Any, Optional + +import numpy as np +import psutil +import torch +import torch.distributed +import torch.distributed as dist +from codetiming import Timer +from omegaconf import DictConfig, OmegaConf, open_dict +from peft import LoraConfig, TaskType, get_peft_model +from safetensors.torch import save_file +from torch.distributed.device_mesh import init_device_mesh +from torch.distributed.fsdp import FullyShardedDataParallel as FSDP + +import verl.utils.torch_functional as verl_F +from verl import DataProto +from verl.models.transformers.monkey_patch import apply_monkey_patch +from verl.single_controller.base import Worker +from verl.single_controller.base.decorator import Dispatch, make_nd_compute_dataproto_dispatch_fn, register +from verl.utils import hf_processor, hf_tokenizer +from verl.utils.activation_offload import enable_activation_offloading +from verl.utils.checkpoint.fsdp_checkpoint_manager import FSDPCheckpointManager +from verl.utils.config import omega_conf_to_dataclass +from verl.utils.device import ( + get_device_id, + get_device_name, + get_nccl_backend, + get_torch_device, + is_cuda_available, + is_npu_available, +) +from verl.utils.flops_counter import FlopsCounter +from verl.utils.fs import copy_to_local +from verl.utils.fsdp_utils import ( + CPUOffloadPolicy, + MixedPrecisionPolicy, + apply_fsdp2, + fsdp2_load_full_state_dict, + fsdp_version, + get_fsdp_wrap_policy, + get_init_weight_context_manager, + get_shard_placement_fn, + init_fn, + layered_summon_lora_params, + load_fsdp_model_to_gpu, + load_fsdp_optimizer, + offload_fsdp_model_to_cpu, + offload_fsdp_optimizer, +) +from verl.utils.import_utils import import_external_libs +from verl.utils.model import compute_position_id_with_mask +from verl.utils.profiler import DistProfiler, DistProfilerExtension, 
def create_device_mesh(world_size, fsdp_size):
    """Create the device mesh used for FSDP.

    A negative ``fsdp_size`` or one that is at least ``world_size`` yields a
    one-dimensional mesh named ``fsdp`` over all ranks. Otherwise the mesh is
    two-dimensional, ``(world_size // fsdp_size, fsdp_size)``, with dimensions
    named ``ddp`` and ``fsdp``.
    """
    shard_over_all_ranks = fsdp_size < 0 or fsdp_size >= world_size
    if shard_over_all_ranks:
        return init_device_mesh(device_name, mesh_shape=(world_size,), mesh_dim_names=["fsdp"])
    return init_device_mesh(
        device_name, mesh_shape=(world_size // fsdp_size, fsdp_size), mesh_dim_names=["ddp", "fsdp"]
    )


def get_sharding_strategy(device_mesh):
    """Map the mesh dimensionality to an FSDP ``ShardingStrategy``.

    Returns ``FULL_SHARD`` for a 1-D mesh and ``HYBRID_SHARD`` for a 2-D mesh.

    Raises:
        NotImplementedError: for any other number of mesh dimensions.
    """
    from torch.distributed.fsdp import ShardingStrategy

    if device_mesh.ndim == 1:
        return ShardingStrategy.FULL_SHARD
    if device_mesh.ndim == 2:
        return ShardingStrategy.HYBRID_SHARD
    raise NotImplementedError(f"Get device mesh ndim={device_mesh.ndim}, but only support 1 or 2")
timeout=datetime.timedelta(seconds=self.config.get("nccl_timeout", 600)), + init_method=os.environ.get("DIST_INIT_METHOD", None), + ) + + # build device mesh for FSDP + world_size = torch.distributed.get_world_size() + # TODO(sgm): support FSDP hybrid shard for larger model + self.device_mesh = create_device_mesh(world_size=world_size, fsdp_size=self.config.actor.fsdp_config.fsdp_size) + + # build device mesh for Ulysses Sequence Parallel + self.ulysses_device_mesh = None + self.ulysses_sequence_parallel_size = self.config.actor.get("ulysses_sequence_parallel_size", 1) + dp = world_size // self.ulysses_sequence_parallel_size + if self.ulysses_sequence_parallel_size > 1: + self.ulysses_device_mesh = init_device_mesh( + device_name, mesh_shape=(dp, self.ulysses_sequence_parallel_size), mesh_dim_names=["dp", "sp"] + ) + + # create training dispatch + if self.ulysses_device_mesh is not None: + is_collect = self.ulysses_device_mesh["sp"].get_local_rank() == 0 + self._register_dispatch_collect_info( + "actor", dp_rank=self.ulysses_device_mesh["dp"].get_local_rank(), is_collect=is_collect + ) + else: + self._register_dispatch_collect_info("actor", dp_rank=self.rank, is_collect=True) + + self.ulysses_sharding_manager = FSDPUlyssesShardingManager(self.ulysses_device_mesh) + self._lora_rank = self.config.model.get("lora_rank", 0) + self._is_lora = self._lora_rank > 0 + + self.role = role + assert self.role in ["actor", "rollout", "ref", "actor_rollout", "actor_rollout_ref"] + + self._is_actor = self.role in ["actor", "actor_rollout", "actor_rollout_ref"] + self._is_rollout = self.role in ["rollout", "actor_rollout", "actor_rollout_ref"] + self._is_ref = self.role in ["ref", "actor_rollout_ref"] + + # TODO(haibin.lin): + # As of now the type of config is DictConfig, if we assign config.profiler with ProfilerConfig, + # it will actually convert the ProfilerConfig dataclass back to a DictConfig. 
+ # We can still use ProfilerConfig for testing purpose (tests/utils/test_nvtx_profile.py) + # as they provides DictConfig-like interface + # The benefit of creating the dataclass config is to perform validation during __post_init__ + if self._is_actor: + omega_profiler_config = config.actor.get("profiler", {}) + elif self._is_rollout: + # NOTE: In colocation mode, rollout config may not take effect (follow the actor config) + # This is for extendability in AsyncRL cases + omega_profiler_config = config.rollout.get("profiler", {}) + elif self._is_ref: + omega_profiler_config = config.ref.get("profiler", {}) + else: + raise ValueError( + f"Invalid role {self.role}, should be one of " + "['actor', 'rollout', 'ref', 'actor_rollout', 'actor_rollout_ref']" + ) + # omega_profiler_config is DictConfig + # profiler_config is a ProfilerConfig dataclass + profiler_config = omega_conf_to_dataclass(omega_profiler_config, dataclass_type=ProfilerConfig) + if omega_profiler_config.get("tool", None) in ["npu", "nsys", "torch", "torch_memory"]: + tool_config = omega_conf_to_dataclass( + omega_profiler_config.get("tool_config", {}).get(omega_profiler_config.get("tool")) + ) + else: + tool_config = None + DistProfilerExtension.__init__( + self, DistProfiler(rank=self.rank, config=profiler_config, tool_config=tool_config) + ) + + self._is_offload_param = False + self._is_offload_optimizer = False + if self._is_actor: + self._is_offload_param = self.config.actor.fsdp_config.get("param_offload", False) + self._is_offload_optimizer = self.config.actor.fsdp_config.get("optimizer_offload", False) + elif self._is_ref: + # TODO: it seems that manual offload is slowly than FSDP offload + self._is_offload_param = self.config.ref.fsdp_config.get("param_offload", False) + + # normalize config + if self._is_actor: + self.config.actor.ppo_mini_batch_size *= self.config.rollout.n + self.config.actor.ppo_mini_batch_size //= self.device_mesh.size() // self.ulysses_sequence_parallel_size + assert 
self.config.actor.ppo_mini_batch_size > 0, ( + f"ppo_mini_batch_size {self.config.actor.ppo_mini_batch_size} should be larger than 0 after " + f"normalization" + ) + # micro bsz + if self.config.actor.ppo_micro_batch_size is not None: + self.config.actor.ppo_micro_batch_size //= ( + self.device_mesh.size() // self.ulysses_sequence_parallel_size + ) + self.config.actor.ppo_micro_batch_size_per_gpu = self.config.actor.ppo_micro_batch_size + + if self.config.actor.ppo_micro_batch_size_per_gpu is not None: + assert self.config.actor.ppo_mini_batch_size % self.config.actor.ppo_micro_batch_size_per_gpu == 0, ( + f"normalized ppo_mini_batch_size {self.config.actor.ppo_mini_batch_size} should be divisible by " + f"ppo_micro_batch_size_per_gpu {self.config.actor.ppo_micro_batch_size_per_gpu}" + ) + assert self.config.actor.ppo_mini_batch_size // self.config.actor.ppo_micro_batch_size_per_gpu > 0, ( + f"normalized ppo_mini_batch_size {self.config.actor.ppo_mini_batch_size} should be larger than " + f"ppo_micro_batch_size_per_gpu {self.config.actor.ppo_micro_batch_size_per_gpu}" + ) + + # normalize rollout config + if self._is_rollout and self.config.rollout.log_prob_micro_batch_size is not None: + self.config.rollout.log_prob_micro_batch_size //= ( + self.device_mesh.size() // self.ulysses_sequence_parallel_size + ) + self.config.rollout.log_prob_micro_batch_size_per_gpu = self.config.rollout.log_prob_micro_batch_size + # normalize ref config + if self._is_ref and self.config.ref.log_prob_micro_batch_size is not None: + self.config.ref.log_prob_micro_batch_size //= self.device_mesh.size() // self.ulysses_sequence_parallel_size + self.config.ref.log_prob_micro_batch_size_per_gpu = self.config.ref.log_prob_micro_batch_size + + def _build_model_optimizer( + self, + model_path, + fsdp_config: FSDPEngineConfig, + optim_config, + override_model_config, + use_remove_padding=False, + use_fused_kernels=False, + enable_gradient_checkpointing=False, + trust_remote_code=False, + 
use_liger=False, + role="actor", + enable_activation_offload=False, + ): + from torch import optim + from torch.distributed.fsdp import CPUOffload, MixedPrecision + from transformers import AutoConfig, AutoModel, AutoModelForCausalLM, AutoModelForVision2Seq + + from verl.utils.model import get_generation_config, print_model_size, update_model_config + from verl.utils.torch_dtypes import PrecisionType + + assert role in ["actor", "ref"] + + log_gpu_memory_usage(f"Before init {role} from HF AutoModel", logger=logger) + local_path = model_path + + # note that we have to create model in fp32. Otherwise, the optimizer is in bf16, which is incorrect + # TODO(zhangchi.usc1992): 1. support create from random initialized model. 2. Support init with FSDP directly + self.tokenizer = hf_tokenizer(local_path, trust_remote_code=trust_remote_code) + self.processor = hf_processor(local_path, trust_remote_code=trust_remote_code) + + if self.config.model.get("custom_chat_template", None) is not None: + if self.processor is not None: + self.processor.chat_template = self.config.model.custom_chat_template + else: + self.tokenizer.chat_template = self.config.model.custom_chat_template + + torch_dtype = fsdp_config.get("model_dtype", None) + if torch_dtype is None: + torch_dtype = torch.float32 if self._is_actor else torch.bfloat16 + else: + torch_dtype = PrecisionType.to_dtype(torch_dtype) + + # override model kwargs + actor_model_config = AutoConfig.from_pretrained( + local_path, trust_remote_code=trust_remote_code, attn_implementation="flash_attention_2" + ) + # TODO: VL models use VisionAttention, which directly uses flash_attention in transformers>=4.53 + # which will be patched by _ulysses_flash_attention_forward, but errorly misses position_ids + # Maybe support Ulysses in VisionAttention in the future and remove this patch + if self.ulysses_sequence_parallel_size > 1 and hasattr(actor_model_config, "vision_config"): + actor_model_config.vision_config._attn_implementation = 
"eager" + + # patch for kimi-vl + if getattr(actor_model_config, "model_type", None) == "kimi_vl": + actor_model_config.text_config.topk_method = "greedy" + + self.generation_config = get_generation_config(local_path, trust_remote_code=trust_remote_code) + + override_config_kwargs = { + "bos_token_id": self.tokenizer.bos_token_id, + "eos_token_id": self.tokenizer.eos_token_id, + "pad_token_id": self.tokenizer.pad_token_id, + } + override_config_kwargs.update(override_model_config) + update_model_config(actor_model_config, override_config_kwargs=override_config_kwargs) + if self.rank == 0: + print(f"Model config after override: {actor_model_config}") + + # NOTE(fix me): tie_word_embedding causes meta_tensor init to hang + init_context = get_init_weight_context_manager( + use_meta_tensor=not actor_model_config.tie_word_embeddings, mesh=self.device_mesh + ) + + with init_context(), warnings.catch_warnings(): + warnings.simplefilter("ignore") + has_remote_code = hasattr(actor_model_config, "auto_map") and any( + actor_model_config.architectures[0] in val for val in actor_model_config.auto_map.values() + ) + if has_remote_code: + auto_class = next( + k for k, v in actor_model_config.auto_map.items() if actor_model_config.architectures[0] in v + ) + match auto_class: + case "AutoModelForVision2Seq": + actor_module_class = AutoModelForVision2Seq + case "AutoModelForCausalLM": + actor_module_class = AutoModelForCausalLM + case _: + actor_module_class = AutoModel + else: + if type(actor_model_config) in AutoModelForVision2Seq._model_mapping.keys(): + actor_module_class = AutoModelForVision2Seq + elif type(actor_model_config) in AutoModelForCausalLM._model_mapping.keys(): + actor_module_class = AutoModelForCausalLM + else: + actor_module_class = AutoModel + + actor_module = actor_module_class.from_pretrained( + pretrained_model_name_or_path=local_path, + torch_dtype=torch_dtype, + config=actor_model_config, + trust_remote_code=True, + ) + + # Apply Liger kernel to the model 
if use_liger is set to True + if use_liger: + from liger_kernel.transformers.monkey_patch import _apply_liger_kernel_to_instance + + _apply_liger_kernel_to_instance(model=actor_module) + + fused_kernel_options = self.config.model.get("fused_kernel_options", None) + fused_kernels_backend = ( + fused_kernel_options.get("impl_backend", None) if fused_kernel_options is not None else None + ) + + apply_monkey_patch( + model=actor_module, + use_remove_padding=use_remove_padding, + ulysses_sp_size=self.ulysses_sequence_parallel_size, + use_fused_kernels=use_fused_kernels, + fused_kernels_backend=fused_kernels_backend, + ) + + # some parameters may not in torch_dtype. TODO(zhangchi.usc1992) remove this after we switch to fsdp2 + actor_module.to(torch_dtype) + + if enable_gradient_checkpointing: + actor_module.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"use_reentrant": False}) + if self._is_lora: + print("Applying LoRA to actor module") + actor_module.enable_input_require_grads() + # Convert config to regular Python types before creating PEFT model + lora_config = { + "task_type": TaskType.CAUSAL_LM, + "r": self.config.model.lora_rank, + "lora_alpha": self.config.model.lora_alpha, + "target_modules": convert_to_regular_types(self.config.model.target_modules), + "exclude_modules": convert_to_regular_types(self.config.model.exclude_modules), + "bias": "none", + } + actor_module = get_peft_model(actor_module, LoraConfig(**lora_config)) + torch.distributed.barrier() + + if self.rank == 0: + print_model_size(actor_module) + + log_gpu_memory_usage(f"After init {role} from HF AutoModel", logger=logger) + + # We wrap FSDP for rollout as well + mixed_precision_config = fsdp_config.get("mixed_precision", None) + if mixed_precision_config is not None: + param_dtype = PrecisionType.to_dtype(mixed_precision_config.get("param_dtype", "bf16")) + reduce_dtype = PrecisionType.to_dtype(mixed_precision_config.get("reduce_dtype", "fp32")) + buffer_dtype = 
PrecisionType.to_dtype(mixed_precision_config.get("buffer_dtype", "fp32")) + else: + param_dtype = torch.bfloat16 + reduce_dtype = torch.float32 + buffer_dtype = torch.float32 + + mixed_precision = MixedPrecision(param_dtype=param_dtype, reduce_dtype=reduce_dtype, buffer_dtype=buffer_dtype) + + auto_wrap_policy = get_fsdp_wrap_policy( + module=actor_module, + config=fsdp_config.get("wrap_policy", None), + is_lora=self.config.model.get("lora_rank", 0) > 0, + ) + + if self._is_rollout and self.config.rollout.name == "hf": + # TODO(zhangchi.usc1992, shengguangming) fix me. Current, auto_wrap_policy causes HFRollout to hang in Gemma + auto_wrap_policy = None + + if self.rank == 0: + print(f"wrap_policy: {auto_wrap_policy}") + + fsdp_mesh = self.device_mesh + sharding_strategy = get_sharding_strategy(fsdp_mesh) + + # TODO: add transformer policy + # We force reference policy to use CPUOffload to save memory. + # We force turn off CPUOffload for actor because it causes incorrect results when using grad accumulation + cpu_offload = None if role == "actor" else CPUOffload(offload_params=True) + fsdp_strategy = self.config.actor.strategy + if fsdp_strategy == "fsdp": + actor_module_fsdp = FSDP( + actor_module, + cpu_offload=cpu_offload, + param_init_fn=init_fn, + auto_wrap_policy=auto_wrap_policy, + device_id=get_device_id(), + sharding_strategy=sharding_strategy, # zero3 + mixed_precision=mixed_precision, + sync_module_states=True, + device_mesh=self.device_mesh, + use_orig_params=fsdp_config.get("use_orig_params", False), + forward_prefetch=fsdp_config.get("forward_prefetch", False), + ) + elif fsdp_strategy == "fsdp2": + assert CPUOffloadPolicy is not None, "PyTorch version >= 2.4 is required for using fully_shard API (FSDP2)" + mp_policy = MixedPrecisionPolicy( + param_dtype=param_dtype, reduce_dtype=reduce_dtype, cast_forward_inputs=True + ) + if role == "actor" and fsdp_config.offload_policy: + cpu_offload = CPUOffloadPolicy(pin_memory=True) + self._is_offload_param = 
False + self._is_offload_optimizer = False + else: + cpu_offload = None if role == "actor" else CPUOffloadPolicy(pin_memory=True) + + fsdp_kwargs = { + "mesh": fsdp_mesh, + "mp_policy": mp_policy, + "offload_policy": cpu_offload, + "reshard_after_forward": fsdp_config.reshard_after_forward, + "shard_placement_fn": get_shard_placement_fn(fsdp_size=self.device_mesh.shape[-1]), + } + full_state = actor_module.state_dict() + apply_fsdp2(actor_module, fsdp_kwargs, fsdp_config) + fsdp2_load_full_state_dict(actor_module, full_state, fsdp_mesh, cpu_offload) + actor_module_fsdp = actor_module + else: + raise NotImplementedError(f"not implement {fsdp_strategy}") + + if enable_activation_offload: + enable_activation_offloading(actor_module_fsdp, fsdp_strategy, enable_gradient_checkpointing) + + log_gpu_memory_usage(f"After {role} FSDP init", logger=logger) + + # TODO: add more optimizer args into config + if role == "actor" and optim_config is not None: + from verl.utils.torch_functional import get_constant_schedule_with_warmup, get_cosine_schedule_with_warmup + + actor_optimizer = optim.AdamW( + actor_module_fsdp.parameters(), + lr=optim_config.lr, + betas=optim_config.get("betas", (0.9, 0.999)), + weight_decay=optim_config.get("weight_decay", 1e-2), + ) + + total_steps = optim_config.get("total_training_steps", 0) + num_warmup_steps = int(optim_config.get("lr_warmup_steps", -1)) + warmup_style = optim_config.get("warmup_style", "constant") + min_lr_ratio = optim_config.get("min_lr_ratio", 0.0) + num_cycles = optim_config.get("num_cycles", 0.5) + if num_warmup_steps < 0: + num_warmup_steps_ratio = optim_config.get("lr_warmup_steps_ratio", 0.0) + num_warmup_steps = int(num_warmup_steps_ratio * total_steps) + + if self.rank == 0: + print(f"Total steps: {total_steps}, num_warmup_steps: {num_warmup_steps}") + + if warmup_style == "constant": + actor_lr_scheduler = get_constant_schedule_with_warmup( + optimizer=actor_optimizer, num_warmup_steps=num_warmup_steps + ) + elif 
warmup_style == "cosine": + actor_lr_scheduler = get_cosine_schedule_with_warmup( + optimizer=actor_optimizer, + num_warmup_steps=num_warmup_steps, + num_training_steps=total_steps, + min_lr_ratio=min_lr_ratio, + num_cycles=num_cycles, + ) + else: + raise NotImplementedError(f"Warmup style {warmup_style} is not supported") + + log_gpu_memory_usage(f"After {role} optimizer init", logger=logger) + else: + actor_optimizer = None + actor_lr_scheduler = None + + return actor_module_fsdp, actor_optimizer, actor_lr_scheduler, actor_model_config + + def _build_rollout(self, trust_remote_code=False): + from torch.distributed.device_mesh import init_device_mesh + + # TODO(sgm): support FSDP hybrid shard for larger model + infer_tp = self.config.rollout.tensor_model_parallel_size + dp = self.world_size // infer_tp + assert self.world_size % infer_tp == 0, ( + f"rollout world_size: {self.world_size} is not divisible by infer_tp: {infer_tp}" + ) + rollout_device_mesh = init_device_mesh( + device_name, mesh_shape=(dp, infer_tp), mesh_dim_names=["dp", "infer_tp"] + ) + rollout_name = self.config.rollout.name + + if rollout_name == "hf": + self._register_dispatch_collect_info("rollout", dp_rank=self.rank, is_collect=True) + else: + is_collect = rollout_device_mesh["infer_tp"].get_local_rank() == 0 + self._register_dispatch_collect_info( + "rollout", dp_rank=rollout_device_mesh["dp"].get_local_rank(), is_collect=is_collect + ) + + rollout_config: RolloutConfig = omega_conf_to_dataclass(self.config.rollout) + model_config: HFModelConfig = omega_conf_to_dataclass(self.config.model, dataclass_type=HFModelConfig) + + # build rollout worker inside hybrid engine + log_gpu_memory_usage(f"Before building {rollout_name} rollout", logger=logger) + rollout_worker = RolloutWorker(config=rollout_config, model_config=model_config) + log_gpu_memory_usage(f"After building {rollout_name} rollout", logger=logger) + + if rollout_name == "vllm": + from verl.workers.sharding_manager.fsdp_vllm import 
FSDPVLLMShardingManager + + full_params = torch.distributed.get_world_size() == 1 + rollout_sharding_manager = FSDPVLLMShardingManager( + module=self.actor_module_fsdp, + inference_engine=rollout_worker.rollout.inference_engine, + model_config=self.actor_model_config, + rollout_config=self.config.rollout, + full_params=full_params, + device_mesh=rollout_device_mesh, + offload_param=self._is_offload_param, + load_format=self.config.rollout.load_format, + layered_summon=self.config.rollout.get("layered_summon", False), + ) + log_gpu_memory_usage("After building sharding manager", logger=logger) + + elif rollout_name == "sglang": + # NOTE(linjunrong): Due to recent fp8 support in SGLang. Now importing any symbol relate to + # SGLang's model_runner would check CUDA device capability. However, due to verl's setting, + # the main process of ray can not find any CUDA device, which would potentially lead to: + # "RuntimeError: No CUDA GPUs are available". + # For this reason, sharding_manager.__init__ should not import FSDPSGLangShardingManager and + # we import it here use the abs path. 
@register(dispatch_mode=Dispatch.ONE_TO_ALL)
def init_model(self):
    """Build every component this worker's role requires.

    Depending on ``self._is_actor``, ``self._is_rollout`` and ``self._is_ref``
    this creates the FSDP-wrapped actor module (with optimizer and LR scheduler
    when acting as actor), the ``DataParallelPPOActor``, the rollout worker with
    its sharding manager, the reference policy, the FLOPs counter and the
    checkpoint manager. Results are stored as attributes on ``self``; nothing is
    returned.
    """
    from verl.workers.actor import DataParallelPPOActor

    # This is used to import external_lib into the huggingface systems
    import_external_libs(self.config.model.get("external_lib", None))

    # Model-level options shared by the actor/rollout and reference builds below.
    override_model_config = OmegaConf.to_container(OmegaConf.create(self.config.model.get("override_config", {})))
    use_remove_padding = self.config.model.get("use_remove_padding", False)
    use_shm = self.config.model.get("use_shm", False)
    use_fused_kernels = self.config.model.get("use_fused_kernels", False)

    if self._is_actor or self._is_rollout:
        # we need the model for actor and rollout
        if self._is_actor:
            optim_config = self.config.actor.optim
            fsdp_config = omega_conf_to_dataclass(self.config.actor.fsdp_config)
        else:
            # Rollout-only worker: no optimizer, default FSDP engine settings.
            optim_config = None
            fsdp_config = FSDPEngineConfig()

        local_path = copy_to_local(self.config.model.path, use_shm=use_shm)
        (
            self.actor_module_fsdp,
            self.actor_optimizer,
            self.actor_lr_scheduler,
            self.actor_model_config,
        ) = self._build_model_optimizer(
            model_path=local_path,
            fsdp_config=fsdp_config,
            optim_config=optim_config,
            override_model_config=override_model_config,
            use_remove_padding=use_remove_padding,
            use_fused_kernels=use_fused_kernels,
            enable_gradient_checkpointing=self.config.model.get("enable_gradient_checkpointing", False),
            trust_remote_code=self.config.model.get("trust_remote_code", False),
            use_liger=self.config.model.get("use_liger", False),
            role="actor",
            enable_activation_offload=self.config.model.get("enable_activation_offload", False),
        )

        # get the original unwrapped module
        if fsdp_version(self.actor_module_fsdp) == 1:
            self.actor_module = self.actor_module_fsdp._fsdp_wrapped_module

        # Offload right after construction when manual offloading is enabled.
        if self._is_offload_param:
            offload_fsdp_model_to_cpu(self.actor_module_fsdp)
            log_gpu_memory_usage("After offload actor model during init", logger=logger)

        if self._is_offload_optimizer:
            offload_fsdp_optimizer(optimizer=self.actor_optimizer)
            log_gpu_memory_usage("After offload actor optimizer during init", logger=logger)

    if self._is_actor:
        actor_cfg = omega_conf_to_dataclass(self.config.actor)
        self.actor = DataParallelPPOActor(
            config=actor_cfg, actor_module=self.actor_module_fsdp, actor_optimizer=self.actor_optimizer
        )

    if self._is_rollout:
        self.rollout, self.rollout_sharding_manager = self._build_rollout(
            trust_remote_code=self.config.model.get("trust_remote_code", False)
        )

    if self._is_ref:
        # The reference model defaults to the actor's model path unless config.ref.model.path is set.
        ref_model_path = self.config.model.path
        ref_model = self.config.ref.get("model", None)
        if ref_model is not None:
            ref_model_path = ref_model.get("path", self.config.model.path)

        if self.rank == 0:
            print("reference model:", ref_model_path)
        local_path = copy_to_local(ref_model_path, use_shm=use_shm)
        # Only the wrapped module (element 0 of the returned tuple) is kept for the reference policy.
        self.ref_module_fsdp = self._build_model_optimizer(
            model_path=local_path,
            fsdp_config=omega_conf_to_dataclass(self.config.ref.fsdp_config),
            optim_config=None,
            override_model_config=override_model_config,
            use_remove_padding=use_remove_padding,
            use_fused_kernels=use_fused_kernels,
            trust_remote_code=self.config.model.get("trust_remote_code", False),
            use_liger=self.config.model.get("use_liger", False),
            role="ref",
        )[0]
        # Copy the model-level flags onto the ref config that is handed to DataParallelPPOActor.
        OmegaConf.set_struct(self.config.ref, True)
        with open_dict(self.config.ref):
            self.config.ref.use_remove_padding = use_remove_padding
            self.config.ref.use_fused_kernels = use_fused_kernels
        self.ref_policy = DataParallelPPOActor(config=self.config.ref, actor_module=self.ref_module_fsdp)

    if self._is_actor:
        self.flops_counter = FlopsCounter(self.actor_model_config)
        self.checkpoint_manager = FSDPCheckpointManager(
            model=self.actor_module_fsdp,
            optimizer=self.actor.actor_optimizer,
            lr_scheduler=self.actor_lr_scheduler,
            processing_class=self.processor if self.processor is not None else self.tokenizer,
            checkpoint_config=self.config.actor.checkpoint,
        )

    if not self._is_actor and self._is_rollout:
        # If ActorRolloutRefWorker is initialized as a standalone rollout,
        # create a checkpoint manager for FSDP model to allow loading FSDP checkpoints for rollout.

        # Load only the model weights; nothing is configured for saving.
        checkpoint_contents = OmegaConf.create({"load_contents": ["model"], "save_contents": []})
        self.checkpoint_manager = FSDPCheckpointManager(
            model=self.actor_module_fsdp,
            optimizer=None,
            lr_scheduler=None,
            processing_class=self.processor if self.processor is not None else self.tokenizer,
            checkpoint_config=checkpoint_contents,
        )
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="rollout"))
@DistProfiler.annotate(color="red", role="rollout_generate")
def generate_sequences(self, prompts: DataProto):
    """Run rollout generation for ``prompts`` and attach timing statistics.

    Args:
        prompts: Batch of prompts; it is moved to the local device before generation.

    Returns:
        The generated batch, moved to CPU, with ``meta_info["timing"]`` holding
        the reduced timings plus the ``generation_timing/*`` entries.
    """
    # Works on any supported accelerator: move the batch to this worker's device.
    prompts = prompts.to(get_device_id())

    assert self._is_rollout

    timings = {}
    sharding_manager = self.rollout_sharding_manager
    with sharding_manager:
        log_gpu_memory_usage("After entering rollout sharding manager", logger=logger)

        with simple_timer("generate_sequences", timings):
            generated = self.rollout.generate_sequences(prompts=prompts)

        log_gpu_memory_usage("After rollout generation", logger=logger)

    # Merge the timings recorded by the sharding manager into the local record.
    timings.update(sharding_manager.timing)

    # Reduce the timings across ranks so that meta_info["timing"] is the same everywhere.
    topk_ratio, fastest, slowest = topk_reduce_ratio_min_max(timings["generate_sequences"])
    timings = reduce_timing(timings)
    generation_stats = {
        "generation_timing/max": slowest,
        "generation_timing/min": fastest,
        "generation_timing/topk_ratio": topk_ratio,
    }
    timings.update(generation_stats)

    generated.meta_info["timing"] = timings
    generated = generated.to("cpu")

    # clear kv cache
    get_torch_device().empty_cache()
    return generated
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="actor"))
@DistProfiler.annotate(color="olive", role="ref_compute_log_prob")
def compute_ref_log_prob(self, data: DataProto):
    """Compute reference-policy log-probabilities for ``data``.

    With LoRA enabled, the log-probabilities come from ``compute_log_prob`` with
    ``meta_info["is_lora"]`` set. Otherwise the standalone reference policy is used.

    Args:
        data: Input batch; its ``meta_info`` is updated in place with the
            settings used for the computation.

    Returns:
        A ``DataProto`` whose tensors contain ``"ref_log_prob"``.
    """
    if self._is_lora:
        # With LoRA, the actor with adapters disabled serves as the reference model.
        data.meta_info["is_lora"] = True
        actor_output = self.compute_log_prob(data)
        # The "old_log_probs" returned here are really the reference log-probs.
        return DataProto.from_dict(tensors={"ref_log_prob": actor_output.batch["old_log_probs"]})

    # From here on the worker must own a standalone reference model.
    assert self._is_ref

    ref_cfg = self.config.ref
    data.meta_info["micro_batch_size"] = ref_cfg.log_prob_micro_batch_size_per_gpu
    data.meta_info["temperature"] = self.config.rollout.temperature
    data.meta_info["max_token_len"] = ref_cfg.log_prob_max_token_len_per_gpu
    data.meta_info["use_dynamic_bsz"] = ref_cfg.log_prob_use_dynamic_bsz

    with self.ulysses_sharding_manager:
        # Keep the batch on CPU; ref_policy.compute_log_prob moves each micro batch to the device.
        data = data.to("cpu")
        ref_log_prob, _ = self.ref_policy.compute_log_prob(data=data, calculate_entropy=False)
        output = DataProto.from_dict(tensors={"ref_log_prob": ref_log_prob})

    output = output.to("cpu")

    # https://pytorch.org/docs/stable/notes/fsdp.html#fsdp-notes
    # Reshard the root FSDP module once the forward pass is done.
    if self.world_size > 1:
        ref_module = self.ref_policy.actor_module
        if fsdp_version(ref_module) == 1:
            ref_module._handle.reshard(True)
        elif fsdp_version(ref_module) == 2:
            ref_module.reshard()

    return output
peft_config["peft_type"].value + peft_config["target_modules"] = list(peft_config["target_modules"]) + try: + if fsdp_version(self.actor_module_fsdp) > 0: + self.actor_module_fsdp = self.actor_module_fsdp.to(get_device_name()) + lora_params = layered_summon_lora_params(self.actor_module_fsdp) + if dist.get_rank() == 0: + save_file(lora_params, os.path.join(lora_save_path, "adapter_model.safetensors")) + with open(os.path.join(lora_save_path, "adapter_config.json"), "w", encoding="utf-8") as f: + json.dump(peft_config, f, ensure_ascii=False, indent=4) + except Exception as e: + log_with_rank( + f"Save LoRA Adapter Error ({e})", rank=dist.get_rank(), logger=logger, log_only_rank_0=True + ) + + dist.barrier() + log_with_rank( + f"[rank-{self.rank}]: Saved LoRA adapter to: {lora_save_path}", + rank=dist.get_rank(), + logger=logger, + log_only_rank_0=True, + ) + + if self._is_offload_param: + offload_fsdp_model_to_cpu(self.actor_module_fsdp) + + @register(dispatch_mode=Dispatch.ONE_TO_ALL) + def load_checkpoint(self, local_path, hdfs_path=None, del_local_after_load=False): + assert self._is_actor or (not self._is_actor and self._is_rollout), ( + f"Checkpoint loading is only supported for Actor or standalone Rollout Workers, but got " + f"{self._is_actor} and {self._is_rollout}" + ) + + if self._is_offload_param: + load_fsdp_model_to_gpu(self.actor_module_fsdp) + + self.checkpoint_manager.load_checkpoint( + local_path=local_path, hdfs_path=hdfs_path, del_local_after_load=del_local_after_load + ) + + if self._is_offload_param: + offload_fsdp_model_to_cpu(self.actor_module_fsdp) + + if self._is_offload_optimizer: + offload_fsdp_optimizer(self.actor_optimizer) + + @register(dispatch_mode=Dispatch.ONE_TO_ALL) + def start_profile(self, **kwargs) -> None: + """Start profiling for the current rank in the current training step.""" + self.profiler.start(**kwargs) + + @register(dispatch_mode=Dispatch.ONE_TO_ALL) + def stop_profile(self) -> None: + """Stop profiling for the current 
rank in the current training step.""" + self.profiler.stop() + + @register(dispatch_mode=Dispatch.ONE_TO_ALL) + def dump_memory_snapshot(self, tag: str = "manual", sub_dir: str = None) -> None: + """Manually trigger a CUDA memory snapshot dump on all ranks.""" + # Memory snapshot is now handled by the profiler system + # This method is kept for backward compatibility but delegates to profiler + if hasattr(self, "profiler") and hasattr(self.profiler, "_impl"): + try: + # Try to use the profiler's memory snapshot functionality + if hasattr(self.profiler._impl, "sampler"): + out_dir = OmegaConf.select(self.config, "actor.profiler.save_path") or "." + self.profiler._impl.sampler.dump_memory_snapshot(out_dir=out_dir, tag=tag, sub_dir=sub_dir) + except Exception: + # silently ignore if profiler doesn't support memory snapshots + pass + + +class CriticWorker(Worker, DistProfilerExtension): + def __init__(self, config: FSDPCriticConfig): + Worker.__init__(self) + omega_profiler_config = config.get("profiler", {}) + profiler_config = omega_conf_to_dataclass(omega_profiler_config, dataclass_type=ProfilerConfig) + if omega_profiler_config.get("tool", None) in ["npu", "nsys", "torch", "torch_memory"]: + tool_config = omega_conf_to_dataclass( + omega_profiler_config.get("tool_config", {}).get(omega_profiler_config.get("tool")) + ) + else: + tool_config = None + DistProfilerExtension.__init__( + self, DistProfiler(rank=self.rank, config=profiler_config, tool_config=tool_config) + ) + import torch.distributed + + self.config = config + if not torch.distributed.is_initialized(): + torch.distributed.init_process_group( + backend=get_nccl_backend(), + timeout=datetime.timedelta(seconds=self.config.get("nccl_timeout", 600)), + init_method=os.environ.get("DIST_INIT_METHOD", None), + ) + self.config: FSDPCriticConfig = config + + # build device mesh for Ulysses Sequence Parallel + world_size = torch.distributed.get_world_size() + from torch.distributed.device_mesh import 
init_device_mesh + + fsdp_size = self.config.model.fsdp_config.fsdp_size + self.device_mesh = create_device_mesh(world_size=world_size, fsdp_size=fsdp_size) + + self.ulysses_device_mesh = None + self.ulysses_sequence_parallel_size = self.config.get("ulysses_sequence_parallel_size", 1) + dp = world_size // self.ulysses_sequence_parallel_size + if self.ulysses_sequence_parallel_size > 1: + self.ulysses_device_mesh = init_device_mesh( + device_name, mesh_shape=(dp, self.ulysses_sequence_parallel_size), mesh_dim_names=["dp", "sp"] + ) + + # create training dispatch + if self.ulysses_device_mesh is not None: + is_collect = self.ulysses_device_mesh["sp"].get_local_rank() == 0 + self._register_dispatch_collect_info( + "critic", dp_rank=self.ulysses_device_mesh["dp"].get_local_rank(), is_collect=is_collect + ) + else: + self._register_dispatch_collect_info("critic", dp_rank=self.rank, is_collect=True) + + self.ulysses_sharding_manager = FSDPUlyssesShardingManager(self.ulysses_device_mesh) + + # set FSDP offload params + self._is_offload_param = self.config.model.fsdp_config.param_offload + self._is_offload_optimizer = self.config.model.fsdp_config.optimizer_offload + + # normalize config + self.config.ppo_mini_batch_size *= self.config.rollout_n + self.config.ppo_mini_batch_size //= torch.distributed.get_world_size() // self.ulysses_sequence_parallel_size + if self.config.ppo_micro_batch_size is not None: + self.config.ppo_micro_batch_size //= ( + torch.distributed.get_world_size() // self.ulysses_sequence_parallel_size + ) + self.config.forward_micro_batch_size //= ( + torch.distributed.get_world_size() // self.ulysses_sequence_parallel_size + ) + self.config.ppo_micro_batch_size_per_gpu = self.config.ppo_micro_batch_size + self.config.forward_micro_batch_size_per_gpu = self.config.forward_micro_batch_size + + if self.config.ppo_micro_batch_size_per_gpu is not None: + assert self.config.ppo_mini_batch_size % self.config.ppo_micro_batch_size_per_gpu == 0, ( + f"normalized 
ppo_mini_batch_size {self.config.ppo_mini_batch_size} should be divisible by " + f"ppo_micro_batch_size_per_gpu {self.config.ppo_micro_batch_size_per_gpu}" + ) + assert self.config.ppo_mini_batch_size // self.config.ppo_micro_batch_size_per_gpu > 0, ( + f"normalized ppo_mini_batch_size {self.config.ppo_mini_batch_size} should be larger than " + f"ppo_micro_batch_size_per_gpu {self.config.ppo_micro_batch_size_per_gpu}" + ) + self._is_lora = self.config.model.get("lora_rank", 0) > 0 + + def _build_critic_model_optimizer(self, config): + # the following line is necessary + from torch import optim + from torch.distributed.fsdp import MixedPrecision + + from verl.utils.model import load_valuehead_model, print_model_size + from verl.utils.torch_dtypes import PrecisionType + + use_shm = config.model.get("use_shm", False) + local_path = copy_to_local(config.model.path, use_shm=use_shm) + # note that the tokenizer between actor and critic may be different. So override tokenizer info with actor info + # using random initialized model from any architecture. May not be the same as Actor. 
+ + tokenizer_path = copy_to_local(config.model.tokenizer_path, use_shm=use_shm) + self.tokenizer = hf_tokenizer(tokenizer_path, trust_remote_code=config.model.get("trust_remote_code", False)) + self.processor = hf_processor(tokenizer_path, trust_remote_code=config.model.get("trust_remote_code", False)) + + if self.config.model.get("custom_chat_template", None) is not None: + if self.processor is not None: + self.processor.chat_template = self.config.model.custom_chat_template + else: + self.tokenizer.chat_template = self.config.model.custom_chat_template + override_config = OmegaConf.to_container(OmegaConf.create(self.config.model.get("override_config", {}))) + override_config_kwargs = { + "bos_token_id": self.tokenizer.bos_token_id, + "eos_token_id": self.tokenizer.eos_token_id, + "pad_token_id": self.tokenizer.pad_token_id, + } + override_config_kwargs.update(override_config) + if self.rank == 0: + print(f"Critic overriding config {override_config_kwargs}") + + torch_dtype = self.config.model.fsdp_config.get("model_dtype", "fp32") + torch_dtype = PrecisionType.to_dtype(torch_dtype) + + from transformers import AutoConfig + + critic_model_config = AutoConfig.from_pretrained( + local_path, + attn_implementation="flash_attention_2", + trust_remote_code=config.model.get("trust_remote_code", False), + ) + # TODO: VL models use VisionAttention, which directly uses flash_attention in transformers>=4.53 + # which will be patched by _ulysses_flash_attention_forward, but errorly misses position_ids + # Maybe support Ulysses in VisionAttention in the future and remove this patch + if self.ulysses_sequence_parallel_size > 1 and hasattr(critic_model_config, "vision_config"): + critic_model_config.vision_config._attn_implementation = "eager" + + critic_model_config.num_labels = 1 + # patch for kimi-vl + if getattr(critic_model_config, "model_type", None) == "kimi_vl": + critic_model_config.text_config.topk_method = "greedy" + + init_context = get_init_weight_context_manager( 
+ use_meta_tensor=not critic_model_config.tie_word_embeddings, mesh=self.device_mesh + ) + + with init_context(), warnings.catch_warnings(): + warnings.simplefilter("ignore") + critic_model_config.classifier_dropout = 0.0 + critic_model_config.hidden_dropout = "0" + critic_model_config.summary_dropout_prob = 0.0 + + critic_module = load_valuehead_model( + local_path, + torch_dtype, + critic_model_config, + config.model.get("trust_remote_code", False), + ) + + use_remove_padding = config.model.get("use_remove_padding", False) + + apply_monkey_patch( + model=critic_module, + use_remove_padding=use_remove_padding, + ulysses_sp_size=self.ulysses_sequence_parallel_size, + ) + + # some parameters may not in torch_dtype + critic_module.to(torch_dtype) + + if config.model.get("enable_gradient_checkpointing", False): + critic_module.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"use_reentrant": False}) + + if self._is_lora: + print("Applying LoRA to critic module") + critic_module.enable_input_require_grads() + # Convert config to regular Python types before creating PEFT model + lora_config = { + "task_type": TaskType.CAUSAL_LM, + "r": self.config.model.lora_rank, + "lora_alpha": self.config.model.lora_alpha, + "target_modules": convert_to_regular_types(self.config.model.target_modules), + "bias": "none", + } + critic_module = get_peft_model(critic_module, LoraConfig(**lora_config)) + + if self.rank == 0: + print_model_size(critic_module) + + self.critic_model_config = critic_model_config + + fsdp_config = self.config.model.fsdp_config + mixed_precision_config = fsdp_config.get("mixed_precision", None) + if mixed_precision_config is not None: + param_dtype = PrecisionType.to_dtype(mixed_precision_config.get("param_dtype", "bf16")) + reduce_dtype = PrecisionType.to_dtype(mixed_precision_config.get("reduce_dtype", "fp32")) + buffer_dtype = PrecisionType.to_dtype(mixed_precision_config.get("buffer_dtype", "fp32")) + else: + param_dtype = torch.bfloat16 + 
reduce_dtype = torch.float32 + buffer_dtype = torch.float32 + + mixed_precision = MixedPrecision(param_dtype=param_dtype, reduce_dtype=reduce_dtype, buffer_dtype=buffer_dtype) + + auto_wrap_policy = get_fsdp_wrap_policy( + module=critic_module, + config=self.config.model.fsdp_config.wrap_policy, + is_lora=self.config.model.get("lora_rank", 0) > 0, + ) + + log_gpu_memory_usage("Before critic FSDP", logger=None) + + fsdp_mesh = self.device_mesh + sharding_strategy = get_sharding_strategy(fsdp_mesh) + + # Note: We force turn off CPUOffload for critic because it causes incorrect results when using grad accumulation + if config.strategy == "fsdp": + critic_module = FSDP( + critic_module, + param_init_fn=init_fn, + use_orig_params=False, + auto_wrap_policy=auto_wrap_policy, + device_id=get_device_id(), + sharding_strategy=sharding_strategy, + mixed_precision=mixed_precision, + sync_module_states=True, + forward_prefetch=self.config.model.fsdp_config.forward_prefetch, + device_mesh=self.device_mesh, + cpu_offload=None, + ) + elif config.strategy == "fsdp2": + assert CPUOffloadPolicy is not None, "PyTorch version >= 2.4 is required for using fully_shard API (FSDP2)" + mp_policy = MixedPrecisionPolicy( + param_dtype=param_dtype, reduce_dtype=reduce_dtype, cast_forward_inputs=True + ) + offload_policy = None + if fsdp_config.offload_policy: + self._is_offload_param = False + self._is_offload_optimizer = False + offload_policy = CPUOffloadPolicy(pin_memory=True) + + fsdp_kwargs = { + "mesh": fsdp_mesh, + "mp_policy": mp_policy, + "offload_policy": offload_policy, + "reshard_after_forward": fsdp_config.reshard_after_forward, + "shard_placement_fn": get_shard_placement_fn(fsdp_size=self.device_mesh.shape[-1]), + } + full_state = critic_module.state_dict() + apply_fsdp2(critic_module, fsdp_kwargs, fsdp_config) + fsdp2_load_full_state_dict(critic_module, full_state, fsdp_mesh, offload_policy) + else: + raise NotImplementedError(f"Unknown strategy {config.strategy}") + + if 
config.model.get("enable_activation_offload", False): + enable_gradient_checkpointing = config.model.get("enable_gradient_checkpointing", False) + enable_activation_offloading(critic_module, config.strategy, enable_gradient_checkpointing) + + log_gpu_memory_usage("After critic FSDP", logger=None) + + critic_optimizer = optim.AdamW( + critic_module.parameters(), + lr=config.optim.lr, + betas=config.optim.get("betas", (0.9, 0.999)), + weight_decay=config.optim.get("weight_decay", 1e-2), + ) + + total_steps = config.optim.get("total_training_steps", 0) + num_warmup_steps = int(config.optim.get("lr_warmup_steps", -1)) + warmup_style = config.optim.get("warmup_style", "constant") + if num_warmup_steps < 0: + num_warmup_steps_ratio = config.optim.get("lr_warmup_steps_ratio", 0.0) + num_warmup_steps = int(num_warmup_steps_ratio * total_steps) + + if self.rank == 0: + print(f"Total steps: {total_steps}, num_warmup_steps: {num_warmup_steps}") + + from verl.utils.torch_functional import get_constant_schedule_with_warmup, get_cosine_schedule_with_warmup + + if warmup_style == "constant": + critic_lr_scheduler = get_constant_schedule_with_warmup( + optimizer=critic_optimizer, num_warmup_steps=num_warmup_steps + ) + elif warmup_style == "cosine": + min_lr_ratio = config.optim.get("min_lr_ratio", 0.0) + num_cycles = config.optim.get("num_cycles", 0.5) + critic_lr_scheduler = get_cosine_schedule_with_warmup( + optimizer=critic_optimizer, + num_warmup_steps=num_warmup_steps, + num_training_steps=total_steps, + min_lr_ratio=min_lr_ratio, + num_cycles=num_cycles, + ) + else: + raise NotImplementedError(f"Warmup style {warmup_style} is not supported") + + return critic_module, critic_optimizer, critic_lr_scheduler + + @register(dispatch_mode=Dispatch.ONE_TO_ALL) + def init_model(self): + # This is used to import external_lib into the huggingface systems + import_external_libs(self.config.model.get("external_lib", None)) + + from verl.workers.critic import DataParallelPPOCritic + + 
self.critic_module, self.critic_optimizer, self.critic_lr_scheduler = self._build_critic_model_optimizer( + self.config + ) + + if self._is_offload_param: + offload_fsdp_model_to_cpu(self.critic_module) + log_gpu_memory_usage("After offload critic model during init", logger=logger) + if self._is_offload_optimizer: + offload_fsdp_optimizer(optimizer=self.critic_optimizer) + log_gpu_memory_usage("After offload critic optimizer during init", logger=logger) + + self.critic = DataParallelPPOCritic( + config=self.config, critic_module=self.critic_module, critic_optimizer=self.critic_optimizer + ) + + self.flops_counter = FlopsCounter(self.critic_model_config) + self.checkpoint_manager = FSDPCheckpointManager( + model=self.critic_module, + optimizer=self.critic_optimizer, + lr_scheduler=self.critic_lr_scheduler, + processing_class=self.processor if self.processor is not None else self.tokenizer, + checkpoint_config=self.config.checkpoint, + ) + + @register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="critic")) + @DistProfiler.annotate(color="cyan") + def compute_values(self, data: DataProto): + if self._is_offload_param: + load_fsdp_model_to_gpu(self.critic_module) + micro_batch_size = self.config.forward_micro_batch_size_per_gpu + data.meta_info["micro_batch_size"] = micro_batch_size + data.meta_info["max_token_len"] = self.config.forward_max_token_len_per_gpu + data.meta_info["use_dynamic_bsz"] = self.config.use_dynamic_bsz + # perform forward computation + with self.ulysses_sharding_manager: + data = data.to("cpu") # data will to device with each micro batch on critic.compute_values + values = self.critic.compute_values(data=data) + output = DataProto.from_dict(tensors={"values": values}) + + output = output.to("cpu") + if self._is_offload_param: + offload_fsdp_model_to_cpu(self.critic_module) + return output + + @register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="critic")) + @DistProfiler.annotate(color="pink") + def 
update_critic(self, data: DataProto): + if self._is_offload_param: + load_fsdp_model_to_gpu(self.critic_module) + if self._is_offload_optimizer: + load_fsdp_optimizer(optimizer=self.critic_optimizer, device_id=get_device_id()) + + # perform forward computation + with self.ulysses_sharding_manager: + data = data.to("cpu") # data will to device with each micro batch on critic.update_critic + with Timer(name="update_critic", logger=None) as timer: + metrics = self.critic.update_critic(data=data) + delta_time = timer.last + + global_num_tokens = data.meta_info["global_token_num"] + estimated_flops, promised_flops = self.flops_counter.estimate_flops(global_num_tokens, delta_time) + metrics["perf/mfu/critic"] = estimated_flops * self.config.ppo_epochs / promised_flops / self.world_size + + lr = self.critic_lr_scheduler.get_last_lr()[0] + metrics["critic/lr"] = lr + self.critic_lr_scheduler.step() + + output = DataProto(batch=None, meta_info={"metrics": metrics}) + + if self._is_offload_param: + offload_fsdp_model_to_cpu(self.critic_module) + if self._is_offload_optimizer: + offload_fsdp_optimizer(optimizer=self.critic_optimizer) + + output = output.to("cpu") + return output + + @register(dispatch_mode=Dispatch.ONE_TO_ALL) + def save_checkpoint(self, local_path, hdfs_path=None, global_step=0, max_ckpt_to_keep=None): + import torch + + if self._is_offload_param: + load_fsdp_model_to_gpu(self.critic_module) + + self.checkpoint_manager.save_checkpoint( + local_path=local_path, hdfs_path=hdfs_path, global_step=global_step, max_ckpt_to_keep=max_ckpt_to_keep + ) + + torch.distributed.barrier() + if self._is_offload_param: + offload_fsdp_model_to_cpu(self.critic_module) + + @register(dispatch_mode=Dispatch.ONE_TO_ALL) + def load_checkpoint(self, local_path, hdfs_path=None, del_local_after_load=True): + import torch + + if self._is_offload_param: + load_fsdp_model_to_gpu(self.critic_module) + + self.checkpoint_manager.load_checkpoint( + local_path=local_path, hdfs_path=hdfs_path, 
del_local_after_load=del_local_after_load + ) + + torch.distributed.barrier() + if self._is_offload_param: + offload_fsdp_model_to_cpu(self.critic_module) + + if self._is_offload_optimizer: + offload_fsdp_optimizer(self.critic_optimizer) + + +# TODO(sgm): we may need to extract it to dp_reward_model.py +class RewardModelWorker(Worker, DistProfilerExtension): + """ + Note that we only implement the reward model that is subclass of AutoModelForTokenClassification. + """ + + def __init__(self, config): + Worker.__init__(self) + + omega_profiler_config = config.get("profiler", {}) + profiler_config = omega_conf_to_dataclass(omega_profiler_config, dataclass_type=ProfilerConfig) + if omega_profiler_config.get("tool", None) in ["npu", "nsys", "torch", "torch_memory"]: + tool_config = omega_conf_to_dataclass( + omega_profiler_config.get("tool_config", {}).get(omega_profiler_config.get("tool")) + ) + else: + tool_config = None + DistProfilerExtension.__init__( + self, + DistProfiler(rank=self.rank, config=profiler_config, tool_config=tool_config), + ) + + import torch.distributed + + self.config = config + if not torch.distributed.is_initialized(): + torch.distributed.init_process_group( + backend=get_nccl_backend(), + timeout=datetime.timedelta(seconds=self.config.get("nccl_timeout", 600)), + init_method=os.environ.get("DIST_INIT_METHOD", None), + ) + + # build device mesh for Ulysses Sequence Parallel + world_size = torch.distributed.get_world_size() + from torch.distributed.device_mesh import init_device_mesh + + fsdp_size = self.config.model.fsdp_config.fsdp_size + self.device_mesh = create_device_mesh(world_size=world_size, fsdp_size=fsdp_size) + + self.ulysses_device_mesh = None + self.ulysses_sequence_parallel_size = self.config.get("ulysses_sequence_parallel_size", 1) + dp = world_size // self.ulysses_sequence_parallel_size + if self.ulysses_sequence_parallel_size > 1: + self.ulysses_device_mesh = init_device_mesh( + device_name, mesh_shape=(dp, 
self.ulysses_sequence_parallel_size), mesh_dim_names=["dp", "sp"] + ) + + self.ulysses_sharding_manager = FSDPUlyssesShardingManager(self.ulysses_device_mesh) + + # create training dispatch + if self.ulysses_device_mesh is not None: + is_collect = self.ulysses_device_mesh["sp"].get_local_rank() == 0 + self._register_dispatch_collect_info( + "reward", dp_rank=self.ulysses_device_mesh["dp"].get_local_rank(), is_collect=is_collect + ) + else: + self._register_dispatch_collect_info("reward", dp_rank=self.rank, is_collect=True) + + self.use_remove_padding = self.config.model.get("use_remove_padding", False) + + # normalize config + if self.config.micro_batch_size is not None: + self.config.micro_batch_size //= torch.distributed.get_world_size() + self.config.micro_batch_size_per_gpu = self.config.micro_batch_size + + def _build_model(self, config): + # the following line is necessary + from torch.distributed.fsdp import CPUOffload + from transformers import AutoConfig, AutoModelForTokenClassification + + use_shm = config.model.get("use_shm", False) + # download the checkpoint from hdfs + local_path = copy_to_local(config.model.path, use_shm=use_shm) + + if self.config.model.input_tokenizer is None: + self._do_switch_chat_template = False + else: + self._do_switch_chat_template = True + input_tokenizer_local_path = copy_to_local(config.model.input_tokenizer, use_shm=use_shm) + self.input_tokenizer = hf_tokenizer( + input_tokenizer_local_path, trust_remote_code=config.model.get("trust_remote_code", False) + ) + self.tokenizer = hf_tokenizer(local_path, trust_remote_code=config.model.get("trust_remote_code", False)) + + trust_remote_code = config.model.get("trust_remote_code", False) + model_config = AutoConfig.from_pretrained(local_path, trust_remote_code=trust_remote_code) + model_config.num_labels = 1 + + # note that we have to create model in fp32. 
Otherwise, the optimizer is in bf16, which is incorrect + init_context = get_init_weight_context_manager( + use_meta_tensor=not model_config.tie_word_embeddings, mesh=self.device_mesh + ) + + with init_context(), warnings.catch_warnings(): + warnings.simplefilter("ignore") + model_config.classifier_dropout = 0.0 + reward_module = AutoModelForTokenClassification.from_pretrained( + pretrained_model_name_or_path=local_path, + config=model_config, + torch_dtype=torch.bfloat16, + attn_implementation="flash_attention_2", + trust_remote_code=trust_remote_code, + ) + + apply_monkey_patch( + model=reward_module, + use_remove_padding=config.model.get("use_remove_padding", False), + ulysses_sp_size=self.ulysses_sequence_parallel_size, + ) + + reward_module.to(torch.bfloat16) + + auto_wrap_policy = get_fsdp_wrap_policy(module=reward_module, config=self.config.model.fsdp_config) + + fsdp_mesh = self.device_mesh + sharding_strategy = get_sharding_strategy(fsdp_mesh) + + if config.strategy == "fsdp": + reward_module = FSDP( + reward_module, + param_init_fn=init_fn, + use_orig_params=False, + auto_wrap_policy=auto_wrap_policy, + device_id=get_device_id(), + sharding_strategy=sharding_strategy, # zero3 + sync_module_states=True, + cpu_offload=CPUOffload(offload_params=True), + forward_prefetch=self.config.model.fsdp_config.forward_prefetch, + device_mesh=self.device_mesh, + ) + elif config.strategy == "fsdp2": + assert CPUOffloadPolicy is not None, "PyTorch version >= 2.4 is required for using fully_shard API (FSDP2)" + cpu_offload = CPUOffloadPolicy(pin_memory=True) + fsdp_kwargs = { + "mesh": fsdp_mesh, + "offload_policy": cpu_offload, + "reshard_after_forward": config.model.fsdp_config.reshard_after_forward, + "shard_placement_fn": get_shard_placement_fn(fsdp_size=self.device_mesh.shape[-1]), + } + full_state = reward_module.state_dict() + apply_fsdp2(reward_module, fsdp_kwargs, config.model.fsdp_config) + fsdp2_load_full_state_dict(reward_module, full_state, fsdp_mesh, 
cpu_offload) + else: + raise NotImplementedError(f"Unknown strategy: {config.strategy}") + return reward_module + + @register(dispatch_mode=Dispatch.ONE_TO_ALL) + def init_model(self): + # This is used to import external_lib into the huggingface systems + import_external_libs(self.config.model.get("external_lib", None)) + self.reward_module = self._build_model(config=self.config) + + def _forward_micro_batch(self, micro_batch): + if is_cuda_available: + from flash_attn.bert_padding import index_first_axis, pad_input, rearrange, unpad_input + elif is_npu_available: + from transformers.integrations.npu_flash_attention import ( + index_first_axis, + pad_input, + rearrange, + unpad_input, + ) + + from verl.utils.ulysses import gather_outputs_and_unpad, ulysses_pad_and_slice_inputs + + with torch.no_grad(), torch.autocast(device_type=device_name, dtype=torch.bfloat16): + input_ids = micro_batch["input_ids"] + batch_size, seqlen = input_ids.shape + attention_mask = micro_batch["attention_mask"] + position_ids = micro_batch["position_ids"] + if position_ids.dim() == 3: # qwen2vl mrope + position_ids = position_ids.transpose(0, 1) # (bsz, 3, seqlen) -> (3, bsz, seqlen) + + if self.use_remove_padding: + input_ids_rmpad, indices, *_ = unpad_input( + input_ids.unsqueeze(-1), attention_mask + ) # input_ids_rmpad (total_nnz, ...) + input_ids_rmpad = input_ids_rmpad.transpose(0, 1) # (1, total_nnz) + + # unpad the position_ids to align the rotary + if position_ids.dim() == 3: + position_ids_rmpad = ( + index_first_axis(rearrange(position_ids, "c b s ... -> (b s) c ..."), indices) + .transpose(0, 1) + .unsqueeze(1) + ) # (3, bsz, seqlen) -> (3, 1, bsz * seqlen) + else: + position_ids_rmpad = index_first_axis( + rearrange(position_ids.unsqueeze(-1), "b s ... 
-> (b s) ..."), indices + ).transpose(0, 1) + + # pad and slice the inputs if sp > 1 + if self.ulysses_sequence_parallel_size > 1: + input_ids_rmpad, position_ids_rmpad, pad_size = ulysses_pad_and_slice_inputs( + input_ids_rmpad, position_ids_rmpad, sp_size=self.ulysses_sequence_parallel_size + ) + + # only pass input_ids and position_ids to enable flash_attn_varlen + output = self.reward_module( + input_ids=input_ids_rmpad, attention_mask=None, position_ids=position_ids_rmpad, use_cache=False + ) + reward_rmpad = output.logits + reward_rmpad = reward_rmpad.squeeze(0) # (total_nnz) + + # gather output if sp > 1 + if self.ulysses_sequence_parallel_size > 1: + reward_rmpad = gather_outputs_and_unpad( + reward_rmpad, gather_dim=0, unpad_dim=0, padding_size=pad_size + ) + + # pad it back + rm_score = pad_input(reward_rmpad, indices=indices, batch=batch_size, seqlen=seqlen).squeeze(-1) + else: + output = self.reward_module( + input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, use_cache=False + ) + rm_score = output.logits # (batch_size, seq_len, 1) + rm_score = rm_score.squeeze(-1) + + # extract the result of the last valid token + eos_mask_idx = torch.argmax(position_ids * attention_mask, dim=-1) # (bsz,) + rm_score = rm_score[torch.arange(batch_size), eos_mask_idx] + return rm_score + + def _expand_to_token_level(self, data: DataProto, scores: torch.Tensor): + batch_size = data.batch.batch_size[0] + # expand as token_level_reward + attention_mask = data.batch["attention_mask"] + position_ids = data.batch["position_ids"] + response_length = data.batch["responses"].shape[-1] + if position_ids.dim() == 3: # qwen2vl mrope [bs, 3, seq_len] + position_ids = position_ids[:, 0, :] + eos_mask_idx = torch.argmax(position_ids * attention_mask, dim=-1) # (bsz,) + token_level_scores = torch.zeros_like(attention_mask, dtype=scores.dtype) # (bsz, seqlen) + token_level_scores[torch.arange(batch_size), eos_mask_idx] = scores + + # select the response part + 
token_level_scores = token_level_scores[:, -response_length:] + + return token_level_scores + + def _switch_chat_template(self, data: DataProto): + src_max_length = data.batch["attention_mask"].shape[-1] + + src_tokenizer = self.input_tokenizer + target_tokenizer = self.tokenizer + + rm_input_ids = [] + rm_attention_mask = [] + + for i in range(data.batch.batch_size[0]): + if not isinstance(data.non_tensor_batch["raw_prompt"][i], list | np.ndarray): + raise TypeError( + f"raw_prompt must be a list or numpy array, got {type(data.non_tensor_batch['raw_prompt'][i])}" + ) + + # extract raw prompt + chat: list = list(data.non_tensor_batch["raw_prompt"][i]) + + # extract response + response_ids = data.batch["responses"][i] + response_length = response_ids.shape[-1] + valid_response_length = data.batch["attention_mask"][i][-response_length:].sum() + valid_response_ids = response_ids[:valid_response_length] + + # decode + response = src_tokenizer.decode(valid_response_ids) + # remove bos and eos + response = response.replace(src_tokenizer.eos_token, "") + + chat.append({"role": "assistant", "content": response}) + + prompt_with_chat_template = target_tokenizer.apply_chat_template( + chat, add_generation_prompt=False, tokenize=False + ) + if self.rank == 0 and i == 0: + # for debugging purpose + print(f"Switch template. 
chat: {prompt_with_chat_template}") + + # the maximum length is actually determined by the reward model itself + max_length = self.config.get("max_length", src_max_length) + if max_length is None: + max_length = src_max_length + + model_inputs = target_tokenizer(prompt_with_chat_template, return_tensors="pt", add_special_tokens=False) + input_ids, attention_mask = verl_F.postprocess_data( + input_ids=model_inputs["input_ids"], + attention_mask=model_inputs["attention_mask"], + max_length=max_length, + pad_token_id=target_tokenizer.pad_token_id, + left_pad=False, # right padding + truncation=self.config.get("truncation", "right"), + ) # truncate from the right + + rm_input_ids.append(input_ids) + rm_attention_mask.append(attention_mask) + + rm_input_ids = torch.cat(rm_input_ids, dim=0) + rm_attention_mask = torch.cat(rm_attention_mask, dim=0) + + rm_position_ids = compute_position_id_with_mask(rm_attention_mask) + + rm_inputs = {"input_ids": rm_input_ids, "attention_mask": rm_attention_mask, "position_ids": rm_position_ids} + + return DataProto.from_dict(rm_inputs) + + @register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="reward")) + @DistProfiler.annotate(color="brown") + def compute_rm_score(self, data: DataProto): + import itertools + + from verl.utils.seqlen_balancing import get_reverse_idx, rearrange_micro_batches + + # Support all hardwares + data = data.to(get_device_id()) + if self._do_switch_chat_template: + rm_data = self._switch_chat_template(data) + else: + rm_input_ids = data.batch["input_ids"] + rm_attention_mask = data.batch["attention_mask"] + rm_position_ids = data.batch["position_ids"] + rm_inputs = { + "input_ids": rm_input_ids, + "attention_mask": rm_attention_mask, + "position_ids": rm_position_ids, + } + rm_data = DataProto.from_dict(rm_inputs) + + # Support all hardwares + rm_data = rm_data.to(get_device_id()) + + # perform forward computation + with self.ulysses_sharding_manager: + use_dynamic_bsz = 
self.config.use_dynamic_bsz + if use_dynamic_bsz: + max_token_len = self.config.forward_max_token_len_per_gpu * self.ulysses_sequence_parallel_size + micro_batches, indices = rearrange_micro_batches(batch=rm_data.batch, max_token_len=max_token_len) + else: + micro_batches = rm_data.batch.split(self.config.micro_batch_size_per_gpu) + output = [] + for micro_batch in micro_batches: + rm_score = self._forward_micro_batch(micro_batch) + output.append(rm_score) + scores = torch.cat(output, dim=0) # (batch_size) + + if use_dynamic_bsz: + indices = list(itertools.chain.from_iterable(indices)) + assert len(indices) == scores.size(0), f"{len(indices)} vs. {scores.size()}" + revert_indices = torch.tensor(get_reverse_idx(indices), dtype=torch.long) + scores = scores[revert_indices] + + token_level_scores = self._expand_to_token_level(data, scores) + # Note that this is only the scores, may not be the final rewards used to train RL + output = DataProto.from_dict(tensors={"rm_scores": token_level_scores}) + + # https://pytorch.org/docs/stable/notes/fsdp.html#fsdp-notes + # unshard the root FSDP module + if self.world_size > 1 and fsdp_version(self.reward_module) == 1: + self.reward_module._handle.reshard(True) + + output = output.to("cpu") + return output + + +# ================================= Async related workers ================================= +class AsyncActorRolloutRefWorker(ActorRolloutRefWorker): + def _build_rollout(self, trust_remote_code=False): + rollout_worker, rollout_sharding_manager = super()._build_rollout(trust_remote_code) + + # NOTE: rollout is not actually initialized here, it's deferred + # to be initialized by AsyncvLLMServer. 
+ + self.vllm_tp_size = self.config.rollout.tensor_model_parallel_size + self.vllm_dp_rank = int(os.environ["RANK"]) // self.vllm_tp_size + self.vllm_tp_rank = int(os.environ["RANK"]) % self.vllm_tp_size + + # used for sleep/wake_up + rollout_worker.rollout.sharding_manager = rollout_sharding_manager + + return rollout_worker, rollout_sharding_manager + + @register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO) + def generate_sequences(self, prompts: DataProto): + raise NotImplementedError("AsyncActorRolloutRefWorker does not support generate_sequences") + + # ============================ vLLM related ============================ + + @register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD) + def execute_method(self, method: str | bytes, *args, **kwargs): + """Called by ExternalRayDistributedExecutor collective_rpc.""" + return self.rollout.execute_method(method, *args, **kwargs) + + @register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD) + def get_zeromq_address(self): + return self.rollout.get_zeromq_address() + + # ============================ SGLang related ============================ + + @register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD, blocking=False) + async def chat_completion(self, json_request): + ret = await self.rollout.chat_completion(json_request) + return ret + + @register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD, blocking=False) + async def generate( + self, + prompt_ids: list[int], + sampling_params: dict[str, Any], + request_id: str, + image_data: Optional[list[Any]] = None, + ) -> list[int]: + ret = await self.rollout.generate(prompt_ids, sampling_params, request_id, image_data=image_data) + return ret + + @register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD) + async def wake_up(self): + await self.rollout.wake_up() + # return something to block the caller + return True + + @register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD) + async def sleep(self): + await self.rollout.sleep() + # return something to block the caller + return True diff 
--git "a/openseek/competition/pz/yuanboyang/yuanboyang-\345\206\263\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212.pdf" "b/openseek/competition/pz/yuanboyang/yuanboyang-\345\206\263\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212.pdf" new file mode 100644 index 0000000..7564917 Binary files /dev/null and "b/openseek/competition/pz/yuanboyang/yuanboyang-\345\206\263\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212.pdf" differ diff --git "a/openseek/competition/pz/yuanboyang/yuanboyang-\345\210\235\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212.pdf" "b/openseek/competition/pz/yuanboyang/yuanboyang-\345\210\235\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212.pdf" new file mode 100644 index 0000000..f8e024a Binary files /dev/null and "b/openseek/competition/pz/yuanboyang/yuanboyang-\345\210\235\350\265\233\346\212\200\346\234\257\346\212\245\345\221\212.pdf" differ