diff --git a/modules/local/aggregate_benchmark_report_data/bin/benchmark_report_aggregate.py b/modules/local/aggregate_benchmark_report_data/bin/benchmark_report_aggregate.py index 26b8427..9cc6f6f 100644 --- a/modules/local/aggregate_benchmark_report_data/bin/benchmark_report_aggregate.py +++ b/modules/local/aggregate_benchmark_report_data/bin/benchmark_report_aggregate.py @@ -62,17 +62,37 @@ def _lookup_cost( return None -def _cost_or_task(cost_row: dict[str, Any] | None, key: str, task_cost: float, default: float = 0.0) -> float: +def _cost_or_task(cost_row: dict[str, Any] | None, key: str, default: float = 0.0) -> float: if not cost_row: - return task_cost if key in {"cost", "used_cost"} else default + return default value = cost_row.get(key) if value is None: - return task_cost if key in {"cost", "used_cost"} else default + return default return float(value) +def _summarize_missing_processes( + missing_process_counts: dict[str, int], preview_limit: int = 3 +) -> tuple[list[dict[str, Any]], str]: + ordered = sorted(missing_process_counts.items(), key=lambda item: (-item[1], item[0])) + preview = [ + {"process_short": process_short, "missing_tasks": missing_tasks} + for process_short, missing_tasks in ordered[:preview_limit] + ] + parts = [ + f"{item['process_short']} ({item['missing_tasks']})" + if item["missing_tasks"] != 1 + else item["process_short"] + for item in preview + ] + hidden_count = max(len(ordered) - preview_limit, 0) + if hidden_count: + parts.append(f"+{hidden_count} more") + return preview, ", ".join(parts) + + def _is_highlight_process(process: str) -> bool: process_lc = process.lower() return any(keyword in process_lc for keyword in _HIGHLIGHT_KEYWORDS) @@ -240,9 +260,11 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic "unused_cost": 0.0, } + costs_jsonl_path = jsonl_dir / "costs.jsonl" + cur_supplied = costs_jsonl_path.exists() costs_index: dict[tuple[str, str, str], dict[str, Any]] = {} has_cost_rows = False - for c in _iter_jsonl(jsonl_dir / "costs.jsonl"): + for c in _iter_jsonl(costs_jsonl_path): has_cost_rows = True run_id = str(c.get("run_id", "")) process = str(c.get("process", "")) @@ -289,6 +311,10 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic task_run_acc: dict[str, dict[str, float]] = defaultdict( lambda: {"requested_cpu_h": 0.0, "requested_mem_gib_h": 0.0, "real_cpu_h": 0.0, "real_mem_gib_h": 0.0} ) + cost_coverage_runs: dict[tuple[str, str], dict[str, Any]] = {} + total_cost_tasks = 0 + matched_cost_tasks = 0 + missing_cost_tasks = 0 for t in _iter_jsonl(jsonl_dir / "tasks.jsonl"): run_id = str(t.get("run_id", "")) @@ -311,21 +337,40 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic } cost_row = _lookup_cost(costs_index, run_id=run_id, process=process, process_short=process_short, hash_short=hash_short) - task_cost = float(t.get("cost") or 0.0) + + if cur_supplied: + total_cost_tasks += 1 + coverage = cost_coverage_runs.setdefault( + run_group_key, + { + "run_id": run_id, + "group": group, + "total_tasks": 0, + "matched_tasks": 0, + "missing_tasks": 0, + "missing_process_counts": defaultdict(int), + }, + ) + coverage["total_tasks"] += 1 + if cost_row: + matched_cost_tasks += 1 + coverage["matched_tasks"] += 1 + else: + missing_cost_tasks += 1 + coverage["missing_tasks"] += 1 + missing_process = process_short or process or "unknown" + coverage["missing_process_counts"][missing_process] += 1 if cost_row: - run_cost_acc[run_group_key]["cost"] += _cost_or_task(cost_row, "cost", task_cost) - run_cost_acc[run_group_key]["used_cost"] += _cost_or_task(cost_row, "used_cost", task_cost) - run_cost_acc[run_group_key]["unused_cost"] += _cost_or_task(cost_row, "unused_cost", task_cost, default=0.0) - else: - run_cost_acc[run_group_key]["cost"] += task_cost - run_cost_acc[run_group_key]["used_cost"] += task_cost + run_cost_acc[run_group_key]["cost"] += _cost_or_task(cost_row, "cost") + run_cost_acc[run_group_key]["used_cost"] += _cost_or_task(cost_row, "used_cost") + run_cost_acc[run_group_key]["unused_cost"] += _cost_or_task(cost_row, "unused_cost") if has_cost_rows: overview_key = (group, process_short) - cost_group_acc[overview_key]["total_cost"] += _cost_or_task(cost_row, "cost", task_cost) - cost_group_acc[overview_key]["used_cost"] += _cost_or_task(cost_row, "used_cost", task_cost) - cost_group_acc[overview_key]["unused_cost"] += _cost_or_task(cost_row, "unused_cost", task_cost, default=0.0) + cost_group_acc[overview_key]["total_cost"] += _cost_or_task(cost_row, "cost") + cost_group_acc[overview_key]["used_cost"] += _cost_or_task(cost_row, "used_cost") + cost_group_acc[overview_key]["unused_cost"] += _cost_or_task(cost_row, "unused_cost") cost_group_acc[overview_key]["n_tasks"] += 1 status = t.get("status") @@ -531,6 +576,34 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic ] cost_overview.sort(key=lambda x: float(x.get("total_cost") or 0), reverse=True) + runs_with_missing_costs = [] + for row in cost_coverage_runs.values(): + if int(row["missing_tasks"]) <= 0: + continue + missing_processes, missing_process_summary = _summarize_missing_processes(row["missing_process_counts"]) + runs_with_missing_costs.append( + { + "run_id": row["run_id"], + "group": row["group"], + "total_tasks": int(row["total_tasks"]), + "matched_tasks": int(row["matched_tasks"]), + "missing_tasks": int(row["missing_tasks"]), + "missing_processes": missing_processes, + "missing_process_summary": missing_process_summary, + } + ) + runs_with_missing_costs.sort(key=lambda row: (-row["missing_tasks"], str(row["group"]), str(row["run_id"]))) + + cost_coverage = { + "cur_supplied": cur_supplied, + "has_any_cost_rows": has_cost_rows, + "total_included_tasks": total_cost_tasks, + "matched_task_count": matched_cost_tasks, + "missing_task_count": missing_cost_tasks, + "coverage_pct": _round((matched_cost_tasks / total_cost_tasks) * 100.0, 1) if total_cost_tasks else None, + "runs_with_missing_costs": runs_with_missing_costs, + } + combined_task_runtime = [] for (pipeline, group), panel_acc in sorted(combined_runtime_acc.items(), key=lambda x: (x[0][0], x[0][1])): process_runtime_ms = panel_acc["process_runtime_ms"] @@ -600,6 +673,7 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic "task_table": task_table, "task_scatter": task_scatter, "cost_overview": cost_overview, + "cost_coverage": cost_coverage, } diff --git a/modules/local/aggregate_benchmark_report_data/tests/test_aggregate.py b/modules/local/aggregate_benchmark_report_data/tests/test_aggregate.py index dfe7bbd..158145e 100644 --- a/modules/local/aggregate_benchmark_report_data/tests/test_aggregate.py +++ b/modules/local/aggregate_benchmark_report_data/tests/test_aggregate.py @@ -28,18 +28,28 @@ def test_build_report_data_has_all_sections(tmp_path, make_run, flat_task, write "task_table", "task_scatter", "cost_overview", + "cost_coverage", } -def test_run_costs_without_cur_uses_task_cost(tmp_path, make_run, flat_task, write_run_json): +def test_run_costs_without_cur_are_zero(tmp_path, make_run, flat_task, write_run_json): data_dir = tmp_path / "data" jsonl_dir = tmp_path / "jsonl_bundle" write_run_json(data_dir, [make_run(tasks=[flat_task(cost=4.2)])]) normalize_jsonl(data_dir, jsonl_dir) data = build_report_data(jsonl_dir) - assert data["run_costs"][0]["cost"] == 4.2 + assert data["run_costs"][0]["cost"] == 0.0 assert data["run_costs"][0]["used_cost"] is None + assert data["cost_coverage"] == { + "cur_supplied": False, + "has_any_cost_rows": False, + "total_included_tasks": 0, + "matched_task_count": 0, + "missing_task_count": 0, + "coverage_pct": None, + "runs_with_missing_costs": [], + } def test_cur_zero_costs_do_not_fall_back_to_task_cost(tmp_path): @@ -101,6 +111,107 @@ def test_cur_zero_costs_do_not_fall_back_to_task_cost(tmp_path): assert data["run_costs"][0]["unused_cost"] == 0.0 assert data["cost_overview"][0]["total_cost"] == 0.0 assert data["cost_overview"][0]["used_cost"] == 0.0 + assert data["cost_coverage"]["cur_supplied"] is True + assert data["cost_coverage"]["coverage_pct"] == 100.0 + assert data["cost_coverage"]["missing_task_count"] == 0 + + +def test_partial_cur_coverage_is_reported_per_run_and_process(tmp_path): + jsonl_dir = tmp_path / "jsonl_bundle" + jsonl_dir.mkdir(parents=True) + + runs = [ + { + "run_id": "run1", + "group": "cpu", + "pipeline": "pipe", + "username": "u", + "pipeline_version": "main", + "nextflow_version": "24.10.0", + "platform_version": "x", + "succeeded": 2, + "failed": 0, + "cached": 0, + "executor": "awsbatch", + "region": "us-east-1", + "fusion_enabled": False, + "wave_enabled": False, + "container_engine": "docker", + "duration_ms": 10, + "cpu_time_ms": 1000, + "cpu_efficiency": 50.0, + "memory_efficiency": 50.0, + "read_bytes": 0, + "write_bytes": 0, + } + ] + tasks = [ + { + "run_id": "run1", + "group": "cpu", + "hash": "ab/cdef12", + "process": "foo:PROC_A", + "process_short": "PROC_A", + "name": "PROC_A", + "status": "COMPLETED", + "staging_ms": 0, + "realtime_ms": 1000, + "duration_ms": 1000, + "cost": None, + }, + { + "run_id": "run1", + "group": "cpu", + "hash": "ab/cdef13", + "process": "foo:PROC_B", + "process_short": "PROC_B", + "name": "PROC_B", + "status": "COMPLETED", + "staging_ms": 0, + "realtime_ms": 1000, + "duration_ms": 1000, + "cost": None, + }, + { + "run_id": "run1", + "group": "cpu", + "hash": "ab/cdef14", + "process": "foo:PROC_B", + "process_short": "PROC_B", + "name": "PROC_B_retry", + "status": "CACHED", + "staging_ms": 0, + "realtime_ms": 1000, + "duration_ms": 1000, + "cost": None, + }, + ] + costs = [ + {"run_id": "run1", "process": "foo:PROC_A", "hash": "abcdef12", "cost": 5.0, "used_cost": 4.0, "unused_cost": 1.0} + ] + + (jsonl_dir / "runs.jsonl").write_text("".join(json.dumps(r) + "\n" for r in runs)) + (jsonl_dir / "tasks.jsonl").write_text("".join(json.dumps(t) + "\n" for t in tasks)) + (jsonl_dir / "costs.jsonl").write_text("".join(json.dumps(c) + "\n" for c in costs)) + + data = build_report_data(jsonl_dir) + + assert data["run_costs"][0]["cost"] == 5.0 + assert data["cost_coverage"]["cur_supplied"] is True + assert data["cost_coverage"]["has_any_cost_rows"] is True + assert data["cost_coverage"]["total_included_tasks"] == 3 + assert data["cost_coverage"]["matched_task_count"] == 1 + assert data["cost_coverage"]["missing_task_count"] == 2 + assert data["cost_coverage"]["coverage_pct"] == 33.3 + + run_warning = data["cost_coverage"]["runs_with_missing_costs"][0] + assert run_warning["run_id"] == "run1" + assert run_warning["group"] == "cpu" + assert run_warning["total_tasks"] == 3 + assert run_warning["matched_tasks"] == 1 + assert run_warning["missing_tasks"] == 2 + assert run_warning["missing_process_summary"] == "PROC_B (2)" + assert run_warning["missing_processes"] == [{"process_short": "PROC_B", "missing_tasks": 2}] def test_task_table_includes_cached(tmp_path, make_run, flat_task, write_run_json): diff --git a/modules/local/normalize_benchmark_jsonl/bin/benchmark_report_normalize.py b/modules/local/normalize_benchmark_jsonl/bin/benchmark_report_normalize.py index 02e2066..e8a3849 100644 --- a/modules/local/normalize_benchmark_jsonl/bin/benchmark_report_normalize.py +++ b/modules/local/normalize_benchmark_jsonl/bin/benchmark_report_normalize.py @@ -179,7 +179,7 @@ def extract_tasks(runs: list[dict[str, Any]]) -> list[dict[str, Any]]: "peak_rss": task.get("peakRss", 0), "read_bytes": task.get("readBytes", 0), "write_bytes": task.get("writeBytes", 0), - "cost": task.get("cost"), + "cost": None, "executor": task.get("executor", ""), "machine_type": task.get("machineType", ""), "cloud_zone": task.get("cloudZone", ""), diff --git a/modules/local/normalize_benchmark_jsonl/tests/test_normalize.py b/modules/local/normalize_benchmark_jsonl/tests/test_normalize.py index d278a17..47d3f20 100644 --- a/modules/local/normalize_benchmark_jsonl/tests/test_normalize.py +++ b/modules/local/normalize_benchmark_jsonl/tests/test_normalize.py @@ -24,7 +24,7 @@ def test_cached_count_extracted(make_run, flat_task): def test_nested_tasks_unwrapped(make_run, nested_task): run = make_run(tasks=[nested_task(cost=2.0), nested_task(cost=3.0)]) rows = extract_tasks([run]) - assert sum(r["cost"] for r in rows) == 5.0 + assert all(r["cost"] is None for r in rows) def test_failed_tasks_filtered(make_run, flat_task): diff --git a/modules/local/render_benchmark_report/bin/benchmark_report_template.html b/modules/local/render_benchmark_report/bin/benchmark_report_template.html index f6a2bc1..10d426b 100644 --- a/modules/local/render_benchmark_report/bin/benchmark_report_template.html +++ b/modules/local/render_benchmark_report/bin/benchmark_report_template.html @@ -53,6 +53,37 @@ .callout-header:hover { background: {{ brand_accent_surface }}; } .callout-body { padding: 12px; display: none; } .callout-body.show { display: block; } + .warning-card { + border: 1px solid #f4b548; + background: #fff8eb; + border-radius: 8px; + padding: 12px 14px; + margin-bottom: 18px; + } + .warning-card h3 { + margin: 0 0 6px; + font-size: 15px; + color: {{ brand_heading }}; + } + .warning-card p { + margin: 0; + font-size: 13px; + color: {{ brand_heading }}; + } + .warning-card ul { + margin: 10px 0 0 18px; + padding: 0; + } + .warning-card li { + margin: 3px 0; + color: {{ brand_heading }}; + } + .warning-card code { + background: rgba(255,255,255,0.65); + border: 1px solid rgba(0,0,0,0.06); + border-radius: 4px; + padding: 0 4px; + } .dl-row { display: flex; gap: 20px; font-size: 13px; margin-bottom: 8px; flex-wrap: wrap; } .dl-row dt { font-weight: 600; color: {{ brand_heading }}; } @@ -230,6 +261,8 @@
Summary of pipeline execution and infrastructure settings.
' + + 'A CUR file was supplied, but only ' + coveragePct + ' of included tasks matched CUR cost rows. ' + + 'Unmatched tasks stay out of the cost totals, so treat the dollar values as incomplete.' + + '
' + + '