From d24acc67f48f06bcfc6a3c9b66501964048d5499 Mon Sep 17 00:00:00 2001 From: Fiona-Waters Date: Thu, 28 May 2026 16:30:30 +0100 Subject: [PATCH] Add GRPO on Ray guided example with README - Add examples/fine-tuning/grpo_ray/ with multi-node distributed GRPO training via CodeFlare SDK RayJob + ManagedClusterConfig - Notebook demonstrates verl backend with FSDP + vLLM across 2 nodes - Update fine-tuning README to include Ray as a fourth execution mode Signed-off-by: Fiona-Waters Co-authored-by: Cursor --- examples/fine-tuning/README.md | 34 +- examples/fine-tuning/grpo_ray/README.md | 95 +++++ .../grpo_ray/grpo_lora-rayjob.ipynb | 395 ++++++++++++++++++ 3 files changed, 522 insertions(+), 2 deletions(-) create mode 100644 examples/fine-tuning/grpo_ray/README.md create mode 100644 examples/fine-tuning/grpo_ray/grpo_lora-rayjob.ipynb diff --git a/examples/fine-tuning/README.md b/examples/fine-tuning/README.md index 5caa5ddf..c8206775 100644 --- a/examples/fine-tuning/README.md +++ b/examples/fine-tuning/README.md @@ -14,11 +14,12 @@ For detailed algorithm documentation and configuration options, see the upstream ## Execution Modes -There are three ways to run fine tuning examples: +There are four ways to run fine tuning examples: 1. **Interactive (single node fine tuning)** 2. **Distributed (distributed fine tuning with Kubeflow Trainer)** -3. **Pipeline mode (automated training, model evaluation, and registration with Kubeflow Pipelines)** +3. **Distributed on Ray (distributed fine tuning with KubeRay and CodeFlare SDK)** +4. **Pipeline mode (automated training, model evaluation, and registration with Kubeflow Pipelines)** ### Interactive (single node fine tuning) @@ -96,6 +97,35 @@ Training is offloaded to **dedicated training pods** managed by **Kubeflow Train --- +### Distributed on Ray (distributed fine tuning with KubeRay) + +**What it is** + +Training is offloaded to a **Ray cluster** managed by **KubeRay**, submitted via **CodeFlare SDK**: + +- **Multi-GPU training** using Ray-native distribution (FSDP, Ray Train `TorchTrainer`) +- **Short-lived clusters** — the SDK creates a RayCluster for the job and tears it down on completion +- **Integration with Kueue** for resource queueing and scheduling +- **Ray Dashboard** visibility for job monitoring and debugging + +**Recommended for** + +- **GRPO/RLVR training** with the verl backend (Ray-native, multi-GPU) +- Teams already using Ray for distributed workloads +- Workloads that benefit from Ray's actor-based distribution model + +**Resource considerations** + +- **GPU(s) required** on the Ray head pod (verl uses `STRICT_PACK` — all GPUs co-located) +- **No shared PVC required** — the RayCluster is ephemeral; persist results to S3 or external storage +- The Workbench only submits and monitors the job + +**Learn more** + +- [GRPO fine-tuning on Ray](grpo_ray/README.md) + +--- + ### Pipeline Mode (Automated Workflows) **What it is** diff --git a/examples/fine-tuning/grpo_ray/README.md b/examples/fine-tuning/grpo_ray/README.md new file mode 100644 index 00000000..81adbb98 --- /dev/null +++ b/examples/fine-tuning/grpo_ray/README.md @@ -0,0 +1,95 @@ +# GRPO Fine-Tuning on Ray with Training Hub + +This example demonstrates how to run Training Hub's [GRPO (Group Relative Policy Optimization)](https://github.com/Red-Hat-AI-Innovation-Team/training_hub?tab=readme-ov-file#grpo) on Ray using CodeFlare SDK and KubeRay on Red Hat OpenShift AI. + +## What is GRPO? + +GRPO is a reinforcement learning from verifiable rewards (RLVR) algorithm that improves a model's outputs by comparing groups of responses and reinforcing the better ones: + +- Generates multiple candidate responses per prompt +- Scores them with a reward function (e.g. tool-call correctness) +- Uses the group's relative ranking to compute advantage signals +- Updates LoRA adapter weights via policy gradient with group normalization + +Each training iteration has two phases: + +1. **Rollout phase** — vLLM generates candidate responses and a reward function scores them +2. **Train phase** — The policy network updates LoRA adapter weights using the advantage signals + +The verl backend orchestrates this using Ray-native actors with FSDP for the policy network and vLLM for inference, distributing across multiple GPUs. + +### Training Task: Tool-Call Verification + +The example uses the [Agent-Ark/Toucan-1.5M](https://huggingface.co/datasets/Agent-Ark/Toucan-1.5M) dataset, which contains tool-calling conversations. The reward function verifies that the model produces syntactically correct tool calls with the expected function name and arguments. + +## RHOAI Compatibility + +This example is compatible with RHOAI version 3.5. + +> [!NOTE] +> The Ray runtime image used in this example is **tested and verified** only — it is not FIPS compliant and is not productised. + +## Requirements + +- An OpenShift cluster with OpenShift AI (RHOAI 3.5) installed: + - The `dashboard`, `workbenches`, and `ray` components enabled +- Worker node(s) with NVIDIA GPUs (Ampere-based or newer, 80GB VRAM recommended per GPU) +- `codeflare-sdk` installed in the workbench (pre-installed in RHOAI workbench images) + +## Hardware Requirements + +### Workbench Requirements + +The workbench only submits and monitors the RayJob — no GPU is needed on the workbench itself unless you want to test the trained model afterward. + +| Use Case | GPU | CPU | Memory | +|----------|-----|-----|--------| +| Job submission and monitoring | None | 2 cores | 8Gi | +| Job submission + model evaluation after training | 1× GPU (40GB+ VRAM) | 8 cores | 64Gi | + +### Ray Cluster Requirements + +verl distributes training across multiple nodes using Ray-native actors with FSDP for the policy network and vLLM for inference generation. + +The default configuration uses 1 head node + 1 worker node, each with 2 GPUs (4 GPUs total). + +| Component | GPU | GPU Type | CPU | Memory | +|-----------|-----|----------|-----|--------| +| Head pod | 2× GPU | NVIDIA A100-80GB or H100 | 8 cores | 64Gi | +| Worker pod (×1) | 2× GPU | NVIDIA A100-80GB or H100 | 8 cores | 192Gi request / 256Gi limit | + +> [!NOTE] +> +> - Memory requirements scale with model size and number of GPUs. The above values suit the example configuration (Qwen3-4B with LoRA, FSDP across 4× A100-80GB, vLLM with `gpu_memory_utilization=0.30`). +> - Scale by adding more worker pods or increasing GPUs per node. + +## GRPO-specific Considerations + +- **`gpu_memory_utilization`**: Controls how much GPU memory vLLM reserves for inference. The default `0.30` leaves the majority for FSDP training. Adjust based on model size and available VRAM. +- **HuggingFace token**: Not strictly required for public models (e.g. Qwen3-4B) but recommended to avoid rate limits. Pass as `HF_TOKEN` environment variable. + +## Setup + +### Prerequisites + +1. Log in to your OpenShift cluster (`oc login`) +2. Ensure the KubeRay operator is running in your cluster +3. Verify GPU nodes are available: `oc get nodes -l nvidia.com/gpu.present=true` + +### Running the Example + +From a RHOAI workbench: + +1. Clone this repository and navigate to `examples/fine-tuning/grpo_ray/`: + + ```bash + git clone https://github.com/red-hat-data-services/red-hat-ai-examples.git + ``` + +2. Open [`grpo_lora-rayjob.ipynb`](./grpo_lora-rayjob.ipynb) +3. Follow the notebook instructions — update the namespace, image, and authentication details for your cluster + +> [!NOTE] +> You will need a Hugging Face token if using gated models. +> Set the `HF_TOKEN` variable in the configuration section. +> You can skip the token for non-gated models like Qwen3-4B. diff --git a/examples/fine-tuning/grpo_ray/grpo_lora-rayjob.ipynb b/examples/fine-tuning/grpo_ray/grpo_lora-rayjob.ipynb new file mode 100644 index 00000000..ae6474b9 --- /dev/null +++ b/examples/fine-tuning/grpo_ray/grpo_lora-rayjob.ipynb @@ -0,0 +1,395 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# LoRA GRPO Fine-Tuning on Ray with Training Hub on OpenShift AI\n", + "\n", + "This notebook runs multi-node distributed **GRPO** training using Training Hub's verl backend on a Ray cluster. We train [Qwen/Qwen3-4B](https://huggingface.co/Qwen/Qwen3-4B) with LoRA to produce correct tool calls, using the [Agent-Ark/Toucan-1.5M](https://huggingface.co/datasets/Agent-Ark/Toucan-1.5M) dataset.\n", + "\n", + "The verl backend distributes training across nodes using FSDP for the policy network and vLLM for inference. CodeFlare SDK submits the job as a `RayJob` with a `ManagedClusterConfig` — the SDK creates a short-lived RayCluster, runs the job, and tears everything down on completion.\n", + "\n", + "For more details on GRPO and hardware requirements, see the [README](./README.md)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "Import the required dependencies." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "from codeflare_sdk import ManagedClusterConfig, RayJob" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Authenticate to your OpenShift Cluster\n", + "\n", + "Log in via `oc login`. Get your token from `oc whoami -t` or the OpenShift console (Copy login command).\n", + "\n", + "The `--insecure-skip-tls-verify` flag is needed for clusters with self-signed certificates.\n", + "\n", + "> **Note:** The CodeFlare SDK's `RayJob` path reads authentication from the local kubeconfig file.\n", + "> Using `oc login` ensures the kubeconfig is written correctly for the SDK to pick up." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "!oc login --token= --server= --insecure-skip-tls-verify" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Configure Training Parameters\n", + "\n", + "Key parameters:\n", + "\n", + "### GRPO Parameters\n", + "- **num_iterations**: Number of GRPO iterations (each = rollout + train phase)\n", + "- **tasks_per_iteration**: Prompts sampled per iteration for rollout\n", + "- **group_size**: Responses generated per prompt for comparison\n", + "- **n_train**: Total training examples from dataset\n", + "\n", + "### LoRA Parameters\n", + "- **lora_r**: Rank of the low-rank matrices (higher = more capacity, more memory)\n", + "- **lora_alpha**: Scaling factor\n", + "\n", + "### vLLM Parameters\n", + "- **gpu_memory_utilization**: Fraction of GPU memory reserved for vLLM inference (remainder used by FSDP training)\n", + "\n", + "### Cluster Configuration\n", + "- **NUM_WORKERS**: Number of Ray worker nodes (training is distributed across head + workers)\n", + "- **N_GPUS_PER_NODE**: GPUs per node (head and each worker)\n", + "- **IMAGE**: Ray runtime image with Training Hub, verl, and vLLM pre-installed" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "# Cluster configuration\n", + "NAMESPACE = \"\" # Replace with your namespace\n", + "# TODO: Update image reference once Konflux build succeeds\n", + "IMAGE = \"quay.io/rh_ee_fwaters/ray-grpo:strat1414\"\n", + "NUM_WORKERS = 1 # Number of Ray worker nodes\n", + "N_GPUS_PER_NODE = 2 # GPUs per node (head + each worker)\n", + "HF_TOKEN = \"\" # Optional: set your Hugging Face token (huggingface.co/settings/tokens)\n", + "\n", + "# Model and dataset\n", + "MODEL_PATH = \"Qwen/Qwen3-4B\"\n", + "DATA_PATH = \"Agent-Ark/Toucan-1.5M\"\n", + "OUTPUT_DIR = \"/tmp/grpo-output\"\n", + "\n", + "# GRPO hyperparameters\n", + "NUM_ITERATIONS = 5\n", + "TASKS_PER_ITERATION = 10\n", + "GROUP_SIZE = 4\n", + "N_TRAIN = 200\n", + "N_VAL = 10\n", + "LEARNING_RATE = 1e-5\n", + "\n", + "# LoRA configuration\n", + "LORA_R = 16\n", + "LORA_ALPHA = 8\n", + "\n", + "# vLLM configuration\n", + "GPU_MEMORY_UTILIZATION = 0.30" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Configure the Ray Cluster\n", + "\n", + "Configure the multi-node Ray cluster for distributed GRPO training. verl distributes\n", + "training across nodes using FSDP for the policy network and vLLM for inference.\n", + "\n", + "The default configuration uses 1 head node + 1 worker node, each with 2 GPUs\n", + "(4 GPUs total across 2 nodes)." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "env_vars = {}\n", + "if HF_TOKEN:\n", + " env_vars[\"HF_TOKEN\"] = HF_TOKEN\n", + " env_vars[\"HUGGING_FACE_HUB_TOKEN\"] = HF_TOKEN\n", + "\n", + "cluster_config = ManagedClusterConfig(\n", + " namespace=NAMESPACE,\n", + " image=IMAGE,\n", + " head_cpu_requests=4,\n", + " head_cpu_limits=8,\n", + " head_memory_requests=64,\n", + " head_memory_limits=64,\n", + " head_accelerators={\"nvidia.com/gpu\": N_GPUS_PER_NODE},\n", + " num_workers=NUM_WORKERS,\n", + " worker_cpu_requests=4,\n", + " worker_cpu_limits=8,\n", + " worker_memory_requests=192,\n", + " worker_memory_limits=256,\n", + " worker_accelerators={\"nvidia.com/gpu\": N_GPUS_PER_NODE},\n", + " envs=env_vars,\n", + ")" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Build the Training Entrypoint\n", + "\n", + "The entrypoint calls `training_hub.lora_grpo()` with the verl backend.\n", + "Training Hub handles all verl orchestration internally — setting up Ray actors,\n", + "configuring FSDP, launching vLLM, and running the GRPO training loop." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "TOTAL_GPUS = N_GPUS_PER_NODE * (1 + NUM_WORKERS) # head + workers\n", + "\n", + "entrypoint = f\"\"\"python -c \"\n", + "from training_hub import lora_grpo\n", + "\n", + "result = lora_grpo(\n", + " model_path='{MODEL_PATH}',\n", + " data_path='{DATA_PATH}',\n", + " ckpt_output_dir='{OUTPUT_DIR}',\n", + " backend='verl',\n", + " n_gpus={TOTAL_GPUS},\n", + " num_iterations={NUM_ITERATIONS},\n", + " tasks_per_iteration={TASKS_PER_ITERATION},\n", + " group_size={GROUP_SIZE},\n", + " n_train={N_TRAIN},\n", + " n_val={N_VAL},\n", + " lora_r={LORA_R},\n", + " lora_alpha={LORA_ALPHA},\n", + " learning_rate={LEARNING_RATE},\n", + " gpu_memory_utilization={GPU_MEMORY_UTILIZATION},\n", + ")\n", + "\n", + "print('Training complete')\n", + "print('Status:', result.get('status'))\n", + "print('Reward history:', result.get('reward_history'))\n", + "\"\"\"\n", + "\n", + "print(\"Entrypoint configured\")\n", + "print(f\"Model: {MODEL_PATH}\")\n", + "print(f\"Dataset: {DATA_PATH}\")\n", + "print(f\"Nodes: {1 + NUM_WORKERS} (head + {NUM_WORKERS} workers)\")\n", + "print(f\"GPUs: {TOTAL_GPUS} total ({N_GPUS_PER_NODE} per node)\")\n", + "print(f\"Iterations: {NUM_ITERATIONS}\")" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Submit the RayJob\n", + "\n", + "The SDK creates a `RayJob` CR with an embedded `RayCluster` spec. KubeRay:\n", + "1. Spins up the Ray cluster (head pod with GPUs)\n", + "2. Runs the entrypoint on the head node\n", + "3. Tears everything down when the job finishes (`shutdownAfterJobFinishes: true`)\n", + "\n", + "The `ttl_seconds_after_finished` controls how long the RayJob CR persists after\n", + "completion (for log inspection). Set to 0 for immediate cleanup." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "job = RayJob(\n", + " job_name=\"grpo-verl-training\",\n", + " entrypoint=entrypoint,\n", + " cluster_config=cluster_config,\n", + " namespace=NAMESPACE,\n", + " ttl_seconds_after_finished=600,\n", + ")\n", + "\n", + "job.submit()\n", + "print(f\"RayJob '{job.name}' submitted to namespace '{NAMESPACE}'\")" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. Monitor Training Progress\n", + "\n", + "Poll the job status. The status may show as `PENDING` while the RayCluster is\n", + "spinning up and pulling the image across nodes.\n", + "\n", + "GRPO training with the default parameters on 4× A100-80GB (2 nodes) takes approximately 15-30 minutes." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "job.status()" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "import time\n", + "\n", + "print(\"Waiting for job to complete...\")\n", + "print(\n", + " f\"Expected duration: ~15-30 minutes for {NUM_ITERATIONS} iterations on {TOTAL_GPUS}× GPU\\n\"\n", + ")\n", + "\n", + "for i in range(120):\n", + " status, ready = job.status(print_to_console=False)\n", + " print(f\"[{i * 30}s] Status: {status.value}\")\n", + " if ready or status.value in (\"FAILED\", \"COMPLETE\"):\n", + " break\n", + " time.sleep(30)\n", + "\n", + "print(f\"\\nFinal status: {status.value}\")" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. View Job Logs\n", + "\n", + "Retrieve the RayJob logs to inspect training output, reward scores, and any errors." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "job.logs()" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. Plot Reward Curve\n", + "\n", + "Parse the reward history from the job logs and plot it. The mean reward should\n", + "increase over iterations as the model learns to produce correct tool calls." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "import re\n", + "\n", + "import matplotlib.pyplot as plt\n", + "\n", + "log_output = job.logs()\n", + "\n", + "rewards = re.findall(r\"Reward history:\\s*\\[([^\\]]+)\\]\", log_output)\n", + "\n", + "if rewards:\n", + " reward_values = [float(r) for r in rewards[-1].split(\",\")]\n", + " iterations = list(range(1, len(reward_values) + 1))\n", + "\n", + " plt.figure(figsize=(8, 4))\n", + " plt.plot(iterations, reward_values, marker=\"o\")\n", + " plt.xlabel(\"Iteration\")\n", + " plt.ylabel(\"Mean Reward\")\n", + " plt.title(\"GRPO Training — Reward Curve\")\n", + " plt.grid(True, alpha=0.3)\n", + " plt.tight_layout()\n", + " plt.show()\n", + "\n", + " print(f\"Final reward: {reward_values[-1]:.4f}\")\n", + "else:\n", + " print(\"Could not parse reward history from logs.\")\n", + " print(\"Check job.logs() output above for training results.\")" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. Cleanup\n", + "\n", + "The RayCluster is automatically torn down when the job finishes\n", + "(`shutdownAfterJobFinishes: true`). The RayJob CR is cleaned up after\n", + "`ttl_seconds_after_finished` (600 seconds).\n", + "\n", + "Use `job.delete()` for immediate cleanup if needed." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "# Uncomment to delete the RayJob immediately:\n", + "# job.delete()" + ], + "execution_count": null, + "outputs": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbformat_minor": 4, + "version": "3.12.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file