Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions 3.test_cases/pytorch/verl/kubernetes/rlvr/ray-expose.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ if [ "$?" == 0 ]; then
echo "Ray is exposed on port ${RAY_DASHBOARD_PORT}"
else
PID_FILE="$HOME/port-forward.pid"
export SERVICEHEAD=$(kubectl get service | grep head-svc | awk '{print $1}' | head -n 1)
export SERVICEHEAD=$(kubectl get service -n "${RAY_NAMESPACE}" | grep head-svc | awk '{print $1}' | head -n 1)

kubectl port-forward --address 0.0.0.0 service/${SERVICEHEAD} ${RAY_DASHBOARD_PORT}:8265 > /dev/null 2>&1 &
kubectl port-forward -n "${RAY_NAMESPACE}" --address 0.0.0.0 service/${SERVICEHEAD} ${RAY_DASHBOARD_PORT}:8265 > /dev/null 2>&1 &
echo $! > "$PID_FILE"
echo "Port-forward started, PID $! saved in $PID_FILE"
sleep 1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#!/usr/bin/env bash
set -xeuo pipefail

# =============================================================================
# Optimized GRPO recipe for p5en.48xlarge (8x H200 per node)
#
# Key optimizations over run_grpo_configurable.sh:
# 1. Dynamic batching (use_dynamic_bsz=True) — eliminates padding waste in
# actor updates. Typical GSM8K sequences average ~317 tokens vs 1024 max,
# so fixed-size batching wastes ~69% of compute on padding.
# 2. FSDP2 (strategy=fsdp2) — PyTorch's next-gen fully sharded data
# parallelism with per-parameter sharding. ~7% lower memory, ~1.5%
# throughput gain over FSDP1.
# 3. Forward prefetch (forward_prefetch=True) — overlaps FSDP all-gather
# with computation for pipelined communication/compute.
# 4. Higher vLLM KV cache (gpu_memory_utilization=0.7) — more cache for
# faster generation batching.
#
# IMPORTANT: PYTORCH_CUDA_ALLOC_CONF must be empty in the runtime env to
# avoid conflicts with vLLM v1's CuMemAllocator. verl internally toggles
# expandable_segments on/off at the training/inference boundary.
#
# Tested with: verl v0.6.1, 4x p5en.48xlarge (32 H200 GPUs), Qwen3-8B
# =============================================================================

# Project configuration
project_name='GRPO'
exp_name="GRPO-${MODEL_NAME}-optimized"

# GRPO Algorithm parameters
adv_estimator=grpo
use_kl_in_reward=False
use_kl_loss=True
kl_loss_coef=0.001
kl_loss_type=low_var_kl
entropy_coeff=0

# Token length configuration
max_prompt_length=512
max_response_length=512
filter_overlong_prompts=True
truncation='error'

# Training configuration
train_prompt_bsz=${TRAIN_BATCH_SIZE:-64}
n_resp_per_prompt=${N_RESP_PER_PROMPT:-8}
train_prompt_mini_bsz=16 # Must be <= train_prompt_bsz

# Dynamic batching: pack sequences by total token count instead of fixed
# micro-batch size. ppo_max_token_len_per_gpu should be >= 2x the max total
# sequence length (2 * 1024 = 2048). Setting 4096 provides headroom.
# When use_dynamic_bsz=True, ppo_micro_batch_size_per_gpu is ignored.
use_dynamic_bsz=True
ppo_max_token_len_per_gpu=4096

# FSDP2 + forward prefetch
strategy=fsdp2
forward_prefetch=True

# Ray configuration from env_vars
RAY_ADDRESS=${RAY_ADDRESS:-"http://localhost:8265"}

# Cluster configuration from env_vars
NNODES=${NUM_NODES:-4}
GPUS_PER_NODE=${NUM_GPU_PER_NODE:-8}

# Model and data paths from env_vars
MODEL_NAME=${MODEL_NAME:-"Qwen3-8B"}
MODEL_PATH=${MODEL_PATH:-"Qwen/Qwen3-8B"}
RAY_DATA_HOME=${RAY_DATA_HOME:-"/fsx/verl"}
CKPTS_DIR="${RAY_DATA_HOME}/ckpts/${project_name}/${exp_name}"

# Data files - using GSM8K dataset
TRAIN_FILE="${RAY_DATA_HOME}/data/gsm8k/train.parquet"
TEST_FILE="${RAY_DATA_HOME}/data/gsm8k/test.parquet"

# Performance parameters
gen_tp=2
log_prob_micro_bsz_per_gpu=32
gpu_memory_utilization=0.7 # Higher than default 0.6 for more KV cache

# Memory optimization
param_offload=False
optimizer_offload=False
ref_param_offload=True

# Checkpoint configuration
save_freq=20
test_freq=5

# Print configuration for verification
echo "=== GRPO Optimized Training Configuration ==="
echo "Project: ${project_name}"
echo "Experiment: ${exp_name}"
echo "Model: ${MODEL_NAME} (${MODEL_PATH})"
echo "Nodes: ${NNODES}"
echo "GPUs per node: ${GPUS_PER_NODE}"
echo "Total GPUs: $((NNODES * GPUS_PER_NODE))"
echo "Data home: ${RAY_DATA_HOME}"
echo "Checkpoints: ${CKPTS_DIR}"
echo "Ray address: ${RAY_ADDRESS}"
echo "--- Optimizations ---"
echo "Dynamic batching: ${use_dynamic_bsz} (max_token_len=${ppo_max_token_len_per_gpu})"
echo "Strategy: ${strategy}"
echo "Forward prefetch: ${forward_prefetch}"
echo "GPU memory utilization: ${gpu_memory_utilization}"
echo "================================================"

# Submit Ray job
# NOTE: PYTORCH_CUDA_ALLOC_CONF="" overrides the pod-level env var to prevent
# vLLM v1 crash (AssertionError: Expandable segments not compatible with memory pool).
# verl manages expandable_segments internally at the training/inference boundary.
ray job submit --no-wait \
--address "${RAY_ADDRESS}" \
--runtime-env-json '{"env_vars": {"NCCL_DEBUG": "INFO", "TOKENIZERS_PARALLELISM": "false", "HYDRA_FULL_ERROR": "1", "PYTORCH_CUDA_ALLOC_CONF": ""}}' \
-- python3 -m verl.trainer.main_ppo \
algorithm.adv_estimator=${adv_estimator} \
data.train_files="${TRAIN_FILE}" \
data.val_files="${TEST_FILE}" \
data.prompt_key=question \
data.train_batch_size=${train_prompt_bsz} \
data.max_prompt_length=${max_prompt_length} \
data.max_response_length=${max_response_length} \
data.filter_overlong_prompts=${filter_overlong_prompts} \
data.truncation=${truncation} \
actor_rollout_ref.model.path="${MODEL_PATH}" \
actor_rollout_ref.model.use_remove_padding=True \
actor_rollout_ref.model.enable_gradient_checkpointing=True \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.actor.ppo_mini_batch_size=${train_prompt_mini_bsz} \
actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \
actor_rollout_ref.actor.ppo_max_token_len_per_gpu=${ppo_max_token_len_per_gpu} \
actor_rollout_ref.actor.use_kl_loss=${use_kl_loss} \
actor_rollout_ref.actor.kl_loss_coef=${kl_loss_coef} \
actor_rollout_ref.actor.kl_loss_type=${kl_loss_type} \
actor_rollout_ref.actor.entropy_coeff=${entropy_coeff} \
actor_rollout_ref.actor.fsdp_config.param_offload=${param_offload} \
actor_rollout_ref.actor.fsdp_config.optimizer_offload=${optimizer_offload} \
actor_rollout_ref.actor.fsdp_config.forward_prefetch=${forward_prefetch} \
actor_rollout_ref.actor.strategy=${strategy} \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=${log_prob_micro_bsz_per_gpu} \
actor_rollout_ref.rollout.tensor_model_parallel_size=${gen_tp} \
actor_rollout_ref.rollout.name=vllm \
actor_rollout_ref.rollout.gpu_memory_utilization=${gpu_memory_utilization} \
actor_rollout_ref.rollout.n=${n_resp_per_prompt} \
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=${log_prob_micro_bsz_per_gpu} \
actor_rollout_ref.ref.fsdp_config.param_offload=${ref_param_offload} \
algorithm.use_kl_in_reward=${use_kl_in_reward} \
trainer.critic_warmup=0 \
'trainer.logger=["console"]' \
trainer.project_name="${project_name}" \
trainer.experiment_name="${exp_name}" \
trainer.n_gpus_per_node=${GPUS_PER_NODE} \
trainer.nnodes=${NNODES} \
trainer.default_local_dir="${CKPTS_DIR}" \
trainer.save_freq=${save_freq} \
trainer.test_freq=${test_freq} \
trainer.total_epochs=2
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ export NUM_NODES=4 # Single source of truth for number of nod
export NUM_GPU_PER_NODE=8
export NUM_EFA_PER_NODE=16

# Worker pod resource requests/limits — adjust for your instance type
# p5en.48xlarge: 192 vCPU, 2 TiB RAM → use 48 CPU, 450Gi per worker (4 workers/node)
# p5.48xlarge: 192 vCPU, 2 TiB RAM → use 48 CPU, 450Gi per worker
export WORKER_CPU=48
export WORKER_MEMORY="450Gi"


# Ray configs
export MODEL_NAME="Qwen3-8B"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,13 @@ patches:
value: ${HF_TOKEN}
resources:
limits:
cpu: ${WORKER_CPU}
memory: ${WORKER_MEMORY}
nvidia.com/gpu: ${NUM_GPU_PER_NODE}
vpc.amazonaws.com/efa: ${NUM_EFA_PER_NODE}
requests:
cpu: ${WORKER_CPU}
memory: ${WORKER_MEMORY}
nvidia.com/gpu: ${NUM_GPU_PER_NODE}
vpc.amazonaws.com/efa: ${NUM_EFA_PER_NODE}
volumeMounts:
Expand Down
24 changes: 17 additions & 7 deletions 3.test_cases/pytorch/verl/kubernetes/rlvr/setup/load_data_grpo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ DATA_DIR="${RAY_DATA_HOME}/data/gsm8k"
echo "Creating data directory: ${DATA_DIR}"

# Get the head pod name
HEAD_POD=$(kubectl get pods -l ray.io/node-type=head -o jsonpath='{.items[0].metadata.name}')
HEAD_POD=$(kubectl get pods -n "${RAY_NAMESPACE}" -l ray.io/node-type=head -o jsonpath='{.items[0].metadata.name}')

if [ -z "$HEAD_POD" ]; then
echo "Error: Could not find Ray head pod. Is your cluster running?"
Expand Down Expand Up @@ -45,10 +45,20 @@ except Exception as e:
print("Adding VERL-required columns (data_source and reward_model)...")

def add_verl_columns(example):
"""Add required columns for VERL reward computation"""
"""Add required columns for VERL reward computation.

IMPORTANT: verl's RLHFDataset._build_messages() expects the prompt_key
column to contain a list of chat message dicts, NOT a raw string.
e.g. [{"role": "user", "content": "..."}]

If the column contains a raw string, apply_chat_template will produce
only the generation prompt tokens (e.g. '<|im_start|>assistant\\n' = 3 tokens)
and the model will never see the actual question, resulting in 0.0 reward.
"""
ground_truth = extract_answer(example['answer'])
return {
**example,
'question': [{'role': 'user', 'content': example['question']}],
'answer': example['answer'],
'data_source': 'openai/gsm8k',
'reward_model': {'ground_truth': ground_truth}
}
Expand Down Expand Up @@ -82,17 +92,17 @@ EOF

# Copy script to pod
echo "Copying download script to pod..."
kubectl cp /tmp/download_gsm8k.py ${HEAD_POD}:/tmp/download_gsm8k.py
kubectl cp /tmp/download_gsm8k.py "${RAY_NAMESPACE}/${HEAD_POD}:/tmp/download_gsm8k.py"

# Execute the script in the pod
echo "Downloading GSM8K data..."
kubectl exec ${HEAD_POD} -- bash -c "export DATA_DIR=${DATA_DIR}"
kubectl exec ${HEAD_POD} -- python3 /tmp/download_gsm8k.py
kubectl exec -n "${RAY_NAMESPACE}" ${HEAD_POD} -- bash -c "export DATA_DIR=${DATA_DIR}"
kubectl exec -n "${RAY_NAMESPACE}" ${HEAD_POD} -- python3 /tmp/download_gsm8k.py


# Verify the files exist
echo "Verifying downloaded files..."
kubectl exec ${HEAD_POD} -- ls -lh ${DATA_DIR}/
kubectl exec -n "${RAY_NAMESPACE}" ${HEAD_POD} -- ls -lh ${DATA_DIR}/

echo "GSM8K data download complete!"
echo "Data location: ${DATA_DIR}"
Expand Down
88 changes: 75 additions & 13 deletions 3.test_cases/pytorch/verl/kubernetes/rlvr/setup/raycluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,34 @@ spec:
- name: ray-worker
image: PLACEHOLDER_IMAGE ## IMAGE: Here you may choose which image your head node will run
env:
## EFA CONFIGURATION
- name: FI_PROVIDER
value: "efa"
- name: FI_EFA_USE_DEVICE_RDMA
value: "1"
- name: FI_EFA_FORK_SAFE
value: "1"
- name: NCCL_PROTO
value: "simple"
## NCCL CONFIGURATION
- name: NCCL_SOCKET_IFNAME
value: "^docker,lo,veth"
- name: NCCL_BUFFSIZE
value: "8388608"
- name: NCCL_P2P_NET_CHUNKSIZE
value: "8388608"
- name: NCCL_TUNER_PLUGIN
value: "/nccl-links/lib64/libnccl-tuner-aws-ofi.so"
- name: NCCL_DEBUG
value: "INFO"
- name: LD_LIBRARY_PATH
value: "/nccl-links/lib64:/host-ofi-nccl/lib64:/host-efa/lib64:/usr/local/lib/python3.12/dist-packages/torch/lib:/usr/local/lib:/usr/lib/x86_64-linux-gnu"
## VLLM AND PYTORCH
- name: VLLM_USE_V1
value: "1"
- name: TOKENIZERS_PARALLELISM
value: "true"
- name: PYTORCH_CUDA_ALLOC_CONF
value: "expandable_segments:True"
## TORCH NCCL
- name: TORCH_NCCL_DUMP_ON_TIMEOUT
value: "1"
- name: TORCH_NCCL_ASYNC_ERROR_HANDLING
Expand All @@ -142,32 +158,78 @@ spec:
command: ["/bin/sh","-c","ray stop"]
resources:
limits: ## LIMITS: Set resource limits for your worker pods
cpu: 16
memory: 200Gi
cpu: PLACEHOLDER_WORKER_CPU
memory: PLACEHOLDER_WORKER_MEMORY
nvidia.com/gpu: PLACEHOLDER_NUM_GPU_PER_NODE
vpc.amazonaws.com/efa: PLACEHOLDER_NUM_EFA_PER_NODE
requests: ## REQUESTS: Set resource requests for your worker pods
cpu: 16
memory: 200Gi
cpu: PLACEHOLDER_WORKER_CPU
memory: PLACEHOLDER_WORKER_MEMORY
nvidia.com/gpu: PLACEHOLDER_NUM_GPU_PER_NODE
vpc.amazonaws.com/efa: PLACEHOLDER_NUM_EFA_PER_NODE
ports:
- containerPort: 8080
name: metrics
volumeMounts: ## VOLUMEMOUNTS: Mount your S3 CSI EKS Add-On to worker pods
volumeMounts: ## VOLUMEMOUNTS: Mount EFA/NCCL host libs and shared storage
- name: host-efa
mountPath: /host-efa
readOnly: true
- name: host-ofi-nccl
mountPath: /host-ofi-nccl
readOnly: true
- name: nccl-links
mountPath: /nccl-links
- name: ray-logs
mountPath: /tmp/ray
- name: fsx-storage
mountPath: /fsx
# - name: checkpoint-logs
# mountPath: /var/log/sagemaker_checkpointing
- name: dshm
mountPath: /dev/shm
## INIT CONTAINER: Create NCCL symlinks from host OFI-NCCL libs
## Required when the Docker image does not bundle EFA/OFI-NCCL libraries.
## The host nodes (e.g. p5en) have these at /opt/amazon/ofi-nccl but with
## different filenames than what NCCL expects. The init container creates
## the expected symlinks in a shared emptyDir volume.
initContainers:
- name: nccl-symlinks
image: busybox:latest
command: ["sh", "-c"]
args:
- |
mkdir -p /nccl-links/lib64
ln -sf /host-ofi-nccl/lib64/libnccl-net-ofi.so /nccl-links/lib64/libnccl-net-aws-ofi.so
ln -sf /host-ofi-nccl/lib64/libnccl-ofi-tuner.so /nccl-links/lib64/libnccl-tuner-aws-ofi.so
ls -la /nccl-links/lib64/
echo "NCCL symlinks created"
volumeMounts:
- name: nccl-links
mountPath: /nccl-links
- name: host-ofi-nccl
mountPath: /host-ofi-nccl
readOnly: true
volumes:
## HOST EFA AND OFI-NCCL LIBRARIES
## These are pre-installed on EFA-enabled nodes (p5, p5e, p5en, etc.)
## at /opt/amazon/efa and /opt/amazon/ofi-nccl. Mount them into the pod
## so NCCL can use EFA for inter-node communication.
- name: host-efa
hostPath:
path: /opt/amazon/efa
type: Directory
- name: host-ofi-nccl
hostPath:
path: /opt/amazon/ofi-nccl
type: Directory
- name: nccl-links
emptyDir: {}
- name: fsx-storage
persistentVolumeClaim:
claimName: fsx-claim
- name: ray-logs
emptyDir: {}
# - name: checkpoint-logs
# hostPath:
# path: /var/logs/sagemaker_checkpointing
# type: DirectoryOrCreate
## SHARED MEMORY: Required for PyTorch DataLoader shared memory and
## NCCL shared-memory transport. Size should be generous for large models.
- name: dshm
emptyDir:
medium: Memory
sizeLimit: 200Gi
Loading