diff --git a/cvs/lib/docker_lib.py b/cvs/lib/docker_lib.py index a904ab63..3c9b9019 100644 --- a/cvs/lib/docker_lib.py +++ b/cvs/lib/docker_lib.py @@ -108,9 +108,18 @@ def launch_docker_container( network='host', shm_size='64G', timeout=60 * 10, + privileged=True, + extra_run_args='', ): + # `--privileged` exposes every host device (including all RDMA HCAs under + # /dev/infiniband) regardless of `device_list`. Callers that need to + # restrict device discovery pass `privileged=False` and an explicit + # `device_list`; defaults preserve the historical privileged behavior. + priv_flag = '--privileged ' if privileged else '' cmd = f'docker run -d --network {network} --ipc {network} \ - --cap-add=IPC_LOCK --security-opt seccomp=unconfined --privileged ' + --cap-add=IPC_LOCK --security-opt seccomp=unconfined {priv_flag}' + if extra_run_args: + cmd = cmd + f'{extra_run_args} ' for device in device_list: cmd = cmd + f' --device {device} ' for src_vol in volume_dict.keys(): diff --git a/cvs/lib/linux_utils.py b/cvs/lib/linux_utils.py index 9d01dcf8..bf9bbf42 100644 --- a/cvs/lib/linux_utils.py +++ b/cvs/lib/linux_utils.py @@ -331,6 +331,60 @@ def get_rdma_capable_devices_dict(phdl): return rdma_cap_dict +def get_uverbs_devices_for_hcas(phdl, hca_csv, include_rdma_cm=True): + """ + Resolve a comma-separated list of RDMA HCA (ibdev) names to their + /dev/infiniband/uverbsN character-device paths on each node. + + This is used to expose ONLY a chosen subset of RDMA devices to an + unprivileged container (`docker run --device ...`), so that RDMA device + discovery (e.g. `ibv_devinfo`) inside the container is restricted to the + requested HCAs instead of every device the host exposes. + + The ibdev -> uverbs mapping is read per node from + /sys/class/infiniband_verbs/uverbs*/ibdev because the uverbs index is not + guaranteed to follow ibdev ordering and can differ across nodes. + + Args: + phdl: parallel-ssh handle; phdl.exec(cmd) returns {node: stdout}. + hca_csv (str): comma-separated ibdev names (e.g. 'rocep28s0,rocep62s0'). + include_rdma_cm (bool): append '/dev/infiniband/rdma_cm' to each node's + list (required for RDMA connection management). + + Returns: + dict: {node: ['/dev/infiniband/uverbsN', ..., '/dev/infiniband/rdma_cm']} + ordered to follow hca_csv. HCAs that cannot be resolved on a node are + skipped with a warning. + """ + requested = [h.strip() for h in hca_csv.split(',') if h.strip()] + cmd = 'for u in /sys/class/infiniband_verbs/uverbs*; do echo "$(basename $u) $(cat $u/ibdev)"; done' + out_dict = phdl.exec(cmd) + dev_dict = {} + for node in out_dict.keys(): + # Build ibdev -> uverbs name map for this node. + ibdev_to_uverbs = {} + for line in out_dict[node].split('\n'): + parts = line.split() + if len(parts) != 2: + continue + uverbs_name, ibdev = parts[0], parts[1] + if uverbs_name.startswith('uverbs'): + ibdev_to_uverbs[ibdev] = uverbs_name + node_devs = [] + for hca in requested: + if hca in ibdev_to_uverbs: + node_devs.append(f'/dev/infiniband/{ibdev_to_uverbs[hca]}') + else: + log.warning( + f'HCA {hca} not found in /sys/class/infiniband_verbs on node {node}; ' + f'available ibdevs: {sorted(ibdev_to_uverbs.keys())}' + ) + if include_rdma_cm: + node_devs.append('/dev/infiniband/rdma_cm') + dev_dict[node] = node_devs + return dev_dict + + def get_backend_nic_dict(phdl): lshw_dict = get_lshw_network_dict(phdl) rdma_cap_devs = get_rdma_capable_devices_dict(phdl) diff --git a/cvs/lib/sglang_disagg_lib.py b/cvs/lib/sglang_disagg_lib.py index bf41e239..4b987334 100644 --- a/cvs/lib/sglang_disagg_lib.py +++ b/cvs/lib/sglang_disagg_lib.py @@ -5,6 +5,7 @@ All code contained here is Property of Advanced Micro Devices, Inc. ''' +import json import os import re import time @@ -33,6 +34,202 @@ def textwrap_for_yml(msg_string): return '\n'.join([m.lstrip() for m in msg_string.split('\n')]) +def _is_lower_better(metric_name): + """Return True if a smaller value is better for the given metric. + + Latency-style metrics (anything ending in `_ms` or mentioning latency / + ttft / tpot / itl / e2el) are better when lower; throughput, count and + accuracy metrics are better when higher. This reproduces the historical + comparison direction used by verify_inference_results while being explicit. + """ + return bool(re.search(r'_ms$|latency|ttft|tpot|itl|e2el', metric_name, re.I)) + + +def parse_gsm8k_metrics(text): + """Extract gsm8k bench_sglang.py metrics from one node's stdout. + + The throughput is stored under `tokens_per_sec` so it lines up with the + gsm8k `expected_results` threshold key. + + Returns a dict with any of: accuracy, invalid, latency_s, tokens_per_sec. + """ + patterns = { + 'accuracy': r'Accuracy:\s+([0-9.]+)', + 'invalid': r'Invalid:\s+([0-9.]+)', + 'latency_s': r'Latency:\s+([0-9.]+)\s*s', + 'tokens_per_sec': r'Output throughput:\s+([0-9.]+)\s+token', + } + metrics = {} + for key, pattern in patterns.items(): + match = re.search(pattern, text, re.I) + if match: + metrics[key] = match.group(1) + return metrics + + +def format_metrics_table(test_name, results_dict, expected_dict=None): + """Render a uniform, greppable per-node metrics block. + + Args: + test_name: label for the block header (e.g. 'gsm8k'). + results_dict: {node: {metric: value}}. + expected_dict: optional {metric: threshold}. When provided, each + metric that has a threshold gets a `[expected ] ` + suffix (PASS/FAIL, or UNPARSED if the value isn't numeric). + + Returns a multi-line string (one block per node). + """ + expected_dict = expected_dict or {} + lines = [] + for node in results_dict.keys(): + metrics = results_dict[node] + lines.append(f'================ METRICS [{test_name}] node={node} ================') + width = max((len(m) for m in metrics), default=0) + for metric in metrics: + value = metrics[metric] + line = f' {metric:<{width}} = {value}' + if metric in expected_dict: + lower_better = _is_lower_better(metric) + op = '<=' if lower_better else '>=' + try: + actual_f = float(value) + exp_f = float(expected_dict[metric]) + ok = actual_f <= exp_f if lower_better else actual_f >= exp_f + verdict = 'PASS' if ok else 'FAIL' + except (TypeError, ValueError): + verdict = 'UNPARSED' + line += f' [expected {op} {expected_dict[metric]}] {verdict}' + lines.append(line) + lines.append('=' * 66) + return '\n'.join(lines) + + +# Ordered (label, key) pairs for the full sglang.bench_serving "Serving +# Benchmark Result" block. Longer labels that are prefixes of shorter ones are +# listed first so the generic `