31 changes: 31 additions & 0 deletions flagscale/runner/backend/backend_megatron.py
@@ -10,6 +10,18 @@
)
from flagscale.runner.utils import get_pkg_dir, logger, parse_hostfile, resolve_path

PERF_MONITOR_RUNNER_KEYS = (
"enable_perf_monitor",
"perf_log_interval",
"perf_log_dir",
"perf_console_output",
"perf_log_format",
"perf_memory_tracking",
"perf_breakdown",
"perf_max_log_files",
"perf_model_type",
)


class MegatronBackend(BackendBase):
def __init__(self, config: DictConfig):
@@ -20,6 +32,7 @@ def __init__(self, config: DictConfig):

def _prepare(self):
_update_config_train(self.config)
self._prepare_perf_monitor_config()
self.user_args = _get_args_megatron(self.config)
self.rdzv_id = datetime.now().strftime("%Y%m%d_%H%M%S.%f")
self.user_envs = self.config.experiment.get("envs", {})
@@ -30,6 +43,22 @@ def _prepare(self):
logger.info("\n************** configuration **************")
logger.info(f"\n{OmegaConf.to_yaml(self.config)}")

def _prepare_perf_monitor_config(self):
system_config = self.config.train.system
runner_config = self.config.experiment.runner

OmegaConf.set_struct(system_config, False)
for key in PERF_MONITOR_RUNNER_KEYS:
if runner_config.get(key, None) is not None and system_config.get(key, None) is None:
system_config[key] = runner_config.get(key)

if system_config.get("perf_log_dir", None) is not None:
system_config.perf_log_dir = resolve_path(
system_config.perf_log_dir, "system.perf_log_dir"
)
elif system_config.get("enable_perf_monitor", False):
system_config.perf_log_dir = os.path.join(system_config.logging.log_dir, "perf_monitor")

def generate_run_script(
self,
config,
@@ -78,6 +107,8 @@ def generate_run_script(
f.write(f"mkdir -p {system_config.logging.details_dir}\n")
f.write(f"mkdir -p {system_config.logging.tensorboard_dir}\n")
f.write(f"mkdir -p {system_config.logging.wandb_save_dir}\n")
if system_config.get("perf_log_dir", None):
f.write(f"mkdir -p {system_config.perf_log_dir}\n")
f.write("\n")
f.write(f"cd {pkg_dir}\n")
f.write("\n")
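
The merge logic in `_prepare_perf_monitor_config` is fill-only: a runner-level key is copied into `train.system` only when the system config does not already set it. A minimal, self-contained sketch of that behavior (the configs and values below are hypothetical):

```python
# Minimal sketch of the fill-only merge above; configs/values are hypothetical.
from omegaconf import OmegaConf

PERF_MONITOR_RUNNER_KEYS = ("enable_perf_monitor", "perf_log_interval", "perf_log_dir")

runner_config = OmegaConf.create({"enable_perf_monitor": True, "perf_log_interval": 20})
system_config = OmegaConf.create({"perf_log_interval": 5})

OmegaConf.set_struct(system_config, False)
for key in PERF_MONITOR_RUNNER_KEYS:
    # Runner values only fill gaps; explicit system values always win.
    if runner_config.get(key) is not None and system_config.get(key) is None:
        system_config[key] = runner_config.get(key)

print(system_config.perf_log_interval)    # 5 -- the system-level value is kept
print(system_config.enable_perf_monitor)  # True -- copied from the runner config
```
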
18 changes: 18 additions & 0 deletions flagscale/runner/launcher/launcher_ssh.py
@@ -116,6 +116,24 @@ def _get_runner_cmd_train(
del runner_args["nsys_rep_file_path"]
if "deploy" in runner_args:
del runner_args["deploy"]
if "enable_perf_monitor" in runner_args:
del runner_args["enable_perf_monitor"]
if "perf_log_interval" in runner_args:
del runner_args["perf_log_interval"]
if "perf_log_dir" in runner_args:
del runner_args["perf_log_dir"]
if "perf_console_output" in runner_args:
del runner_args["perf_console_output"]
if "perf_log_format" in runner_args:
del runner_args["perf_log_format"]
if "perf_memory_tracking" in runner_args:
del runner_args["perf_memory_tracking"]
if "perf_breakdown" in runner_args:
del runner_args["perf_breakdown"]
if "perf_max_log_files" in runner_args:
del runner_args["perf_max_log_files"]
if "perf_model_type" in runner_args:
del runner_args["perf_model_type"]
runner_args["rdzv_id"] = rdzv_id
# runner_args["master_addr"] = master_addr
# runner_args["master_port"] = master_port
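
The nine guarded `del` statements above strip runner-only perf keys so they are not forwarded to the underlying launcher command. An equivalent, more compact form (a sketch only, not the code this diff adds) uses `dict.pop` with a default:

```python
# Sketch of an equivalent cleanup; not the code this diff adds.
PERF_MONITOR_RUNNER_KEYS = (
    "enable_perf_monitor", "perf_log_interval", "perf_log_dir",
    "perf_console_output", "perf_log_format", "perf_memory_tracking",
    "perf_breakdown", "perf_max_log_files", "perf_model_type",
)

def strip_perf_monitor_keys(runner_args: dict) -> None:
    for key in PERF_MONITOR_RUNNER_KEYS:
        runner_args.pop(key, None)  # no-op when the key is absent
```
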
68 changes: 68 additions & 0 deletions flagscale/train/megatron/training/arguments_fs.py
@@ -763,6 +763,73 @@ def _add_regularization_args(parser):
help='If set, disable Nesterov momentum for muon')
return parser

def _add_perf_monitor_args(parser):
group = parser.add_argument_group(title="flagscale perf monitor")

group.add_argument(
"--enable-perf-monitor",
action="store_true",
default=False,
help="Enable FlagScale performance monitoring during training.",
)
group.add_argument(
"--perf-log-interval",
type=int,
default=10,
help="Log performance metrics every N iterations.",
)
group.add_argument(
"--perf-log-dir",
type=str,
default=None,
help="Directory used to save performance monitor logs.",
)
group.add_argument(
"--perf-console-output",
action="store_true",
default=False,
help="Also emit performance monitor logs to stdout on rank 0.",
)
group.add_argument(
"--perf-log-format",
type=str,
choices=["text", "json", "both"],
default="both",
help="Output format for performance monitor files.",
)
group.add_argument(
"--perf-memory-tracking",
dest="perf_memory_tracking",
action="store_true",
help="Track CUDA memory usage in the performance monitor.",
)
group.add_argument(
"--no-perf-memory-tracking",
dest="perf_memory_tracking",
action="store_false",
help="Disable CUDA memory tracking in the performance monitor.",
)
group.set_defaults(perf_memory_tracking=True)
group.add_argument(
"--perf-breakdown",
action="store_true",
default=False,
help="Include estimated component breakdowns in performance logs.",
)
group.add_argument(
"--perf-max-log-files",
type=int,
default=10,
help="Maximum number of historical performance log files to keep.",
)
group.add_argument(
"--perf-model-type",
type=str,
choices=["auto", "gpt", "llama", "qwen", "mixtral", "aquila", "moe"],
default="auto",
help="Model type hint used for FLOPS estimation.",
)
return parser

def _add_flagos_args(parser):
group = parser.add_argument_group(title="flagscale transformer engine fl")
@@ -878,6 +945,7 @@ def add_flagscale_arguments(parser):
parser = _add_auto_skip_spiky_loss(parser)
parser = _add_peft_args(parser)
parser = _add_regularization_args(parser)
parser = _add_perf_monitor_args(parser)
parser = _add_flagos_args(parser)
parser = _add_engram_args(parser)
return parser
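
Once registered, the group surfaces these flags on the Megatron training command line. An illustrative invocation (the entry point and distributed flags are placeholders, not taken from this diff):

```bash
# Illustrative only; the entry point and other flags are placeholders.
torchrun --nproc_per_node=8 pretrain_gpt.py \
    --enable-perf-monitor \
    --perf-log-interval 20 \
    --perf-log-format json \
    --no-perf-memory-tracking \
    --perf-model-type llama
```
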
16 changes: 15 additions & 1 deletion flagscale/train/megatron/training/training.py
@@ -7,9 +7,11 @@
import functools
import gc
import inspect
import json
import logging
import math
import os
import socket
import sys
from typing import Any, Optional

@@ -138,6 +140,12 @@
from megatron.training.global_vars import get_spiky_loss_detector
from megatron.training.fs_theoretical_memory_usage import report_theoretical_memory as fs_report_theoretical_memory
from megatron.plugin.hetero.parallel_context import get_parallel_context
from flagscale.train.perf_monitor.hooks import (
initialize_perf_monitor,
perf_monitor_end_iteration,
perf_monitor_end_training,
perf_monitor_start_iteration,
)

stimer = StragglerDetector()

@@ -2453,6 +2461,7 @@ def train(
timers('interval-time', log_level=0).start(barrier=True)
print_datetime('before the start of training step')
report_memory_flag = True
perf_callback = initialize_perf_monitor(args)
pre_hook_enabled = False
should_exit = False
exit_code = 0
@@ -2483,6 +2492,7 @@

num_microbatches = get_num_microbatches()

writer = get_tensorboard_writer()
wandb_writer = get_wandb_writer()
if wandb_writer and args.wandb_log_model:
# wandb.watch's log_freg needs to take the accumulated number of microbatches into account
@@ -2665,7 +2675,8 @@ def get_e2e_base_metrics():
model, optimizer, iteration, ref_state_dict,
)
train_data_iterator = buffered_rollouts

if perf_callback is not None:
perf_monitor_start_iteration(iteration)
ft_integration.on_training_step_start()
(
loss_dict,
@@ -2679,6 +2690,8 @@
forward_step_func, train_data_iterator, model, optimizer, opt_param_scheduler, config, forward_backward_func
)
ft_integration.on_training_step_end()
if perf_callback is not None:
perf_monitor_end_iteration(iteration, writer, wandb_writer)
if should_checkpoint:
save_checkpoint_and_time(
iteration,
@@ -2891,6 +2904,7 @@ def get_e2e_base_metrics():

# Flush TensorBoard, WandB writers and one-logger.
writer = get_tensorboard_writer()
perf_monitor_end_training(writer, wandb_writer)
if writer:
writer.flush()

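
The call sites above fix the hook signatures: `initialize_perf_monitor(args)` returns a handle (or `None` when monitoring is disabled), and the start/end hooks bracket each training step. Below is a hypothetical sketch consistent with those signatures; the real body of `flagscale/train/perf_monitor/hooks.py` is not shown in this diff:

```python
# Hypothetical sketch inferred from the call sites in training.py above;
# the real hooks module is not part of this diff.
import time

_monitor = None  # module-level state set by initialize_perf_monitor

def initialize_perf_monitor(args):
    """Return a monitor handle, or None when monitoring is disabled."""
    global _monitor
    if not getattr(args, "enable_perf_monitor", False):
        return None
    _monitor = {"interval": args.perf_log_interval, "start": None}
    return _monitor

def perf_monitor_start_iteration(iteration):
    if _monitor is not None:
        _monitor["start"] = time.perf_counter()

def perf_monitor_end_iteration(iteration, writer, wandb_writer):
    if _monitor is None or _monitor["start"] is None:
        return
    elapsed = time.perf_counter() - _monitor["start"]
    if iteration % _monitor["interval"] == 0 and writer is not None:
        writer.add_scalar("perf/iteration-time", elapsed, iteration)

def perf_monitor_end_training(writer, wandb_writer):
    global _monitor
    _monitor = None  # a real implementation would flush logs here
```
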
120 changes: 120 additions & 0 deletions flagscale/train/perf_monitor/READ_PERF_MONITOR.md
@@ -0,0 +1,120 @@
# Perf Monitor on Metax C550

## Scope

This note documents the current `perf_monitor` path for Metax C550 on `main-legacy`.

Important:

- The current runnable branch is `main-legacy`.
- The active code path is the legacy runner:
  - `flagscale/runner/runner_train.py`
- The new runner launcher path is not active in this branch.

## Current Code Path

- Runner integration:
  - `flagscale/runner/runner_train.py`
  - `run.py`
  - `flagscale/runner/auto_tuner/tuner.py`
- Monitor service:
  - `flagscale/runner/elastic/monitor_launcher.py`
  - `flagscale/runner/elastic/monitor_service.py`
  - `flagscale/runner/elastic/diagnostic.py`

## Metax-Specific Notes

- This monitor path is mostly process- and log-based and does not depend on `nvidia-smi`.
- Metax-specific diagnostic keywords were added (matched against worker logs; see the sketch below) for:
  - `maca out of memory`
  - `mxkw`
  - `ioctl create queue block timeout`
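
A minimal sketch of that keyword matching (hypothetical helper; the real scan lives in `flagscale/runner/elastic/diagnostic.py`):

```python
# Hypothetical helper; the real keyword scan lives in diagnostic.py.
METAX_ERROR_KEYWORDS = (
    "maca out of memory",
    "mxkw",
    "ioctl create queue block timeout",
)

def find_metax_errors(log_text: str) -> list[str]:
    lowered = log_text.lower()
    return [kw for kw in METAX_ERROR_KEYWORDS if kw in lowered]
```
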

## Compatibility Aliases

For convenience, the legacy monitor path also accepts:

- `++experiment.runner.enable_perf_monitor=true`
- `++experiment.runner.perf_monitor_interval=5`

These are mapped internally to the legacy keys, as sketched after the list:

- `enable_perf_monitor` -> `enable_monitoring`
- `perf_monitor_interval` -> `monitor_interval`
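
A sketch of that remapping (hypothetical helper; the actual mapping is applied inside the legacy runner):

```python
# Hypothetical helper; the actual mapping is applied in the legacy runner.
LEGACY_ALIASES = {
    "enable_perf_monitor": "enable_monitoring",
    "perf_monitor_interval": "monitor_interval",
}

def apply_legacy_aliases(runner_config: dict) -> None:
    for alias, legacy_key in LEGACY_ALIASES.items():
        if alias in runner_config and legacy_key not in runner_config:
            runner_config[legacy_key] = runner_config[alias]
```
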

## Known Pitfalls

- In the legacy runner, monitor enablement must be propagated to each node. This path was fixed in `runner_train.py`; do not bypass it with custom launch wrappers.
- Use a fresh `exp_dir` and a nonexistent `checkpoint.load` path during validation to avoid resume mismatches.
- Validate this first on the mini Aquila config, not on the original full 7B config.

## Smoke Test

```bash
cd /workspace/muxi-flagscale-legacy/build/Metax_C550/muxi-flagscale-legacy

TS=$(date +%Y%m%d_%H%M%S)

python run.py \
--config-path ./examples/aquila/conf \
--config-name train \
action=test \
experiment.exp_dir=/workspace/exp/aquila_perf_smoke_${TS} \
train.system.checkpoint.load=/workspace/exp/__no_ckpt__/does_not_exist \
train.system.checkpoint.save=/workspace/exp/aquila_perf_smoke_${TS}/checkpoints \
train.system.use_flash_attn=false \
train.model.attention_backend=unfused \
train.model.num_layers=8 \
train.model.hidden_size=1024 \
train.model.num_attention_heads=16 \
train.model.seq_length=512 \
train.model.max_position_embeddings=512 \
train.model.multiple_of=128 \
train.model.micro_batch_size=1 \
train.model.global_batch_size=8 \
train.model.train_samples=16 \
++experiment.runner.enable_perf_monitor=true \
++experiment.runner.perf_monitor_interval=5
```

## Expected Result

- The short training run completes successfully.
- Monitor outputs are written under:

```bash
/workspace/exp/aquila_perf_smoke_${TS}/logs/monitor
```

Typical files:

- `status.log`
- `host_*_diagnostic.txt`
- `host_*_current.log`

## Full Run Example

```bash
TS=$(date +%Y%m%d_%H%M%S)

python run.py \
--config-path ./examples/aquila/conf \
--config-name train \
action=run \
experiment.exp_dir=/workspace/exp/aquila_perf_run_${TS} \
train.system.checkpoint.load=/workspace/exp/__no_ckpt__/does_not_exist \
train.system.checkpoint.save=/workspace/exp/aquila_perf_run_${TS}/checkpoints \
train.system.use_flash_attn=false \
train.model.attention_backend=unfused \
train.model.num_layers=8 \
train.model.hidden_size=1024 \
train.model.num_attention_heads=16 \
train.model.seq_length=512 \
train.model.max_position_embeddings=512 \
train.model.multiple_of=128 \
train.model.micro_batch_size=1 \
train.model.global_batch_size=8 \
train.model.train_samples=1600 \
++experiment.runner.enable_perf_monitor=true \
++experiment.runner.perf_monitor_interval=5
```
20 changes: 20 additions & 0 deletions flagscale/train/perf_monitor/__init__.py
@@ -0,0 +1,20 @@
"""FlagScale performance monitor utilities."""

from .hooks import (
get_perf_monitor,
initialize_perf_monitor,
perf_monitor_end_iteration,
perf_monitor_end_training,
perf_monitor_start_iteration,
)
from .perf_metrics import FLOPSMeasurementCallback, PerformanceMonitor

__all__ = [
"FLOPSMeasurementCallback",
"PerformanceMonitor",
"get_perf_monitor",
"initialize_perf_monitor",
"perf_monitor_end_iteration",
"perf_monitor_end_training",
"perf_monitor_start_iteration",
]
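
A usage sketch of the exported hooks (the `SimpleNamespace` args and `None` writers are stand-ins; whether the real hooks accept them is an assumption):

```python
# Usage sketch; args and writers are stand-ins, not the real Megatron objects.
from types import SimpleNamespace

from flagscale.train.perf_monitor import (
    initialize_perf_monitor,
    perf_monitor_end_iteration,
    perf_monitor_start_iteration,
)

args = SimpleNamespace(enable_perf_monitor=True, perf_log_interval=10)
callback = initialize_perf_monitor(args)

for iteration in range(1, 4):
    if callback is not None:
        perf_monitor_start_iteration(iteration)
    # ... one training step would run here ...
    if callback is not None:
        perf_monitor_end_iteration(iteration, None, None)
```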