diff --git a/configs/gpu_perf_hpo.yaml b/configs/gpu_perf_hpo.yaml new file mode 100644 index 0000000..bfb12ca --- /dev/null +++ b/configs/gpu_perf_hpo.yaml @@ -0,0 +1,59 @@ +# HPO configuration for GPU performance benchmarking with the CLAIMED +# gpu_performance_test component on Vela (OpenShift / MLBatch). +# +# Hyperparameters: +# batch_size – DataLoader / training batch size +# num_workers – DataLoader worker count +# hidden_dim – MLP hidden layer width (training/inference cost) +# depth – MLP depth (training/inference cost) +# matrix_size – Matrix multiplication size (raw GPU compute) +# gpu_num – Number of GPUs to request per pod (launcher-level) +# +# Static args: +# All remaining fixed CLI flags for gpu_performance_test.py +# +# Metrics: +# The gpu_performance_test script prints "Samples/sec:" three times (DataLoader, +# Training, Inference) and "GFLOPS:" once. iterate2 uses the name#N syntax +# to select the Nth occurrence (0-based) of a repeated metric name. + +metrics: + - "Samples/sec#0" # DataLoader throughput (1st occurrence) + - "Samples/sec#1" # Training throughput (2nd occurrence) + - "Samples/sec#2" # Inference throughput (3rd occurrence) + - GFLOPS + +hpo: + gpu_num: + type: categorical + choices: [1, 2] + + batch_size: + type: categorical + choices: [32, 64, 128] + + num_workers: + type: categorical + choices: [8, 16, 32] + + hidden_dim: + type: categorical + choices: [500, 1000, 2000] + + depth: + type: categorical + choices: [500, 1000, 2000] + + matrix_size: + type: categorical + choices: [5000, 10000, 20000] + +static: + mode: single_gpu + dataset_size: 100000 + steps: 1 + input_dim: 1000000 + num_classes: 100 + materialize_dir: "." + cleanup: true # flag – generates --cleanup (store_true) + iterations: 100 diff --git a/configs/gridfm_graphkit_hpo.yaml b/configs/gridfm_graphkit_hpo.yaml new file mode 100644 index 0000000..049466f --- /dev/null +++ b/configs/gridfm_graphkit_hpo.yaml @@ -0,0 +1,51 @@ +# HPO configuration for gridfm-graphkit HGNS PF case118 +# +# Hyperparameters: +# gpu_num – number of GPUs to request from the WLM (launcher-level) +# bfloat16 – boolean flag (presence = --bfloat16, absence = flag omitted) +# tf32 – boolean flag (presence = --tf32, absence = flag omitted) +# compile – torch.compile mode; null disables the flag entirely +# dataset – group: selects config + data_path + exp_name together +# +# Static args: +# all other fixed CLI args +# +# Metrics: +# extracted from [performance] lines in trial output + +metrics: + - case118_ieee/layer_0_residual + - last epoch time + - last epoch it/s + +hpo: + gpu_num: + type: categorical + choices: [1, 2, 4] + + bfloat16: + type: flag # store_true: true → --bfloat16, false → flag omitted + + tf32: + type: flag # store_true: true → --tf32, false → flag omitted + + compile: + type: categorical + choices: ["max-autotune", "default", "reduce-overhead", null] + # null → --compile flag is omitted entirely + + num_workers: + type: categorical + choices: [8, 16, 24, 32] + + dataset: + type: group # one choice selects all bundled args together + choices: + case118: + config: ./examples/config/HGNS_PF_datakit_case118.yaml + data_path: /u/rkie/ + +static: + run_name: run1 + log_dir: logs + report-performance: true diff --git a/docs/iterate2.md b/docs/iterate2.md index 3429984..c21359e 100644 --- a/docs/iterate2.md +++ b/docs/iterate2.md @@ -1,6 +1,14 @@ # iterate2 – HPO Launcher -`iterate2` is a generic Optuna-based hyperparameter optimisation (HPO) launcher with a pluggable workload-manager backend. 
It submits one trial per Optuna suggestion, waits for the job to finish, extracts a scalar metric from the job's log file, and returns it to Optuna. +`iterate2` is a generic Optuna-based hyperparameter optimisation (HPO) launcher with a pluggable workload-manager backend. It submits one trial per Optuna suggestion, waits for the job to finish, extracts one or more metrics from the job's log file, and returns them to Optuna. + +Key capabilities: + +- **Multi-objective optimisation** — extract and optimise several metrics simultaneously (Pareto front) +- **Five HPO parameter types** — `float`, `int`, `categorical`, `flag` (store-true), `group` (bundled arg sets) +- **Dynamic GPU count per trial** — `gpu_num` in the HPO space controls the WLM resource request per trial +- **Null-omission** — `null` in a `categorical` choice causes the flag to be completely absent from the command line +- **Workload manager backends** — LSF, Slurm, or direct local execution ## Quick start @@ -28,18 +36,32 @@ iterate2 \ | `--venv` | `.venv` | Virtual-environment directory to activate. Set to empty string to disable | | `--interpreter` | `python` | Python interpreter to invoke | | `--param-setter` | `None` | Use setter-style argument passing (see [Setter-style arguments](#setter-style-arguments)) | -| `--wlm` | `none` | Workload manager: `lsf`, `slurm`, `openshift`, or `none` | +| `--wlm` | `none` | Workload manager: `lsf`, `slurm`, `vela`, or `none` | | `--gpu-count` | `1` | Number of GPUs per trial | | `--cpu-count` | `4` | Number of CPUs per trial | | `--mem-gb` | `128` | Memory (GB) per trial | | `--lsf-gpu-config-string` | `None` | Optional verbatim LSF `-gpu` option string (see [GPU configuration](#gpu-configuration-on-lsf)) | +| `--parallelism` | `1` | Number of trials to run in parallel (see [Parallel execution](#parallel-execution)) | + +### Vela (OpenShift) options + +Required when `--wlm vela`. + +| Option | Default | Description | +|---|---|---| +| `--vela-job-template` | *(required)* | Path to the Vela job YAML template. `{{HPO_COMMAND}}` in `setupCommands` is replaced per trial | +| `--vela-chart-path` | *(required)* | Path to the `pytorchjob-generator` helm chart directory | +| `--vela-namespace` | *(current context)* | OpenShift/Kubernetes namespace | +| `--vela-cmd-placeholder` | `{{HPO_COMMAND}}` | String in `setupCommands` that is replaced with the HPO-parametrised CLI call | +| `--vela-pod-ready-timeout` | `600` | Seconds to wait for the trial pod to reach Running state | +| `--vela-job-timeout` | `86400` | Seconds to wait (streaming logs) for the job to complete | ### Optuna options | Option | Default | Description | |---|---|---| | `--optuna-study-name` | *(required)* | Name of the Optuna study | -| `--optuna-db-path` | *(required)* | Storage URL for the Optuna database, e.g. `sqlite:///hpo.db` | +| `--optuna-db-path` | *(required)* | Storage URL. `sqlite:///hpo.db` for SQLite, `js:///path/journal.log` for JournalStorage, or any Optuna-supported URL | | `--optuna-n-trials` | `100` | Number of trials to run | ### HPO search space @@ -73,7 +95,107 @@ static: dataset_path: /data/my_dataset ``` -Supported parameter types: `float`, `int`, `categorical`. +Supported parameter types: `float`, `int`, `categorical`, `flag`, `group`. + +#### Parameter types + +##### `float` + +Suggests a floating-point value between `low` and `high`. Set `log: true` for log-uniform sampling. 
+
+```yaml
+learning_rate:
+  type: float
+  low: 1e-5
+  high: 1e-2
+  log: true
+```
+
+Generates: `--learning-rate 0.0003`
+
+##### `int`
+
+Suggests an integer between `low` and `high` (inclusive).
+
+```yaml
+encoder_depth:
+  type: int
+  low: 2
+  high: 8
+```
+
+Generates: `--encoder-depth 4`
+
+##### `categorical`
+
+Suggests one value from a list of choices. Choices can be strings, numbers, or `null`.
+
+```yaml
+batch_size:
+  type: categorical
+  choices: [16, 32, 64]
+```
+
+Generates: `--batch-size 32`
+
+**`null` omits the flag entirely.** Useful for optional flags like `--compile`:
+
+```yaml
+compile:
+  type: categorical
+  choices: ["max-autotune", "default", null]
+  # null → --compile is completely absent from the command
+```
+
+##### `flag`
+
+Models a `store_true`-style flag that takes no value — its presence or absence is the parameter. `true` adds the flag; `false` omits it.
+
+```yaml
+bfloat16:
+  type: flag   # true → --bfloat16   false → (omitted)
+
+tf32:
+  type: flag   # true → --tf32   false → (omitted)
+```
+
+!!! note
+    Use unquoted YAML `true`/`false` for `flag` and for boolean values in `categorical.choices`.
+    Use **quoted** `"true"`/`"false"` when the wrapped script expects the literal string as a value (e.g. `--amp true`).
+
+##### `group`
+
+Bundles several CLI arguments together under a single Optuna categorical parameter. Optuna picks one group name; `iterate2` then injects all key/value pairs from that group into the trial's argument list. This is useful when multiple arguments are co-dependent (e.g. config file + dataset path + experiment name).
+
+```yaml
+dataset:
+  type: group
+  choices:
+    case2000:
+      config: ./examples/config/model_case2000.yaml
+      data_path: /data/pf/
+      exp_name: case2000
+    case1000:
+      config: ./examples/config/model_case1000.yaml
+      data_path: /data/pf/
+      exp_name: case1000
+```
+
+Optuna tracks the choice as a single categorical (`dataset = "case2000"`), but the wrapped script receives:
+
+```
+--config ./examples/config/model_case2000.yaml --data-path /data/pf/ --exp-name case2000
+```
+
+##### `gpu_num` — dynamic GPU count
+
+The special key `gpu_num` (as `categorical` or `int`) overrides `--gpu-count` for the **WLM resource request** of each individual trial. It is consumed by `iterate2` and never forwarded to the wrapped script.
+
+```yaml
+gpu_num:
+  type: categorical
+  choices: [1, 2, 4]
+```
 
 ### Static arguments
 
@@ -86,13 +208,41 @@ Arguments passed unchanged to every trial. Can be supplied inline or via file:
 
 If neither is provided, `iterate2` falls back to the `static` section of `--hpo-yaml`.
 
+Static boolean values follow the same rule as HPO values: unquoted `true` produces a bare flag (`--flag`), unquoted `false` omits it.
+
+```yaml
+static:
+  max_epochs: 50
+  tf32: true    # → --tf32 (store_true flag, always present)
+  debug: false  # → (omitted)
+```
+
 ### Metric extraction
 
 | Option | Default | Description |
 |---|---|---|
-| `--metric` | `val/F1_Score` | Metric name to extract from the trial's stdout log |
+| `--metrics` | `score_combined` | Comma-separated list of metric names to extract from the trial's stdout log |
 
-The last occurrence of the pattern `<metric>: <value>` or `<metric> = <value>` is used.
+The **last** occurrence of the pattern `<metric>: <value>` or `<metric> = <value>` is used for each metric. If a metric is not found, it defaults to `0.0` with a warning.
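+To make the matching rule concrete, here is a minimal sketch of the per-metric lookup (simplified; the shipped `extract_metrics_from_log` additionally accepts `│` table separators and an optional `[performance]`-style prefix):
+
+```python
+import re
+
+def pick_metric(log_text: str, metric: str) -> float:
+    # "name#N" selects the N-th occurrence (0-based); a plain name takes the last match.
+    sel = re.fullmatch(r"(.+)#(\d+)", metric)
+    name, occ = (sel.group(1), int(sel.group(2))) if sel else (metric, None)
+    pattern = rf"{re.escape(name)}\s*[:=]\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)"
+    matches = re.findall(pattern, log_text)
+    if not matches or (occ is not None and occ >= len(matches)):
+        return 0.0  # missing metric → 0.0 (iterate2 logs a warning)
+    return float(matches[occ if occ is not None else -1])
+```
+
+For example, against a log containing three `Samples/sec:` lines, `pick_metric(text, "Samples/sec#1")` returns the training throughput, while `pick_metric(text, "GFLOPS")` returns the last `GFLOPS` value.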
+
+**Single metric (single-objective):**
+
+```sh
+--metrics val_loss
+```
+
+**Multiple metrics (multi-objective, Pareto front):**
+
+```sh
+--metrics score_linear_acc,score_modality_leak,score_combined
+```
+
+All objectives are maximised. `iterate2` prints the Pareto-front trials at the end:
+
+```
+Trial 12: Values=[0.873, 0.041, 0.791]
+Trial 17: Values=[0.901, 0.038, 0.812]
+```
 
 ---
 
@@ -116,7 +266,7 @@ iterate2 --param-setter set ...
 
 | `--param-setter set` | `--set learning_rate 0.001 --set batch_size 32` |
 
 !!! note
-    In setter style, boolean parameters are passed explicitly as `--set flag true` / `--set flag false` rather than as bare flags (`--flag`), since there is no named flag to toggle.
+    In setter style, `flag` parameters are passed as `--set flag` (key only, no value) when `true`, and omitted when `false`.
 
 ---
 
@@ -161,6 +311,62 @@ bsub -n 20 -R "span[hosts=1]" \
 
 ---
 
+## Parallel execution
+
+By default `iterate2` runs one trial at a time. Pass `--parallelism N` to run up to `N` trials simultaneously, each in its own thread.
+
+```sh
+iterate2 \
+  --parallelism 4 \
+  --wlm lsf \
+  ...
+```
+
+### How it works
+
+Each thread independently:
+
+1. Asks Optuna for a new set of hyperparameters
+2. Builds and submits the launcher command (e.g. `bsub -K …`)
+3. Streams every output line to the main process stdout/stderr, prefixed with `[trial-N]`
+4. Reports the extracted metrics back to Optuna
+
+(Internally, trials are driven by `study.optimize(..., n_jobs=N)`, which hands each worker thread its own Optuna trial.)
+
+Output from concurrent trials is prefixed so you can follow individual workers:
+
+```
+[trial-3] Epoch 1/10 ━━━━━━━━━━ 100/100 0:01:12
+[trial-5] Using bfloat16 precision
+[trial-3] [performance] val_loss : 0.0421
+[trial-5] Epoch 1/10 ━━━━━━━━━━ 100/100 0:01:15
+```
+
+### Output files
+
+| WLM | stdout | stderr |
+|---|---|---|
+| `none` | `trial_N.out` (written by iterate2) | `trial_N.err` (written by iterate2) |
+| `lsf` / `slurm` | `trial_N.out` (written by WLM on cluster) | `trial_N.err` (written by WLM on cluster) |
+
+For WLM backends the local WLM tool output (bsub/srun status messages) is written to `trial_N_wlm.out` / `trial_N_wlm.err` so the cluster-managed files are never overwritten.
+
+### SQLite and parallelism
+
+Optuna retries on SQLite locking errors automatically. Values up to `--parallelism 4` work well with SQLite. For higher concurrency use PostgreSQL or **JournalStorage**:
+
+```sh
+# PostgreSQL
+--optuna-db-path postgresql://user:pass@host/dbname
+
+# JournalStorage (file-based, lock-free, safe for parallel workers on a shared filesystem)
+--optuna-db-path js:///path/to/study_journal.log
+```
+
+`js:///` is a custom `iterate2` scheme. The path after `js:///` is passed to Optuna's `JournalFileStorage`. JournalStorage serialises trials to an append-only log and is well-suited for NFS/GPFS shared filesystems where SQLite locking is unreliable.
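+
+Under the hood, `js:///` maps onto Optuna's journal storage — roughly equivalent to building the storage object yourself (sketch; mirrors what `iterate2.py` does with the path after the scheme):
+
+```python
+import optuna
+from optuna.storages import JournalStorage, JournalFileStorage
+
+# js:///path/to/study_journal.log → the path is handed to the file backend
+storage = JournalStorage(JournalFileStorage("/path/to/study_journal.log"))
+study = optuna.create_study(
+    study_name="my_study",
+    storage=storage,
+    directions=["maximize"],
+    load_if_exists=True,
+)
+```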
+
+---
+
 ## Workload managers
 
 ### LSF
 
@@ -175,45 +381,147 @@ Uses `srun` with `--gres=gpu:<count>`, `--cpus-per-task`, and `--mem` flags.
 
 Runs the command directly in a local shell, redirecting stdout/stderr to `trial_<N>.out` / `trial_<N>.err`.
 
-### openshift
+### Vela (OpenShift / MLBatch)
+
+`--wlm vela` submits each trial as a [PyTorchJob](https://www.kubeflow.org/docs/components/training/pytorch/) via the [MLBatch `pytorchjob-generator`](https://github.com/project-codeflare/mlbatch) helm chart.
 
-Not yet implemented.
+#### Submission flow
+
+1. For each Optuna trial, iterate2:
+   * Builds the CLI invocation from sampled + static args.
+   * Patches the **job template YAML**:
+     * appends `-trial-<N>` to `jobName` (unique resource per trial)
+     * sets `numGpusPerPod` from `gpu_num` (HPO or CLI `--gpu-count`)
+     * replaces the `{{HPO_COMMAND}}` placeholder in `setupCommands` with the generated CLI call
+   * Runs `helm template -f <patched.yaml> <chart> | oc create [-n <namespace>] -f-`
+2. Polls until the `<jobName>-master-0` pod appears, then streams `oc logs -f <pod>` — **this call blocks until the container exits**, so the trial behaves the same as other WLM backends.
+3. Pod exit code is checked; a non-zero exit code marks the trial as pruned.
+4. The `PyTorchJob` resource is deleted.
+
+#### Job template
+
+Create a YAML file modelled on `examples/vela_gridfm_template.yaml`. The only special requirement is the `{{HPO_COMMAND}}` placeholder somewhere in `setupCommands`:
+
+```yaml
+jobName: "my-project-hpo"   # iterate2 appends -trial-N
+numGpusPerPod: 1            # iterate2 overwrites with gpu_num
+numCpusPerPod: 32
+totalMemoryPerPod: "32Gi"
+
+volumes:
+  - name: "data-vol"
+    claimName: "my-pvc"
+    mountPath: "/mnt/data"
+
+setupCommands:
+  - "wget -q https://example.com/config.yaml"
+  - "{{HPO_COMMAND}}"       # ← iterate2 fills this in
+```
+
+#### Example invocation
+
+```sh
+iterate2 \
+  --script "gridfm_graphkit train" \
+  --interpreter "" \
+  --wlm vela \
+  --vela-job-template examples/vela_gridfm_template.yaml \
+  --vela-chart-path ../mlbatch/tools/pytorchjob-generator/chart \
+  --vela-namespace my-namespace \
+  --gpu-count 1 \
+  --optuna-study-name gridfm_vela_hpo \
+  --optuna-db-path sqlite:///gridfm_vela_hpo.db \
+  --optuna-n-trials 20 \
+  --hpo-yaml configs/gridfm_graphkit_hpo.yaml
+```
+
+See `examples/run_vela_example.sh` for a complete ready-to-run script.
+
+!!! note
+    `--script` is the bare CLI entry-point (`gridfm_graphkit train`). Set `--interpreter ""` to suppress the default `python` prefix.
+
+!!! tip
+    `gpu_num` in the HPO space controls both `numGpusPerPod` in the job YAML **and** the WLM resource request, just like with LSF/Slurm.
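+
+The override itself is a simple pop before the command is built — `gpu_num` shapes the resource request but never reaches the wrapped script (excerpt-style sketch of the logic in `iterate2.py`):
+
+```python
+script_args = {"batch_size": 64, "gpu_num": 2}   # static + sampled args for one trial
+
+# gpu_num (HPO or static) overrides --gpu-count for this trial's launcher/job YAML
+gpu_count = int(script_args.pop("gpu_num", 1))   # → 2
+
+# script_args is now {"batch_size": 64}: only --batch-size 64 is forwarded
+```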
--- ## Example HPO YAML +Full example combining all parameter types: + ```yaml # hpo_space.yaml hpo: + # float – log-uniform over [1e-5, 1e-2] learning_rate: type: float low: 1e-5 high: 1e-2 log: true - weight_decay: - type: float - low: 1e-6 - high: 1e-3 - log: true + + # int – encoder depth + encoder_depth: + type: int + low: 2 + high: 8 + + # categorical – batch size + batch_size: + type: categorical + choices: [16, 32, 64] + + # categorical with null – compile mode (null omits --compile entirely) + compile: + type: categorical + choices: ["max-autotune", "default", null] + + # flag – store_true style (--bfloat16 present or absent) + bfloat16: + type: flag + + # flag – store_true style (--tf32 present or absent) + tf32: + type: flag + + # gpu_num – controls WLM resource request per trial (not forwarded to script) + gpu_num: + type: categorical + choices: [1, 2, 4] + + # group – bundles co-dependent args; Optuna picks one group by name + dataset: + type: group + choices: + case2000: + config: ./examples/config/model_case2000.yaml + data_path: /data/pf/ + exp_name: case2000 + case1000: + config: ./examples/config/model_case1000.yaml + data_path: /data/pf/ + exp_name: case1000 static: - max_epochs: 30 - config: configs/my_model.yaml + max_epochs: 50 + log_dir: logs + num_workers: 16 ``` Launch with: ```sh iterate2 \ - --script terratorch_iterate/main.py \ + --script gridfm_graphkit \ + --interpreter "" \ + --root-dir /path/to/project \ + --venv /path/to/venv \ --wlm lsf \ --lsf-gpu-config-string "num=1:mode=exclusive_process:mps=yes:gmodel=NVIDIAA100_SXM4_80GB" \ - --cpu-count 20 \ - --mem-gb 512 \ - --optuna-study-name geobench_hpo \ - --optuna-db-path sqlite:///geobench_hpo.db \ - --optuna-n-trials 40 \ + --cpu-count 16 \ + --mem-gb 64 \ + --optuna-study-name my_study \ + --optuna-db-path sqlite:///my_study.db \ + --optuna-n-trials 50 \ --hpo-yaml hpo_space.yaml \ - --metric "val/F1_Score" + --metrics val_loss,val_f1 ``` diff --git a/examples/iterate_study.db b/examples/iterate_study.db new file mode 100644 index 0000000..5d1050d Binary files /dev/null and b/examples/iterate_study.db differ diff --git a/examples/run_lsf_gpu_example.sh b/examples/run_lsf_gpu_example.sh index 2913513..9808c92 100755 --- a/examples/run_lsf_gpu_example.sh +++ b/examples/run_lsf_gpu_example.sh @@ -22,7 +22,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -iterate2 \ +iterate \ --script "${SCRIPT_DIR}/bumpy_function.py" \ --root-dir "${SCRIPT_DIR}" \ --venv "" \ diff --git a/examples/run_lsf_gridfm_example.sh b/examples/run_lsf_gridfm_example.sh new file mode 100755 index 0000000..5afb84e --- /dev/null +++ b/examples/run_lsf_gridfm_example.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# ============================================================================= +# Example: iterate with --wlm lsf for gridfm-graphkit HPO on an LSF cluster +# +# Prerequisites +# ------------- +# * LSF bsub/bjobs available on PATH +# * gridfm-graphkit installed in the venv (or via module load) +# * configs/gridfm_graphkit_hpo.yaml present +# +# How it works +# ------------ +# 1. For each Optuna trial iterate: +# a. Samples hyperparameters from gridfm_graphkit_hpo.yaml +# b. Builds the gridfm_graphkit CLI invocation from static + sampled params +# c. Submits a bsub job (-K blocks until completion) +# d. Reads stdout/stderr from trial.out / trial.err +# e. 
Extracts metrics and reports them to Optuna
+#
+# Customise
+# ---------
+# LSF_GPU_CONFIG – full -gpu option string for bsub
+# GPU_COUNT     – must match num= in LSF_GPU_CONFIG
+# OC_NAMESPACE  – not used for LSF; kept only for parity with the Vela scripts
+# =============================================================================
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
+
+# Adjust to match your cluster's GPU model and scheduling policy.
+LSF_GPU_CONFIG="${LSF_GPU_CONFIG:-num=2:mode=exclusive_process:mps=no:gmodel=NVIDIAA100_SXM4_80GB}"
+
+# Commands to run inside the bsub job before launching the training script.
+# source ~/.bashrc ensures module / mamba initialisation is available;
+# micromamba activate gridfm switches to the correct conda environment.
+PRE_RUN="${PRE_RUN_COMMANDS:-source ~/.bashrc && micromamba activate gridfm}"
+
+iterate \
+  --script "gridfm_graphkit train" \
+  --interpreter "" \
+  --root-dir "${GRIDFM_ROOT:-${HOME}/gitco/gridfm-graphkit}" \
+  --wlm lsf \
+  --pre-run-commands "${PRE_RUN}" \
+  --no-underscore-to-hyphen \
+  --gpu-count 2 \
+  --cpu-count 32 \
+  --mem-gb 256 \
+  --lsf-gpu-config-string "${LSF_GPU_CONFIG}" \
+  --optuna-study-name gridfm_lsf_hpo \
+  --optuna-db-path "js:///gridfm_lsf_hpo.journal" \
+  --parallelism 4 \
+  --optuna-n-trials 20 \
+  --hpo-yaml "${REPO_ROOT}/configs/gridfm_graphkit_hpo.yaml"
diff --git a/examples/run_vela_example.sh b/examples/run_vela_example.sh
new file mode 100755
index 0000000..bb5cafc
--- /dev/null
+++ b/examples/run_vela_example.sh
@@ -0,0 +1,57 @@
+#!/usr/bin/env bash
+# =============================================================================
+# Example: iterate2 with --wlm vela (OpenShift / MLBatch PyTorchJob)
+#
+# Prerequisites
+# -------------
+# * helm CLI installed and on PATH
+# * oc CLI logged in to the target cluster
+# * mlbatch/tools/pytorchjob-generator/chart checked out locally
+# * The gridfm HPO YAML (configs/gridfm_graphkit_hpo.yaml) present
+#
+# How it works
+# ------------
+# 1. For each Optuna trial, iterate2:
+#    a. Samples hyperparameters from gridfm_graphkit_hpo.yaml
+#    b. Builds the gridfm_graphkit CLI invocation from static + sampled params
+#    c. Patches vela_gridfm_template.yaml:
+#       - appends "-trial-<N>" to jobName (unique resource per trial)
+#       - sets numGpusPerPod = gpu_num (from the HPO space)
+#       - replaces {{HPO_COMMAND}} (the actual CLI call)
+#    d. Runs: helm template -f <patched.yaml> <chart> | oc create -f-
+#    e. Polls until the <jobName>-master-0 pod is Running
+#    f. Streams: oc logs -f <jobName>-master-0
+#       (blocks until container exits; output captured for metric extraction)
+#    g. Checks pod exit code; deletes the PyTorchJob resource
+# 2. Metrics are extracted from the captured log and returned to Optuna.
+# =============================================================================
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
+
+# Path to the mlbatch pytorchjob-generator helm chart.
+# Clone mlbatch first: git clone https://github.com/project-codeflare/mlbatch
+CHART_PATH="${MLBATCH_CHART_PATH:-${HOME}/tmp/mlbatch/tools/pytorchjob-generator/chart}"
+
+NAMESPACE_ARG=()
+[[ -n "${OC_NAMESPACE:-}" ]] && NAMESPACE_ARG=(--vela-namespace "${OC_NAMESPACE}")
+
+iterate \
+  --script "gridfm_graphkit train" \
+  --interpreter "" \
+  --wlm vela \
+  --vela-job-template "${SCRIPT_DIR}/vela_gridfm_template.yaml" \
+  --vela-chart-path "${CHART_PATH}" \
+  "${NAMESPACE_ARG[@]}" \
+  --vela-cmd-placeholder "{{HPO_COMMAND}}" \
+  --vela-pod-ready-timeout 600 \
+  --vela-job-timeout 86400 \
+  --no-underscore-to-hyphen \
+  --gpu-count 1 \
+  --optuna-study-name gridfm_vela_hpo \
+  --optuna-db-path "js:///gridfm_vela_hpo.journal" \
+  --parallelism 16 \
+  --optuna-n-trials 20 \
+  --hpo-yaml "${REPO_ROOT}/configs/gridfm_graphkit_hpo.yaml"
diff --git a/examples/run_vela_gpu_perf_example.sh b/examples/run_vela_gpu_perf_example.sh
new file mode 100755
index 0000000..bf845b6
--- /dev/null
+++ b/examples/run_vela_gpu_perf_example.sh
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+# =============================================================================
+# Example: iterate2 GPU performance benchmark HPO on Vela (OpenShift / MLBatch)
+#
+# What iterate2 does per trial
+# ----------------------------
+# 1. Samples hyperparameters from configs/gpu_perf_hpo.yaml
+# 2. Builds the CLI call:
+#      python /app/.local/.../gpu_performance_test.py \
+#        --mode single_gpu --batch-size <B> --num-workers <W> ...
+# 3. Patches examples/vela_gpu_perf_template.yaml:
+#      - appends -trial-<N> to jobName
+#      - sets numGpusPerPod = gpu_num
+#      - replaces {{HPO_COMMAND}} with the CLI call (plain single-line string)
+# 4. Submits:
+#      helm template -f <patched.yaml> <chart> | oc create [-n <namespace>] -f-
+# 5. Streams: oc logs -f <jobName>-master-0 (blocks until container exits)
+# 6. Extracts metrics using Nth-occurrence syntax (name#N, 0-based):
+#      Samples/sec#0 (DataLoader), Samples/sec#1 (Training),
+#      Samples/sec#2 (Inference), GFLOPS
+# 7. Deletes the PyTorchJob resource.
+# =============================================================================
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
+
+# Path to the mlbatch pytorchjob-generator helm chart.
+# Clone first: git clone https://github.com/project-codeflare/mlbatch
+CHART_PATH="${MLBATCH_CHART_PATH:-${HOME}/tmp/mlbatch/tools/pytorchjob-generator/chart}"
+
+# Only pass --vela-namespace when OC_NAMESPACE is set; an empty value would
+# otherwise be forwarded to oc as the namespace.
+NAMESPACE_ARG=()
+[[ -n "${OC_NAMESPACE:-}" ]] && NAMESPACE_ARG=(--vela-namespace "${OC_NAMESPACE}")
+
+iterate \
+  --script "/app/.local/lib/python3.12/site-packages/claimed/components/util/gpu_performance_test.py" \
+  --interpreter "python" \
+  --no-underscore-to-hyphen \
+  --wlm vela \
+  --vela-job-template "${SCRIPT_DIR}/vela_gpu_perf_template.yaml" \
+  --vela-chart-path "${CHART_PATH}" \
+  "${NAMESPACE_ARG[@]}" \
+  --vela-pod-ready-timeout 300 \
+  --vela-job-timeout 7200 \
+  --gpu-count 1 \
+  --optuna-study-name gpu_perf_hpo \
+  --optuna-db-path "js:///gpu_perf_hpo.journal" \
+  --optuna-n-trials 250 \
+  --hpo-yaml "${REPO_ROOT}/configs/gpu_perf_hpo.yaml" \
+  --parallelism 5
diff --git a/examples/vela_gpu_perf_template.yaml b/examples/vela_gpu_perf_template.yaml
new file mode 100644
index 0000000..476a5c7
--- /dev/null
+++ b/examples/vela_gpu_perf_template.yaml
@@ -0,0 +1,29 @@
+####################
+# Job Metadata
+# NOTE: iterate2 appends "-trial-<N>" to jobName so each trial is a unique resource.
+####################
+jobName: "romeokienzler-gpu-test-hpo"
+containerImage: "us.icr.io/geodn/gridfm-graphkit:0.1"
+imagePullPolicy: "IfNotPresent"
+imagePullSecrets:
+  - name: "pullsecret-gridfm-geodn"
+
+##################################
+# Resource Requirements
+# numGpusPerPod is overwritten per-trial by the gpu_num HPO parameter.
+##################################
+numPods: 1
+numCpusPerPod: 32
+numGpusPerPod: 1
+totalMemoryPerPod: "64Gi"
+
+########################
+# Workload Specification
+########################
+setupCommands:
+  - "pip install -U claimed"
+  # iterate2 replaces {{HPO_COMMAND}} with the HPO-parametrised CLI call.
+  # Each setupCommands entry must be a single line (Helm chart constraint).
+  # Metrics are disambiguated by iterate2 using Samples/sec#N (Nth-occurrence)
+  # rather than awk renaming.
+  - "{{HPO_COMMAND}}"
diff --git a/examples/vela_gridfm_template.yaml b/examples/vela_gridfm_template.yaml
new file mode 100644
index 0000000..e75b3cd
--- /dev/null
+++ b/examples/vela_gridfm_template.yaml
@@ -0,0 +1,34 @@
+####################
+# Job Metadata
+# NOTE: iterate2 appends "-trial-<N>" to jobName for each HPO trial so
+# every trial creates a unique Kubernetes resource.
+####################
+jobName: "romeokienzler-gridfm-hpo"
+containerImage: "us.icr.io/geodn/gridfm-graphkit:0.5"
+imagePullPolicy: "IfNotPresent"
+imagePullSecrets:
+  - name: "pullsecret-gridfm-geodn"
+
+##################################
+# Resource Requirements
+# NOTE: numGpusPerPod is overridden per-trial by the gpu_num HPO parameter.
+##################################
+numPods: 1
+numCpusPerPod: 64
+numGpusPerPod: 2   # placeholder – iterate2 overrides this with gpu_num
+totalMemoryPerPod: "64Gi"
+
+volumes:
+  - name: "gridfm-storage"
+    claimName: "gridfm"
+    mountPath: "/mnt/data"
+
+########################
+# Workload Specification
+########################
+setupCommands:
+  - "pip install -U git+https://github.com/gridfm/gridfm-graphkit.git@fix_multi_gpu_training"
+  - "wget https://raw.githubusercontent.com/gridfm/gridfm-graphkit/refs/heads/main/examples/config/HGNS_PF_datakit_case118.yaml"
+#  - "sed -i 's/epochs: 200/epochs: 5/g' HGNS_PF_datakit_case118.yaml"
+  # iterate2 replaces the placeholder below with the HPO-parametrised CLI call.
+  - "{{HPO_COMMAND}}"
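For reference, the per-trial patching applied to the two templates above condenses to three targeted text substitutions — a simplified sketch of what `build_vela_job_yaml` (in the implementation below) does; the real function uses anchored regexes so that quoting and block scalars survive untouched:

```python
import re

def patch_template(text: str, trial_id: int, gpu_count: int, cmd: str) -> str:
    # 1. unique Kubernetes resource per trial: jobName → <jobName>-trial-<N>
    text = re.sub(r'^(jobName\s*:\s*"?)([\w-]+)',
                  rf"\g<1>\g<2>-trial-{trial_id}", text, flags=re.M)
    # 2. per-trial GPU request from the sampled gpu_num
    text = re.sub(r"^(numGpusPerPod\s*:\s*)\S+",
                  rf"\g<1>{gpu_count}", text, flags=re.M)
    # 3. splice the generated CLI call into setupCommands
    return text.replace("{{HPO_COMMAND}}", cmd)
```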
+ - "{{HPO_COMMAND}}" diff --git a/terratorch_iterate/iterate2.py b/terratorch_iterate/iterate2.py index eef5069..a22ad94 100644 --- a/terratorch_iterate/iterate2.py +++ b/terratorch_iterate/iterate2.py @@ -2,15 +2,23 @@ import argparse import json +import logging import os import subprocess +import sys import re +import tempfile +import threading +import time from pathlib import Path from typing import Dict, Any, Optional, Literal, List import optuna +from optuna.storages import JournalStorage, JournalFileStorage import yaml +logger = logging.getLogger("iterate2") + # ============================================================ # CLI # ============================================================ @@ -25,15 +33,81 @@ def parse_args(): # ------------------------ parser.add_argument("--script", required=True, help="Training script to execute") parser.add_argument("--root-dir", default=None, help="Root dir (derived if omitted)") - parser.add_argument("--venv", default=".venv", help="Virtualenv dir") + parser.add_argument("--venv", default=".venv", help="Virtualenv dir (shortcut for source /bin/activate)") + parser.add_argument( + "--pre-run-commands", + default=None, + help=( + "Shell commands to run before the training script, joined with ' && '. " + "Useful for sourcing bashrc, activating conda/mamba envs, loading modules, etc. " + "Example: 'source ~/.bashrc && micromamba activate gridfm'. " + "When set, --venv is ignored." + ), + ) parser.add_argument("--interpreter", default="python", help="Interpreter to use") parser.add_argument("--param-setter", type=str, default=None) - parser.add_argument("--wlm", choices=["lsf", "slurm", "openshift", "none"], default="none") + parser.add_argument("--wlm", choices=["lsf", "slurm", "openshift", "vela", "none"], default="none") parser.add_argument("--gpu-count", type=int, default=1) parser.add_argument("--cpu-count", type=int, default=4) parser.add_argument("--mem-gb", type=int, default=128) parser.add_argument("--lsf-gpu-config-string", type=str, default=None) + # ------------------------ + # Vela / OpenShift options + # ------------------------ + parser.add_argument( + "--vela-job-template", + type=str, + default=None, + help="Path to the Vela job YAML template (required when --wlm vela)", + ) + parser.add_argument( + "--vela-chart-path", + type=str, + default=None, + help="Path to the helm chart directory (required when --wlm vela)", + ) + parser.add_argument( + "--vela-namespace", + type=str, + default=None, + help="OpenShift/Kubernetes namespace (uses current context if omitted)", + ) + parser.add_argument( + "--vela-cmd-placeholder", + type=str, + default="{{HPO_COMMAND}}", + help="String in the job template's setupCommands that is replaced with the HPO command (default: '{{HPO_COMMAND}}')", + ) + parser.add_argument( + "--vela-pod-ready-timeout", + type=int, + default=600, + help="Seconds to wait for the trial pod to reach Running state (default: 600)", + ) + parser.add_argument( + "--vela-job-timeout", + type=int, + default=86400, + help="Seconds to wait for the trial job to complete (default: 86400 = 24 h)", + ) + parser.add_argument( + "--parallelism", + type=int, + default=1, + help="Number of trials to run in parallel (default: 1 = sequential). " + "Each parallel trial runs in its own thread. 
" + "For SQLite storage, values >4 may cause locking contention; " + "consider PostgreSQL for high parallelism.", + ) + parser.add_argument( + "--no-underscore-to-hyphen", + dest="underscore_to_hyphen", + action="store_false", + default=True, + help="Do not convert underscores to hyphens in arg names (default: convert)", + ) + # ------------------------ # Optuna config # ------------------------ @@ -58,6 +132,16 @@ def parse_args(): help="Comma-separated metric names to extract (e.g. score_linear_acc,score_modality_leak,score_combined)", ) + # ------------------------ + # Logging + # ------------------------ + parser.add_argument( + "--log-level", + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + help="Logging verbosity (default: INFO)", + ) + return parser.parse_args() @@ -67,51 +151,499 @@ def parse_args(): def resolve_paths(script: str, root_dir: Optional[str]): if root_dir is None: root_dir = '.' - return script, Path(root_dir).resolve() + resolved = Path(root_dir).resolve() + logger.debug("Resolved root_dir '%s' → '%s'", root_dir, resolved) + return script, resolved def build_launcher_command(wlm, cmd, trial_id, out_file, err_file, gpu_count, cpu_count, mem_gb, lsf_gpu_config_string): + logger.debug("Building launcher command: wlm=%s gpu_count=%d cpu_count=%d mem_gb=%d", wlm, gpu_count, cpu_count, mem_gb) if wlm == "lsf": gpu_fragment = f"-gpu \"{lsf_gpu_config_string}\"" if lsf_gpu_config_string else (f"-gpu num={gpu_count}" if gpu_count > 0 else "") - return f"bsub {gpu_fragment} -K -o {out_file} -e {err_file} -R \"rusage[ngpus={gpu_count}, cpu={cpu_count}, mem={mem_gb}GB]\" -J hpo_trial_{trial_id} \"{cmd}\"" - if wlm == "slurm": - return f"srun --gres=gpu:{gpu_count} --cpus-per-task={cpu_count} --mem={mem_gb}G --job-name=hpo_trial_{trial_id} --output={out_file} --error={err_file} bash -c \"{cmd}\"" - if wlm == "none": - return f'bash -c "{cmd} > {out_file} 2> {err_file}"' - raise ValueError(f"Unknown WLM: {wlm}") + launcher = f"bsub {gpu_fragment} -K -o {out_file} -e {err_file} -R \"rusage[ngpus={gpu_count}, cpu={cpu_count}, mem={mem_gb}GB]\" -J hpo_trial_{trial_id} \"{cmd}\"" + elif wlm == "slurm": + launcher = f"srun --gres=gpu:{gpu_count} --cpus-per-task={cpu_count} --mem={mem_gb}G --job-name=hpo_trial_{trial_id} --output={out_file} --error={err_file} bash -c \"{cmd}\"" + elif wlm == "none": + # No embedded redirect: run_and_stream() captures stdout/stderr via PIPE + # and writes to out_file/err_file itself. + launcher = f'bash -c "{cmd}"' + elif wlm in ("vela",): + # Vela uses a separate submission flow; this function is not called for it. 
+ raise ValueError("build_launcher_command must not be called for wlm='vela'; use build_vela_job_yaml + run_vela_trial instead.") + else: + raise ValueError(f"Unknown WLM: {wlm}") + logger.debug("Launcher command: %s", launcher) + return launcher -def build_shell_command(interpreter, root_dir, script_path, venv, script_args, param_setter): +def build_shell_command(interpreter, root_dir, script_path, venv, script_args, param_setter, underscore_to_hyphen=True, pre_run_commands=None): parts = [f"cd {root_dir}"] - if venv: parts.append(f"source {venv}/bin/activate") + if pre_run_commands: + parts.append(pre_run_commands) + logger.debug("Pre-run commands: %s", pre_run_commands) + elif venv: + parts.append(f"source {venv}/bin/activate") + logger.debug("Activating venv: %s", venv) arg_list = [f"{interpreter} {script_path}"] for key, value in script_args.items(): - arg_name = key.replace("_", "-") + arg_name = key.replace("_", "-") if underscore_to_hyphen else key + if value is None: + logger.debug("Skipping arg '%s': value is None (flag omitted)", key) + continue if param_setter: - arg_list.append(f"--{param_setter} {key} {value}") + if isinstance(value, bool): + if value: + arg_list.append(f"--{param_setter} {key}") + logger.debug("Setter flag: --%s %s (store_true)", param_setter, key) + else: + logger.debug("Skipping flag '%s': False → omitted", key) + else: + arg_list.append(f"--{param_setter} {key} {value}") + logger.debug("Setter arg: --%s %s %s", param_setter, key, value) else: if isinstance(value, bool): - if value: arg_list.append(f"--{arg_name}") + if value: + arg_list.append(f"--{arg_name}") + logger.debug("Flag present: --%s", arg_name) + else: + logger.debug("Skipping flag '--%s': False → omitted", arg_name) else: arg_list.append(f"--{arg_name} {value}") - parts.append(" ".join(arg_list)) - return " && ".join(parts) + logger.debug("Arg: --%s %s", arg_name, value) + cmd = " && ".join(parts + [" ".join(arg_list)]) + logger.debug("Shell command: %s", cmd) + return cmd + + +def build_container_command(interpreter: str, script_path: str, script_args: dict, param_setter: Optional[str], underscore_to_hyphen: bool = True) -> str: + """Build a bare CLI invocation suitable for running inside a container. + + Unlike :func:`build_shell_command` this function does **not** prepend + ``cd`` or ``source venv`` – those are not needed (or available) inside an + already-running container image. + """ + prefix = f"{interpreter} " if interpreter else "" + arg_list = [f"{prefix}{script_path}".strip()] + for key, value in script_args.items(): + arg_name = key.replace("_", "-") if underscore_to_hyphen else key + if value is None: + logger.debug("Container cmd: skipping '%s' (None)", key) + continue + if param_setter: + if isinstance(value, bool): + if value: + arg_list.append(f"--{param_setter} {key}") + else: + pass # omit + else: + arg_list.append(f"--{param_setter} {key} {value}") + else: + if isinstance(value, bool): + if value: + arg_list.append(f"--{arg_name}") + # else omit + else: + arg_list.append(f"--{arg_name} {value}") + cmd = " ".join(arg_list) + logger.debug("Container command: %s", cmd) + return cmd + + +def build_vela_job_yaml( + template_path: str, + trial_id: int, + gpu_count: int, + container_cmd: str, + placeholder: str, +) -> tuple[str, str]: + """Load *template_path* as raw text, inject HPO parameters, return ``(yaml_str, job_name)``. + + All modifications are done via targeted regex/string substitutions on the raw + YAML text so that multi-line block scalars (e.g. 
awk pipelines), single-quoted + strings, and other constructs that PyYAML would mangle on a load→dump round-trip + are preserved exactly as written in the template. + + Changes applied: + * ``jobName`` gets a ``-trial-{trial_id}`` suffix (unique Kubernetes resource). + * ``numGpusPerPod`` is overwritten with *gpu_count*. + * The *placeholder* string inside ``setupCommands`` is replaced with + *container_cmd* in-place, preserving any surrounding wrapper (e.g. awk pipeline). + """ + with open(template_path, "r") as fh: + text = fh.read() + + # ── jobName ────────────────────────────────────────────────────────────── + job_name_match = re.search(r'^(jobName\s*:\s*["\']?)([^"\'#\n]+)(["\']?)', text, re.MULTILINE) + if not job_name_match: + raise ValueError(f"'jobName' key not found in template '{template_path}'") + raw_name = job_name_match.group(2).strip() + job_name = f"{raw_name}-trial-{trial_id}" + text = ( + text[:job_name_match.start(2)] + + job_name_match.group(2).replace(raw_name, job_name) + + text[job_name_match.end(2):] + ) + logger.debug("Vela trial %d: jobName → %s", trial_id, job_name) + + # ── numGpusPerPod ──────────────────────────────────────────────────────── + text = re.sub( + r'^(numGpusPerPod\s*:\s*)\S+', + lambda m: f"{m.group(1)}{gpu_count}", + text, + flags=re.MULTILINE, + ) + logger.debug("Vela trial %d: numGpusPerPod → %d", trial_id, gpu_count) + + # ── placeholder substitution ───────────────────────────────────────────── + if placeholder in text: + text = text.replace(placeholder, container_cmd) + logger.debug("Vela trial %d: substituted placeholder '%s'", trial_id, placeholder) + else: + logger.warning( + "Vela trial %d: placeholder '%s' not found in template '%s' – appending command", + trial_id, placeholder, template_path, + ) + text += f"\n - {container_cmd}\n" + + return text, job_name + + +def _oc(*args, namespace: Optional[str] = None, check: bool = True, capture: bool = False): + """Run an ``oc`` sub-command, optionally capturing output.""" + cmd = ["oc"] + list(args) + if namespace: + cmd += ["-n", namespace] + logger.debug("oc command: %s", " ".join(cmd)) + if capture: + return subprocess.run(cmd, check=check, capture_output=True, text=True) + return subprocess.run(cmd, check=check) + + +def run_vela_trial( + trial_id: int, + job_yaml: str, + chart_path: str, + job_name: str, + namespace: Optional[str], + out_file: str, + err_file: str, + pod_ready_timeout: int, + job_timeout: int, +) -> None: + """Submit a Vela/OpenShift PyTorchJob, stream its logs, and wait for completion. + + Steps + ----- + 1. Write *job_yaml* to a temp file. + 2. ``helm template -f | oc create [-n ] -f-`` + 3. Poll until the master pod (``-master-0``) appears. + 4. ``oc logs -f `` – streams every line to stdout **and** *out_file*. + 5. After streaming ends, check the pod's terminated exit-code. + Non-zero → raise :class:`subprocess.CalledProcessError`. + 6. Cleanup: delete the PyTorchJob resource. + """ + ns_args = ["-n", namespace] if namespace else [] + prefix = f"[trial-{trial_id}]" + + # Write temp YAML + with tempfile.NamedTemporaryFile( + mode="w", + suffix=".yaml", + prefix=f"vela_trial_{trial_id}_", + delete=False, + ) as fh: + fh.write(job_yaml) + tmp_yaml = fh.name + logger.debug("Vela trial %d: temp YAML written to %s", trial_id, tmp_yaml) + + try: + # ── 1. 
Submit ────────────────────────────────────────────────────────── + ns_flag = f"-n {namespace}" if namespace else "" + create_cmd = ( + f"helm template -f {tmp_yaml} {chart_path}" + f" | oc create {ns_flag} -f-" + ) + logger.info("Trial %d: submitting Vela job → %s", trial_id, create_cmd) + result = subprocess.run(create_cmd, shell=True, capture_output=True, text=True) + with _print_lock: + sys.stdout.write(f"{prefix} {result.stdout}") + sys.stdout.flush() + if result.returncode != 0: + raise RuntimeError( + f"Vela trial {trial_id}: oc create failed (rc={result.returncode}):\n" + f"{result.stderr}" + ) + logger.info("Trial %d: job '%s' created", trial_id, job_name) + + # ── 2. Wait for master pod to appear ────────────────────────────────── + master_pod = f"{job_name}-master-0" + deadline = time.monotonic() + pod_ready_timeout + logger.info("Trial %d: waiting for pod '%s' to appear (timeout %ds)…", trial_id, master_pod, pod_ready_timeout) + while time.monotonic() < deadline: + r = subprocess.run( + ["oc", "get", "pod", master_pod, "--ignore-not-found"] + ns_args, + capture_output=True, text=True, + ) + if master_pod in r.stdout: + logger.debug("Trial %d: pod '%s' found", trial_id, master_pod) + break + time.sleep(5) + else: + raise TimeoutError( + f"Vela trial {trial_id}: pod '{master_pod}' did not appear within {pod_ready_timeout}s" + ) + + # ── 3. Wait for pod to be Running/Succeeded ─────────────────────────── + logger.info("Trial %d: waiting for pod '%s' to be Running…", trial_id, master_pod) + wait_cmd = ( + ["oc", "wait", f"pod/{master_pod}", + "--for=condition=Ready", + f"--timeout={pod_ready_timeout}s"] + + ns_args + ) + wr = subprocess.run(wait_cmd, capture_output=True, text=True) + # oc wait returns non-zero if the pod is already Completed (no Ready condition); + # that's fine – the logs are still accessible. + logger.debug("Trial %d: oc wait rc=%d stderr=%s", trial_id, wr.returncode, wr.stderr.strip()) + + # ── 4. Stream logs ──────────────────────────────────────────────────── + log_cmd = ["oc", "logs", "-f", master_pod] + ns_args + logger.info("Trial %d: streaming logs from '%s'", trial_id, master_pod) + log_proc = subprocess.Popen( + log_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + t_out = threading.Thread( + target=_stream_pipe, + args=(log_proc.stdout, out_file, trial_id, "stdout", sys.stdout), + daemon=True, + ) + t_err = threading.Thread( + target=_stream_pipe, + args=(log_proc.stderr, err_file, trial_id, "stderr", sys.stderr), + daemon=True, + ) + t_out.start() + t_err.start() + log_proc.wait(timeout=job_timeout) + t_out.join() + t_err.join() + logger.debug("Trial %d: log stream ended (rc=%d)", trial_id, log_proc.returncode) + + # ── 4b. If oc logs exited early (e.g. "unexpected EOF"), the pod may + # still be running. Re-attach the log stream and wait for it to + # finish so we capture the full output and don't delete a live job. + if log_proc.returncode != 0: + logger.warning( + "Trial %d: oc logs exited with rc=%d (possible EOF disconnect) – " + "waiting for pod to terminate before reading exit code", + trial_id, log_proc.returncode, + ) + # Wait for pod phase Succeeded or Failed (container terminated). + oc_wait_phase = subprocess.run( + ["oc", "wait", f"pod/{master_pod}", + "--for=jsonpath={.status.phase}=Succeeded", + f"--timeout={job_timeout}s"] + + ns_args, + capture_output=True, text=True, + ) + if oc_wait_phase.returncode != 0: + # Pod may have Failed; try that phase too. 
+ subprocess.run( + ["oc", "wait", f"pod/{master_pod}", + "--for=jsonpath={.status.phase}=Failed", + f"--timeout=30s"] + + ns_args, + capture_output=True, text=True, + ) + # Re-stream any log lines written after the disconnect into the same files. + catchup = subprocess.run( + ["oc", "logs", "--tail=-1", master_pod] + ns_args, + capture_output=True, text=True, + ) + if catchup.stdout: + with open(out_file, "a", encoding="utf-8", errors="replace") as fh: + fh.write(catchup.stdout) + if catchup.stderr: + with open(err_file, "a", encoding="utf-8", errors="replace") as fh: + fh.write(catchup.stderr) + + # ── 5. Check pod exit code ──────────────────────────────────────────── + # Poll until the pod has a terminated exit code (handles the race + # between oc-logs EOF and pod termination being recorded in the API). + exit_code_str = "" + for _attempt in range(30): + ec_result = subprocess.run( + ["oc", "get", "pod", master_pod, "-o", + "jsonpath={.status.containerStatuses[0].state.terminated.exitCode}"] + + ns_args, + capture_output=True, text=True, + ) + exit_code_str = ec_result.stdout.strip() + if exit_code_str.lstrip("-").isdigit(): + break + logger.debug("Trial %d: exit code not yet available, retrying in 5 s…", trial_id) + time.sleep(5) + exit_code = int(exit_code_str) if exit_code_str.lstrip("-").isdigit() else 0 + logger.info("Trial %d: pod exit code = %s", trial_id, exit_code) + if exit_code != 0: + logger.warning("Trial %d: pod exited with code %d – marking trial as pruned", trial_id, exit_code) + raise optuna.exceptions.TrialPruned(f"pod exited with code {exit_code}") + + finally: + # ── 6. Cleanup – delete the job ─────────────────────────────────────── + logger.debug("Trial %d: deleting PyTorchJob '%s'", trial_id, job_name) + subprocess.run( + ["oc", "delete", "pytorchjob", job_name, "--ignore-not-found"] + ns_args, + capture_output=True, + ) + try: + os.unlink(tmp_yaml) + except OSError: + pass + +# ============================================================ +# PARALLEL STREAMING RUNNER +# ============================================================ + +_print_lock = threading.Lock() + +def _stream_pipe(pipe, dest_file, trial_id: int, stream_name: str, dest_stream): + """Read lines from *pipe*, write to *dest_file* and print prefixed to *dest_stream*.""" + prefix = f"[trial-{trial_id}]" + with open(dest_file, "w", encoding="utf-8", errors="replace") as fh: + for raw in pipe: + line = raw.decode("utf-8", errors="replace") + fh.write(line) + fh.flush() + with _print_lock: + dest_stream.write(f"{prefix} {line}") + dest_stream.flush() + +def run_and_stream(launcher_cmd: str, trial_id: int, out_file: str, err_file: str, wlm: str): + """ + Run *launcher_cmd* in a shell. + + For ``wlm='none'``: captures stdout and stderr via PIPE, streams every line + to the main process stdout/stderr (prefixed with ``[trial-N]``), and also + writes them to *out_file* / *err_file* for later metric extraction. + + For WLM backends (lsf, slurm, …): the WLM tool itself manages the output + files on the cluster. The local subprocess output (WLM status messages, + errors) is still streamed with the same prefix so parallel workers are + distinguishable. 
+ """ + logger.debug("Trial %d: run_and_stream wlm=%s cmd=%s", trial_id, wlm, launcher_cmd) + proc = subprocess.Popen( + launcher_cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + if wlm == "none": + # Full capture: write to files AND stream to console + t_out = threading.Thread( + target=_stream_pipe, + args=(proc.stdout, out_file, trial_id, "stdout", sys.stdout), + daemon=True, + ) + t_err = threading.Thread( + target=_stream_pipe, + args=(proc.stderr, err_file, trial_id, "stderr", sys.stderr), + daemon=True, + ) + else: + # WLM manages the cluster output files (out_file/err_file) itself. + # Stream only the local WLM tool output (bsub/srun status messages) + # to console; write it to separate local files to avoid clobbering the + # cluster-managed trial output files. + wlm_out = out_file.replace(".out", "_wlm.out") + wlm_err = err_file.replace(".err", "_wlm.err") + t_out = threading.Thread( + target=_stream_pipe, + args=(proc.stdout, wlm_out, trial_id, "wlm-stdout", sys.stdout), + daemon=True, + ) + t_err = threading.Thread( + target=_stream_pipe, + args=(proc.stderr, wlm_err, trial_id, "wlm-stderr", sys.stderr), + daemon=True, + ) + + t_out.start() + t_err.start() + proc.wait() + t_out.join() + t_err.join() + + if proc.returncode != 0: + raise subprocess.CalledProcessError(proc.returncode, launcher_cmd) # ============================================================ # MULTI-METRIC EXTRACTION # ============================================================ -def extract_metrics_from_log(path: str, metric_names: List[str]) -> List[float]: +def extract_metrics_from_log(path: str, metric_names: List[str], err_path: Optional[str] = None) -> List[float]: + """Extract metric values from a log file. + + Each entry in *metric_names* is either a plain name (uses the **last** + match) or ``name#N`` to select the **N-th occurrence** (0-based). This + lets you disambiguate scripts that print the same metric key multiple + times, e.g.:: + + metrics: + - "Samples/sec#0" # DataLoader throughput (first occurrence) + - "Samples/sec#1" # Training throughput (second occurrence) + - "Samples/sec#2" # Inference throughput (third occurrence) + - GFLOPS + """ + logger.debug("Extracting metrics %s from '%s'", metric_names, path) results = [] with open(path, "r", encoding="utf-8", errors="ignore") as f: text = f.read() - + logger.debug("Log file '%s': %d characters read", path, len(text)) + # Also read stderr — Lightning/rich writes test result tables there + if err_path: + try: + with open(err_path, "r", encoding="utf-8", errors="ignore") as f: + err_text = f.read() + logger.debug("Err file '%s': %d characters read", err_path, len(err_text)) + text = text + "\n" + err_text + except FileNotFoundError: + logger.debug("Err file '%s' not found, skipping", err_path) + for metric in metric_names: - pattern = re.compile(rf"{re.escape(metric)}\s*[:=]\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)") + # Support name#N syntax for Nth-occurrence selection (0-based) + occurrence: Optional[int] = None + bare_metric = metric + idx_match = re.fullmatch(r'(.+)#(\d+)', metric) + if idx_match: + bare_metric = idx_match.group(1) + occurrence = int(idx_match.group(2)) + + # Matches: key: value | key=value | [performance] key : value | Lightning table │ key │ value │ + pattern = re.compile( + rf"(?:\[\w+\]\s*)?{re.escape(bare_metric)}\s*(?:[:=│])\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)" + ) matches = pattern.findall(text) if not matches: - print(f"Warning: Metric '{metric}' not found in {path}. 
Defaulting to 0.0") + logger.warning("Metric '%s' not found in '%s' — defaulting to 0.0", metric, path) results.append(0.0) + elif occurrence is not None: + if occurrence >= len(matches): + logger.warning( + "Metric '%s' occurrence #%d requested but only %d match(es) found — defaulting to 0.0", + metric, occurrence, len(matches), + ) + results.append(0.0) + else: + value = float(matches[occurrence]) + logger.debug("Metric '%s': using occurrence #%d = %s", metric, occurrence, value) + results.append(value) else: - results.append(float(matches[-1])) + value = float(matches[-1]) + logger.debug("Metric '%s': found %d match(es), using last value %s", metric, len(matches), value) + results.append(value) return results # ============================================================ @@ -120,71 +652,188 @@ def extract_metrics_from_log(path: str, metric_names: List[str]) -> List[float]: def load_hpo_space(args): data = {} - if args.hpo_json: data = json.loads(args.hpo_json) + if args.hpo_json: + logger.debug("Loading HPO space from JSON string") + data = json.loads(args.hpo_json) elif args.hpo_yaml: + logger.debug("Loading HPO space from YAML file: %s", args.hpo_yaml) with open(args.hpo_yaml, "r") as f: data = yaml.safe_load(f) - return data.get("hpo", {}) + space = data.get("hpo", {}) + logger.info("HPO space loaded: %d parameter(s): %s", len(space), list(space.keys())) + return space + +def load_metrics_from_yaml(args): + """Return metrics list from YAML 'metrics:' key, or None if not present.""" + data = {} + if args.hpo_json: + data = json.loads(args.hpo_json) + elif args.hpo_yaml: + with open(args.hpo_yaml, "r") as f: data = yaml.safe_load(f) + elif args.static_args_yaml: + with open(args.static_args_yaml, "r") as f: data = yaml.safe_load(f) + metrics = data.get("metrics", None) + if metrics is None: + return None + if isinstance(metrics, list): + return [m.strip() for m in metrics] + return [m.strip() for m in str(metrics).split(",")] def load_static_args(args): data = {} - if args.static_args_json: data = json.loads(args.static_args_json) + if args.static_args_json: + logger.debug("Loading static args from JSON string") + data = json.loads(args.static_args_json) elif args.static_args_yaml: + logger.debug("Loading static args from YAML file: %s", args.static_args_yaml) with open(args.static_args_yaml, "r") as f: data = yaml.safe_load(f) elif args.hpo_yaml: + logger.debug("Loading static args from HPO YAML file: %s", args.hpo_yaml) with open(args.hpo_yaml, "r") as f: data = yaml.safe_load(f) - return data.get("static", data if data else {}) + static = data.get("static", data if data else {}) + logger.info("Static args loaded: %d key(s): %s", len(static), list(static.keys())) + return static def suggest_from_spec(trial, name, spec): t = spec["type"] - if t == "float": return trial.suggest_float(name, float(spec["low"]), float(spec["high"]), log=spec.get("log", False)) - if t == "int": return trial.suggest_int(name, int(spec["low"]), int(spec["high"]), log=spec.get("log", False)) - if t == "categorical": return trial.suggest_categorical(name, spec["choices"]) - raise ValueError(f"Unknown param type: {t}") + if t == "float": + val = trial.suggest_float(name, float(spec["low"]), float(spec["high"]), log=spec.get("log", False)) + elif t == "int": + val = trial.suggest_int(name, int(spec["low"]), int(spec["high"]), log=spec.get("log", False)) + elif t == "categorical": + val = trial.suggest_categorical(name, spec["choices"]) + elif t == "flag": + val = trial.suggest_categorical(name, [True, False]) + 
elif t == "group": + val = trial.suggest_categorical(name, list(spec["choices"].keys())) + else: + raise ValueError(f"Unknown param type: {t}") + logger.debug("Suggested '%s' (%s) = %r", name, t, val) + return val def main(): args = parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level), + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + # Suppress noisy optuna INFO logs unless user asked for DEBUG + logging.getLogger("optuna").setLevel( + logging.WARNING if args.log_level == "INFO" else getattr(logging, args.log_level) + ) + + logger.info("iterate2 starting") + logger.info("Log level: %s", args.log_level) + logger.info("WLM: %s | interpreter: %s | script: %s", args.wlm, args.interpreter, args.script) + logger.info("Optuna study: '%s' | db: %s | n_trials: %d", args.optuna_study_name, args.optuna_db_path, args.optuna_n_trials) + hpo_space = load_hpo_space(args) static_args = load_static_args(args) - metric_list = [m.strip() for m in args.metrics.split(",")] - + yaml_metrics = load_metrics_from_yaml(args) + metric_list = yaml_metrics if yaml_metrics is not None else [m.strip() for m in args.metrics.split(",")] + logger.info("Optimising metrics: %s (source: %s)", metric_list, "yaml" if yaml_metrics else "cli") + script_path, root_dir = resolve_paths(args.script, args.root_dir) def objective(trial): script_args = static_args.copy() for name, spec in hpo_space.items(): - script_args[name] = suggest_from_spec(trial, name, spec) + val = suggest_from_spec(trial, name, spec) + if spec["type"] == "group": + # Expand the chosen group's key→value pairs directly into script_args + script_args.update(spec["choices"][val]) + else: + script_args[name] = val + + # gpu_num in hpo/static overrides the CLI --gpu-count for this trial's launcher + gpu_count = int(script_args.pop("gpu_num", args.gpu_count)) + logger.debug("Trial %d: effective gpu_count=%d", trial.number, gpu_count) + logger.info("Trial %d: sampled parameters: %s", trial.number, script_args) out_file = f"trial_{trial.number}.out" err_file = f"trial_{trial.number}.err" + logger.debug("Trial %d: stdout → %s | stderr → %s", trial.number, out_file, err_file) + + if args.wlm == "vela": + # ── Vela / OpenShift path ────────────────────────────────────── + if not args.vela_job_template: + raise ValueError("--vela-job-template is required when --wlm vela") + if not args.vela_chart_path: + raise ValueError("--vela-chart-path is required when --wlm vela") + container_cmd = build_container_command( + args.interpreter, script_path, script_args, + args.param_setter, args.underscore_to_hyphen, + ) + logger.info("Trial %d: container command → %s", trial.number, container_cmd) + job_yaml, job_name = build_vela_job_yaml( + args.vela_job_template, + trial.number, + gpu_count, + container_cmd, + args.vela_cmd_placeholder, + ) + logger.debug("Trial %d: job YAML (first 400 chars):\n%s", trial.number, job_yaml[:400]) + run_vela_trial( + trial_id=trial.number, + job_yaml=job_yaml, + chart_path=args.vela_chart_path, + job_name=job_name, + namespace=args.vela_namespace, + out_file=out_file, + err_file=err_file, + pod_ready_timeout=args.vela_pod_ready_timeout, + job_timeout=args.vela_job_timeout, + ) + else: + # ── Standard WLM path (lsf / slurm / none) ──────────────────── + shell_cmd = build_shell_command(args.interpreter, root_dir, script_path, args.venv, script_args, args.param_setter, args.underscore_to_hyphen, pre_run_commands=args.pre_run_commands) + launcher_cmd = 
build_launcher_command(args.wlm, shell_cmd, trial.number, out_file, err_file, gpu_count, args.cpu_count, args.mem_gb, args.lsf_gpu_config_string) + logger.info("Trial %d: submitting → %s", trial.number, launcher_cmd) + run_and_stream(launcher_cmd, trial.number, out_file, err_file, args.wlm) - shell_cmd = build_shell_command(args.interpreter, root_dir, script_path, args.venv, script_args, args.param_setter) - launcher_cmd = build_launcher_command(args.wlm, shell_cmd, trial.number, out_file, err_file, args.gpu_count, args.cpu_count, args.mem_gb, args.lsf_gpu_config_string) + logger.info("Trial %d: job finished", trial.number) - print(f"Trial {trial.number}: Running...") - subprocess.run(launcher_cmd, shell=True, check=True) + values = extract_metrics_from_log(out_file, metric_list, err_path=err_file) + logger.info("Trial %d: results %s", trial.number, dict(zip(metric_list, values))) - values = extract_metrics_from_log(out_file, metric_list) - print(f"Trial {trial.number} results: {dict(zip(metric_list, values))}") - return tuple(values) # Multi-objective direction directions = ["maximize"] * len(metric_list) + logger.info("Creating Optuna study (directions: %s)", directions) + + if args.optuna_db_path.startswith("js:///"): + journal_path = args.optuna_db_path[len("js:///"):] + logger.info("Using JournalStorage at '%s'", journal_path) + storage = JournalStorage(JournalFileStorage(journal_path)) + elif "sqlite" in args.optuna_db_path: + storage = args.optuna_db_path + else: + storage = f"sqlite:///{args.optuna_db_path}" + logger.debug("Optuna storage: %s", storage) study = optuna.create_study( study_name=args.optuna_study_name, - storage=f"sqlite:///{args.optuna_db_path}" if "sqlite" not in args.optuna_db_path else args.optuna_db_path, + storage=storage, directions=directions, load_if_exists=True, ) + logger.info("Study '%s' ready (existing trials: %d)", args.optuna_study_name, len(study.trials)) - study.optimize(objective, n_trials=args.optuna_n_trials) + logger.info("Parallelism: %d worker(s)", args.parallelism) + study.optimize( + objective, + n_trials=args.optuna_n_trials, + n_jobs=args.parallelism, + catch=(Exception,), # mark trial as FAILED and continue; never crash the study + ) - print("\n" + "="*60) - print("OPTIMIZATION COMPLETE") - print(f"Pareto Front Trials: {len(study.best_trials)}") + logger.info("=" * 60) + logger.info("OPTIMIZATION COMPLETE") + logger.info("Pareto Front Trials: %d", len(study.best_trials)) for t in study.best_trials: - print(f"Trial {t.number}: Values={t.values}") + logger.info(" Trial %d: Values=%s Params=%s", t.number, t.values, t.params) if __name__ == "__main__": main() \ No newline at end of file