104 changes: 104 additions & 0 deletions tests/e2e/ft/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Fault Tolerance E2E Tests

## Layout

- Scenario logic lives in `conftest_ft/scenario_<name>.py`.
- CI runs it via thin per-mode entry files `test_trainer_ft_<scenario>_<mode>.py`, each registered with `register_cuda_ci(est_time=..., suite="stage-c-8-gpu-h200", labels=["ft"])`.
- The CUDA CI runner executes each entry as bare `python3 <file>` (exit code = pass/fail); the entry just calls the scenario's `run_ci(mode)`.
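
As a rough illustration of the entry-file pattern above, the sketch below shows how a call to `run_ci(mode)` maps onto a process exit code. It is self-contained so it can run anywhere: `run_ci` here is a hypothetical stand-in for a scenario's real `run_ci(mode)`, `main` is an illustrative helper that a real entry file may not need (an uncaught exception already yields a non-zero exit), and the `register_cuda_ci(...)` call is left out because its import path is not shown in this README.

```python
import traceback


def run_ci(mode: str) -> None:
    """Hypothetical stand-in for a scenario's run_ci(mode).

    The real function runs prepare + baseline + target + compare for one mode.
    """
    if mode != "dp2_cp2_tp2_ep2":
        raise ValueError(f"unknown mode: {mode}")
    print(f"running scenario in mode {mode}")


def main(mode: str) -> int:
    """Return 0 if run_ci succeeds and 1 if it raises (exit code = pass/fail)."""
    try:
        run_ci(mode)
    except Exception:
        traceback.print_exc()
        return 1
    return 0


# A real entry file would end with something like: sys.exit(main("<mode>"))
print(main("dp2_cp2_tp2_ep2"))
```

Keeping the entry this thin means all scenario logic stays in `conftest_ft/scenario_<name>.py`, and adding a mode to CI is a copy plus a one-word change.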

| Scenario (`conftest_ft/scenario_*.py`) | Type | What it verifies |
|------|------|-----------------|
| `scenario_no_failure` | Comparison | indep_dp matches normal DP when no faults |
| `scenario_with_failure` | Comparison, multi-phase | indep_dp matches normal DP after fault + ckpt resume |
| `scenario_deterministic` | Comparison, multi-phase | healing state transfer is bitwise-correct (stop+start), on cold start and on resume from a post-healing ckpt |
| `scenario_ft_random` | Non-comparison | system survives random crashes without hanging |
| `scenario_realistic_gsm8k` | Non-comparison | model still reaches gsm8k accuracy under random crashes |

## Mode Variants

- Each scenario runs with a `--mode`.
- All modes are **disaggregated** (training and rollout on separate nodes). Modes without rollout use debug rollout data.

| Mode | Nodes | DP cells | Parallelism | Rollout | Model | Coverage |
|------|-------|----------|-------------|---------|-------|----------|
| `dp2_cp2_tp2_ep2` | 1 | 2 | CP2 TP2 EP2 | debug data | 5-layer MoE | TP + EP |
| `dp2_cp2_pp2` | 1 | 2 | CP2 PP2 | debug data | 5-layer MoE | PP |
| `dp4_cp2` | 1 | 4 | CP2 | debug data | 5-layer MoE | Multi-replica (>=4 cells) |
| `dp2_cp2_real_rollout` | 1 | 2 | CP2 | 4 engines × 1 GPU | 5-layer MoE | Real rollout engines + weight update path (no_failure, deterministic) |
| `dp2_cp2_real_rollout_dense` | 1 | 2 | CP2 | 4 engines × 1 GPU | dense Qwen3-0.6B | Real rollout under a fault + injection match guard (with_failure) |
| `6node_dp4_cp2_tp2_pp2_ep2_etp2` | 4+2 | 4 | CP2 TP2 PP2 EP2 ETP2 | 2 engines × 8 GPU | full MoE | Large-scale, all parallelism |

- All scenarios use `--rollout-batch-size 32 --n-samples-per-prompt 8 --global-batch-size 256` (256 samples/rollout), which divides evenly across both 2 and 4 cells. Uneven sample distribution across replicas is **not** exercised.
- 1-node modes use the 5-layer MoE (`Qwen3-30B-A3B-5layer`), except `dp2_cp2_real_rollout_dense` (dense `Qwen3-0.6B` — see `scenario_with_failure` for why).
- Authorized CI skips (no entry file): `6node_dp4_cp2_tp2_pp2_ep2_etp2` (multi-node), `with_failure × dp4_cp2`.
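
The batch arithmetic in the first bullet above can be checked directly. This is only a sanity-check sketch: the variable names mirror the CLI flags, and the even split across cells is the only property it verifies.

```python
rollout_batch_size = 32     # --rollout-batch-size
n_samples_per_prompt = 8    # --n-samples-per-prompt
global_batch_size = 256     # --global-batch-size

# One rollout produces prompts x samples-per-prompt samples.
samples_per_rollout = rollout_batch_size * n_samples_per_prompt
assert samples_per_rollout == global_batch_size

# Both DP cell counts used by the modes table divide the rollout evenly.
for dp_cells in (2, 4):
    per_cell, remainder = divmod(samples_per_rollout, dp_cells)
    print(f"{dp_cells} cells: {per_cell} samples per cell, remainder {remainder}")
```

A cell count such as 3 would leave a non-zero remainder, which is the uneven case the tests do not cover.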

## Running

### In CI

- Gated on the `run-ci-ft` PR label (FT is expensive — not run on every PR). With the label set, every entry runs on `stage-c-8-gpu-h200`.
- Add a `(scenario, mode)` to CI: copy an entry file, change `run_ci(...)`'s mode.
- Add a new label: edit `tests/ci/labels.py` and create the matching `run-ci-<label>` GitHub label.

### Manually

Set `PYTHONPATH` to the repo root (CI sets it automatically).

- One mode, exactly as CI runs it — invoke the entry file:

```bash
PYTHONPATH=. python tests/e2e/ft/test_trainer_ft_no_failure_dp2_cp2_tp2_ep2.py
```

- Any mode (incl. authorized-skips) — invoke the scenario's typer app:

```bash
PYTHONPATH=. python tests/e2e/ft/conftest_ft/scenario_<name>.py run --mode <mode>
```

| subcommand | does |
|---|---|
| `run` | full pipeline: prepare + baseline + target + compare |
| `baseline` / `target` | run one side only (debugging) |
| `compare` | re-run comparison on existing dumps (no GPU) |

- When debugging, prefer the individual subcommands (shared `--dump-dir`, `--phase` for multi-phase) over `run`, so you re-run only what changed (e.g. just `compare` on existing dumps, or one side / phase).
- `scenario_ft_random`: non-comparison; only `run` with `--seed` / `--num-steps` / `--crash-probability`.
- `scenario_realistic_gsm8k`: non-comparison, no `--mode`; only `run` with `--seed` / `--num-rollout` / `--crash-probability` / `--metric-threshold`.
- Dumps land under `/node_public/dumps/<test_name>/` (`conftest_ft/app.py` `resolve_dump_dir`).
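
The resulting directory layout can be sketched as follows. The helper names `dump_subdir` and `dump_path` are illustrative: they mirror `_dump_subdir` and the hard-coded base in `resolve_dump_dir`, but unlike the real function this sketch only builds the path and does not create any directory. The test and phase names in the usage lines are made up.

```python
from pathlib import PurePosixPath


def dump_subdir(side: str, phase: str) -> str:
    # Same rule as _dump_subdir: the phase is appended only for multi-phase tests.
    return f"{side}/{phase}" if phase else side


def dump_path(test_name: str, side: str, phase: str = "") -> str:
    # Base location as hard-coded in resolve_dump_dir (no directory is created here).
    base = PurePosixPath("/node_public") / "dumps" / test_name
    return str(base / dump_subdir(side, phase))


# Single-phase test, manual run (hypothetical test name).
print(dump_path("demo_test", "baseline"))
# Multi-phase test as run by CI, where the test name gets a "_<mode>" suffix.
print(dump_path("demo_test_dp2_cp2_pp2", "target", "phase_a"))
```

Because `run_ci` appends the mode to the test name, two CI modes of the same scenario write to different directories.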

## Comparison criterion

- Dumps: per-tensor boolean predicates over `rel`/`max_abs`/`mean_abs` (`compare_dumps(diff_thresholds=[(name_regex, predicate), ...])`).
- `scenario_deterministic`: bitwise (`rel <= 0`), relying on `--deterministic-mode` (kernel determinism) + `--debug-deterministic-collective` (fixed-tree SUM collectives).
- Metrics: `rtol=atol=0`, except `train/grad_norm` (`rtol<=1e-6`): the bracketing (grouping order) of its floating-point sum depends on the dist-optimizer shard count (8 flat vs 2 per cell), so a difference of a few fp32 ulps is inherent; the grads stay bitwise-checked via the dumps.
- Other scenarios: `rel <= 0.0085`; `with_failure` also floors near-zero MoE-expert and QK-norm (`q_layernorm`/`k_layernorm`) grads at `max_abs <= 1e-3`.
- Unmatched tensors are a fail-closed error — end each list with a `.*` catch-all.
- Exact per-scenario thresholds: Test Definitions below.
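
The fail-closed matching described above can be sketched in a few lines. This is not the real `compare_dumps` implementation: `check_tensor`, the first-match-wins rule, the use of `re.search`, the tensor names, and the way the `max_abs` floor is combined with `rel` via `or` are all assumptions made for illustration. Only the `(name_regex, predicate)` list shape, the statistic names, and the threshold values come from this README.

```python
import re
from typing import Callable

Stats = dict[str, float]  # keys assumed: "rel", "max_abs", "mean_abs"
Predicate = Callable[[Stats], bool]


def check_tensor(name: str, stats: Stats, thresholds: list[tuple[str, Predicate]]) -> bool:
    """Apply the predicate of the first regex that matches the tensor name."""
    for pattern, predicate in thresholds:
        if re.search(pattern, name):
            return predicate(stats)
    # Fail closed: a tensor with no matching rule is an error, not a silent pass.
    raise KeyError(f"no threshold rule matches tensor {name!r}")


THRESHOLDS: list[tuple[str, Predicate]] = [
    # Near-zero QK-norm grads: accept a small absolute error even if rel is large.
    (r"q_layernorm|k_layernorm", lambda s: s["rel"] <= 0.0085 or s["max_abs"] <= 1e-3),
    # Catch-all so that every tensor is covered.
    (r".*", lambda s: s["rel"] <= 0.0085),
]

tiny_grad = {"rel": 0.5, "max_abs": 1e-4, "mean_abs": 1e-5}
print(check_tensor("layers.0.q_layernorm.weight", tiny_grad, THRESHOLDS))
print(check_tensor("layers.0.mlp.weight", tiny_grad, THRESHOLDS))
```

Ordering matters in this sketch: specific patterns go first and the `.*` catch-all last, otherwise the catch-all would shadow every other rule.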

## Debug Rollout Data

- Modes without rollout engines (`has_real_rollout == False`) use pre-recorded data via `--load-debug-rollout-data --debug-train-only`.
- `conftest_ft/execution.py` `prepare()` downloads it via `U.hf_download_dataset()`.

### How to regenerate

- **Must** use the 5-layer model (the full model produces `rollout_log_probs` incompatible with the 5-layer training model → NaN gradients in GRPO).

```bash
# Step 1: Generate rollout data (5-layer model + real sglang rollout, no dumper)
PYTHONPATH=. python tests/e2e/ft/conftest_ft/scenario_no_failure.py generate-data \
--mode dp2_cp2_real_rollout --num-steps 12 --output-dir /tmp/gen_rollout

# Step 2: Locate the generated rollout data
ls /tmp/gen_rollout/rollout_data/

# Step 3: Upload to HF
huggingface-cli upload --repo-type dataset fzyzcjy/miles-test-rollout-Qwen3-30B-A3B-5layer \
/tmp/gen_rollout/rollout_data/
```

---

## Test Definitions

Empty file added tests/e2e/ft/__init__.py
202 changes: 202 additions & 0 deletions tests/e2e/ft/conftest_ft/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
# NOTE: You MUST read tests/e2e/ft/README.md as the source of truth and documentation

import os
from collections.abc import Callable
from pathlib import Path
from typing import Annotated

import typer

from tests.e2e.ft.conftest_ft.execution import get_common_train_args, prepare, run_training
from tests.e2e.ft.conftest_ft.modes import FTTestMode, resolve_mode


BuildArgsFn = Callable[[FTTestMode, str, bool], str]


def resolve_dump_dir(test_name: str) -> str:
    # TODO make it configurable, but on local disk instead of remote disk
    output_dir = "/node_public"
    dump_dir = str(Path(output_dir) / "dumps" / test_name)
    os.makedirs(dump_dir, exist_ok=True)
    return dump_dir


def _dump_subdir(side: str, phase: str) -> str:
    return f"{side}/{phase}" if phase else side


def run_pipeline(
    *,
    test_name: str,
    build_baseline_args: BuildArgsFn,
    build_target_args: BuildArgsFn,
    compare_fn: Callable[[str, FTTestMode], None],
    phases: list[str] | None,
    mode: str,
    enable_dumper: bool = True,
) -> None:
    """Full pipeline (prepare + every phase's baseline/target + compare) for one mode."""
    effective_phases: list[str] = phases or [""]
    ft_mode: FTTestMode = resolve_mode(mode)
    dump_dir: str = resolve_dump_dir(test_name)
    print(f"Dump directory: {dump_dir}")

    prepare(ft_mode)

    for phase in effective_phases:
        baseline_dump = f"{dump_dir}/{_dump_subdir('baseline', phase)}"
        run_training(
            train_args=build_baseline_args(ft_mode, baseline_dump, enable_dumper),
            mode=ft_mode,
            dump_dir=baseline_dump,
        )

        target_dump = f"{dump_dir}/{_dump_subdir('target', phase)}"
        run_training(
            train_args=build_target_args(ft_mode, target_dump, enable_dumper),
            mode=ft_mode,
            dump_dir=target_dump,
        )

    if enable_dumper:
        compare_fn(dump_dir, ft_mode)


def create_comparison_app_and_run_ci(
    *,
    test_name: str,
    build_baseline_args: BuildArgsFn,
    build_target_args: BuildArgsFn,
    compare_fn: Callable[[str, FTTestMode], None],
    phases: list[str] | None = None,
) -> tuple[typer.Typer, Callable[[str], None]]:
    """Build, from one wiring, the manual typer app and a run_ci(mode) one-shot runner.

    Returns ``(app, run_ci)``: ``app`` exposes run/baseline/target/compare for manual use;
    ``run_ci(mode)`` runs the full pipeline for a single mode (used by the per-mode CI entry
    files), writing dumps under a per-mode test name so concurrent CI modes don't collide.

    For simple (no-phase) tests, leave phases empty.
    For multi-phase tests (e.g. with_failure), provide phase names like ["phase_a", "phase_b"].
    """
    app: typer.Typer = typer.Typer()

    def _run_side(
        side: str,
        build_fn: BuildArgsFn,
        mode: str,
        dump_dir: str | None,
        phase: str,
        *,
        enable_dumper: bool = True,
    ) -> None:
        ft_mode = resolve_mode(mode)
        if dump_dir is None:
            dump_dir = resolve_dump_dir(test_name)
        sub = _dump_subdir(side, phase)
        full_dump_dir = f"{dump_dir}/{sub}"
        args = build_fn(ft_mode, full_dump_dir, enable_dumper)
        prepare(ft_mode)
        run_training(train_args=args, mode=ft_mode, dump_dir=full_dump_dir)

    @app.command()
    def baseline(
        mode: Annotated[str, typer.Option(help="Test mode variant")],
        dump_dir: Annotated[str | None, typer.Option(help="Dump base directory")] = None,
        phase: Annotated[str, typer.Option(help="Phase name (multi-phase tests)")] = "",
        enable_dumper: Annotated[bool, typer.Option(help="Enable dumper output")] = True,
    ) -> None:
        """Run baseline (normal DP) training."""
        _run_side("baseline", build_baseline_args, mode, dump_dir, phase, enable_dumper=enable_dumper)

    @app.command()
    def target(
        mode: Annotated[str, typer.Option(help="Test mode variant")],
        dump_dir: Annotated[str | None, typer.Option(help="Dump base directory")] = None,
        phase: Annotated[str, typer.Option(help="Phase name (multi-phase tests)")] = "",
        enable_dumper: Annotated[bool, typer.Option(help="Enable dumper output")] = True,
    ) -> None:
        """Run target (indep_dp) training."""
        _run_side("target", build_target_args, mode, dump_dir, phase, enable_dumper=enable_dumper)

    @app.command()
    def compare(
        mode: Annotated[str, typer.Option(help="Test mode variant")],
        dump_dir: Annotated[str, typer.Option(help="Dump base directory")],
    ) -> None:
        """Compare baseline and target dumps."""
        ft_mode = resolve_mode(mode)
        compare_fn(dump_dir, ft_mode)

    @app.command()
    def run(
        mode: Annotated[str, typer.Option(help="Test mode variant")],
        enable_dumper: Annotated[bool, typer.Option(help="Enable dumper output")] = True,
    ) -> None:
        """Full pipeline: prepare + all phases + compare."""
        run_pipeline(
            test_name=test_name,
            build_baseline_args=build_baseline_args,
            build_target_args=build_target_args,
            compare_fn=compare_fn,
            phases=phases,
            mode=mode,
            enable_dumper=enable_dumper,
        )

    @app.command()
    def generate_data(
        mode: Annotated[str, typer.Option(help="Test mode variant (must have real rollout)")],
        num_steps: Annotated[int, typer.Option(help="Number of rollout steps to generate")] = 12,
        output_dir: Annotated[
            str, typer.Option(help="Output directory for rollout data")
        ] = "/tmp/generated_rollout_data",
    ) -> None:
        """Generate debug rollout data using real rollout (no dumper)."""
        ft_mode = resolve_mode(mode)
        assert ft_mode.has_real_rollout, f"Mode {mode} does not have real rollout engines"
        prepare(ft_mode)
        args = get_common_train_args(ft_mode, dump_dir=output_dir, num_steps=num_steps, enable_dumper=False)
        run_training(train_args=args, mode=ft_mode)

    def run_ci(mode: str) -> None:
        """Run one mode's full pipeline (entry point for the per-mode CI files)."""
        run_pipeline(
            test_name=f"{test_name}_{mode}",
            build_baseline_args=build_baseline_args,
            build_target_args=build_target_args,
            compare_fn=compare_fn,
            phases=phases,
            mode=mode,
        )

    return app, run_ci


def create_non_comparison_app(
    *,
    test_name: str,
    build_args: Callable[[FTTestMode, str], str],
    verify_fn: Callable[[str, FTTestMode], None] | None = None,
) -> typer.Typer:
    """Generate a typer app with a single 'run' command for non-comparison tests."""
    app: typer.Typer = typer.Typer()

    @app.command()
    def run(
        mode: Annotated[str, typer.Option(help="Test mode variant")],
    ) -> None:
        """Full pipeline: prepare + execute + verify."""
        ft_mode = resolve_mode(mode)
        dump_dir: str = resolve_dump_dir(test_name)
        print(f"Dump directory: {dump_dir}")

        prepare(ft_mode)
        args = build_args(ft_mode, dump_dir)
        run_training(train_args=args, mode=ft_mode)

        if verify_fn is not None:
            verify_fn(dump_dir, ft_mode)

    return app