Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,26 @@ def _cost_or_task(cost_row: dict[str, Any] | None, key: str, default: float = 0.
return float(value)


def _summarize_missing_processes(
missing_process_counts: dict[str, int], preview_limit: int = 3
) -> tuple[list[dict[str, Any]], str]:
ordered = sorted(missing_process_counts.items(), key=lambda item: (-item[1], item[0]))
preview = [
{"process_short": process_short, "missing_tasks": missing_tasks}
for process_short, missing_tasks in ordered[:preview_limit]
]
parts = [
f"{item['process_short']} ({item['missing_tasks']})"
if item["missing_tasks"] != 1
else item["process_short"]
for item in preview
]
hidden_count = max(len(ordered) - preview_limit, 0)
if hidden_count:
parts.append(f"+{hidden_count} more")
return preview, ", ".join(parts)


def _is_highlight_process(process: str) -> bool:
    """Return True when *process* contains any highlight keyword (case-insensitive)."""
    lowered = process.lower()
    for keyword in _HIGHLIGHT_KEYWORDS:
        if keyword in lowered:
            return True
    return False
Expand Down Expand Up @@ -240,9 +260,11 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic
"unused_cost": 0.0,
}

costs_jsonl_path = jsonl_dir / "costs.jsonl"
cur_supplied = costs_jsonl_path.exists()
costs_index: dict[tuple[str, str, str], dict[str, Any]] = {}
has_cost_rows = False
for c in _iter_jsonl(jsonl_dir / "costs.jsonl"):
for c in _iter_jsonl(costs_jsonl_path):
has_cost_rows = True
run_id = str(c.get("run_id", ""))
process = str(c.get("process", ""))
Expand Down Expand Up @@ -289,6 +311,10 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic
task_run_acc: dict[str, dict[str, float]] = defaultdict(
lambda: {"requested_cpu_h": 0.0, "requested_mem_gib_h": 0.0, "real_cpu_h": 0.0, "real_mem_gib_h": 0.0}
)
cost_coverage_runs: dict[tuple[str, str], dict[str, Any]] = {}
total_cost_tasks = 0
matched_cost_tasks = 0
missing_cost_tasks = 0

for t in _iter_jsonl(jsonl_dir / "tasks.jsonl"):
run_id = str(t.get("run_id", ""))
Expand All @@ -312,6 +338,29 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic

cost_row = _lookup_cost(costs_index, run_id=run_id, process=process, process_short=process_short, hash_short=hash_short)

if cur_supplied:
total_cost_tasks += 1
coverage = cost_coverage_runs.setdefault(
run_group_key,
{
"run_id": run_id,
"group": group,
"total_tasks": 0,
"matched_tasks": 0,
"missing_tasks": 0,
"missing_process_counts": defaultdict(int),
},
)
coverage["total_tasks"] += 1
if cost_row:
matched_cost_tasks += 1
coverage["matched_tasks"] += 1
else:
missing_cost_tasks += 1
coverage["missing_tasks"] += 1
missing_process = process_short or process or "unknown"
coverage["missing_process_counts"][missing_process] += 1

if cost_row:
run_cost_acc[run_group_key]["cost"] += _cost_or_task(cost_row, "cost")
run_cost_acc[run_group_key]["used_cost"] += _cost_or_task(cost_row, "used_cost")
Expand Down Expand Up @@ -527,6 +576,34 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic
]
cost_overview.sort(key=lambda x: float(x.get("total_cost") or 0), reverse=True)

runs_with_missing_costs = []
for row in cost_coverage_runs.values():
if int(row["missing_tasks"]) <= 0:
continue
missing_processes, missing_process_summary = _summarize_missing_processes(row["missing_process_counts"])
runs_with_missing_costs.append(
{
"run_id": row["run_id"],
"group": row["group"],
"total_tasks": int(row["total_tasks"]),
"matched_tasks": int(row["matched_tasks"]),
"missing_tasks": int(row["missing_tasks"]),
"missing_processes": missing_processes,
"missing_process_summary": missing_process_summary,
}
)
runs_with_missing_costs.sort(key=lambda row: (-row["missing_tasks"], str(row["group"]), str(row["run_id"])))

cost_coverage = {
"cur_supplied": cur_supplied,
"has_any_cost_rows": has_cost_rows,
"total_included_tasks": total_cost_tasks,
"matched_task_count": matched_cost_tasks,
"missing_task_count": missing_cost_tasks,
"coverage_pct": _round((matched_cost_tasks / total_cost_tasks) * 100.0, 1) if total_cost_tasks else None,
"runs_with_missing_costs": runs_with_missing_costs,
}

combined_task_runtime = []
for (pipeline, group), panel_acc in sorted(combined_runtime_acc.items(), key=lambda x: (x[0][0], x[0][1])):
process_runtime_ms = panel_acc["process_runtime_ms"]
Expand Down Expand Up @@ -596,6 +673,7 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic
"task_table": task_table,
"task_scatter": task_scatter,
"cost_overview": cost_overview,
"cost_coverage": cost_coverage,
}


Expand Down
111 changes: 111 additions & 0 deletions modules/local/aggregate_benchmark_report_data/tests/test_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def test_build_report_data_has_all_sections(tmp_path, make_run, flat_task, write
"task_table",
"task_scatter",
"cost_overview",
"cost_coverage",
}


Expand All @@ -40,6 +41,15 @@ def test_run_costs_without_cur_are_zero(tmp_path, make_run, flat_task, write_run
data = build_report_data(jsonl_dir)
assert data["run_costs"][0]["cost"] == 0.0
assert data["run_costs"][0]["used_cost"] is None
assert data["cost_coverage"] == {
"cur_supplied": False,
"has_any_cost_rows": False,
"total_included_tasks": 0,
"matched_task_count": 0,
"missing_task_count": 0,
"coverage_pct": None,
"runs_with_missing_costs": [],
}


def test_cur_zero_costs_do_not_fall_back_to_task_cost(tmp_path):
Expand Down Expand Up @@ -101,6 +111,107 @@ def test_cur_zero_costs_do_not_fall_back_to_task_cost(tmp_path):
assert data["run_costs"][0]["unused_cost"] == 0.0
assert data["cost_overview"][0]["total_cost"] == 0.0
assert data["cost_overview"][0]["used_cost"] == 0.0
assert data["cost_coverage"]["cur_supplied"] is True
assert data["cost_coverage"]["coverage_pct"] == 100.0
assert data["cost_coverage"]["missing_task_count"] == 0


def test_partial_cur_coverage_is_reported_per_run_and_process(tmp_path):
    """Partial CUR coverage is reported per run and per process.

    Fixture: one run with three included tasks (two COMPLETED, one CACHED),
    but a costs.jsonl containing a single row that matches only PROC_A.
    Expected: 1 of 3 tasks matched (33.3% coverage) and a warning entry
    naming PROC_B as the process with two unmatched tasks.
    """
    jsonl_dir = tmp_path / "jsonl_bundle"
    jsonl_dir.mkdir(parents=True)

    runs = [
        {
            "run_id": "run1",
            "group": "cpu",
            "pipeline": "pipe",
            "username": "u",
            "pipeline_version": "main",
            "nextflow_version": "24.10.0",
            "platform_version": "x",
            "succeeded": 2,
            "failed": 0,
            "cached": 0,
            "executor": "awsbatch",
            "region": "us-east-1",
            "fusion_enabled": False,
            "wave_enabled": False,
            "container_engine": "docker",
            "duration_ms": 10,
            "cpu_time_ms": 1000,
            "cpu_efficiency": 50.0,
            "memory_efficiency": 50.0,
            "read_bytes": 0,
            "write_bytes": 0,
        }
    ]
    tasks = [
        {
            "run_id": "run1",
            "group": "cpu",
            "hash": "ab/cdef12",
            "process": "foo:PROC_A",
            "process_short": "PROC_A",
            "name": "PROC_A",
            "status": "COMPLETED",
            "staging_ms": 0,
            "realtime_ms": 1000,
            "duration_ms": 1000,
            "cost": None,
        },
        {
            "run_id": "run1",
            "group": "cpu",
            "hash": "ab/cdef13",
            "process": "foo:PROC_B",
            "process_short": "PROC_B",
            "name": "PROC_B",
            "status": "COMPLETED",
            "staging_ms": 0,
            "realtime_ms": 1000,
            "duration_ms": 1000,
            "cost": None,
        },
        {
            # CACHED task: still counted in coverage totals (3 included tasks below).
            "run_id": "run1",
            "group": "cpu",
            "hash": "ab/cdef14",
            "process": "foo:PROC_B",
            "process_short": "PROC_B",
            "name": "PROC_B_retry",
            "status": "CACHED",
            "staging_ms": 0,
            "realtime_ms": 1000,
            "duration_ms": 1000,
            "cost": None,
        },
    ]
    # Only PROC_A has a CUR row. NOTE(review): the cost row hash "abcdef12"
    # omits the "ab/" separator of the task hash — presumably matched via a
    # normalized hash_short in the cost index; verify against _lookup_cost.
    costs = [
        {"run_id": "run1", "process": "foo:PROC_A", "hash": "abcdef12", "cost": 5.0, "used_cost": 4.0, "unused_cost": 1.0}
    ]

    (jsonl_dir / "runs.jsonl").write_text("".join(json.dumps(r) + "\n" for r in runs))
    (jsonl_dir / "tasks.jsonl").write_text("".join(json.dumps(t) + "\n" for t in tasks))
    (jsonl_dir / "costs.jsonl").write_text("".join(json.dumps(c) + "\n" for c in costs))

    data = build_report_data(jsonl_dir)

    # The matched PROC_A row contributes the full run cost.
    assert data["run_costs"][0]["cost"] == 5.0
    assert data["cost_coverage"]["cur_supplied"] is True
    assert data["cost_coverage"]["has_any_cost_rows"] is True
    assert data["cost_coverage"]["total_included_tasks"] == 3
    assert data["cost_coverage"]["matched_task_count"] == 1
    assert data["cost_coverage"]["missing_task_count"] == 2
    # 1/3 matched, rounded to one decimal place.
    assert data["cost_coverage"]["coverage_pct"] == 33.3

    run_warning = data["cost_coverage"]["runs_with_missing_costs"][0]
    assert run_warning["run_id"] == "run1"
    assert run_warning["group"] == "cpu"
    assert run_warning["total_tasks"] == 3
    assert run_warning["matched_tasks"] == 1
    assert run_warning["missing_tasks"] == 2
    # Both unmatched tasks belong to PROC_B, so the summary collapses to one entry.
    assert run_warning["missing_process_summary"] == "PROC_B (2)"
    assert run_warning["missing_processes"] == [{"process_short": "PROC_B", "missing_tasks": 2}]


def test_task_table_includes_cached(tmp_path, make_run, flat_task, write_run_json):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,37 @@
.callout-header:hover { background: {{ brand_accent_surface }}; }
.callout-body { padding: 12px; display: none; }
.callout-body.show { display: block; }
/* Warning card: amber callout for data-quality notices (e.g. incomplete
   CUR cost coverage). Border/background are fixed amber literals; text
   colors reuse the templated {{ brand_heading }} like the rest of the page. */
.warning-card {
  border: 1px solid #f4b548;
  background: #fff8eb;
  border-radius: 8px;
  padding: 12px 14px;
  margin-bottom: 18px;
}
.warning-card h3 {
  margin: 0 0 6px;
  font-size: 15px;
  color: {{ brand_heading }};
}
.warning-card p {
  margin: 0;
  font-size: 13px;
  color: {{ brand_heading }};
}
.warning-card ul {
  margin: 10px 0 0 18px;
  padding: 0;
}
.warning-card li {
  margin: 3px 0;
  color: {{ brand_heading }};
}
/* Inline code inside the card sits on a translucent chip so it stays
   legible against the amber background. */
.warning-card code {
  background: rgba(255,255,255,0.65);
  border: 1px solid rgba(0,0,0,0.06);
  border-radius: 4px;
  padding: 0 4px;
}

.dl-row { display: flex; gap: 20px; font-size: 13px; margin-bottom: 8px; flex-wrap: wrap; }
.dl-row dt { font-weight: 600; color: {{ brand_heading }}; }
Expand Down Expand Up @@ -230,6 +261,8 @@ <h1><svg class="h-icon"><use href="#ic-run"/></svg> Run overview</h1>
task, and cost sections only use included runs.
</p>

<div id="cost-coverage-warning"></div>

<h2 id="run-summary"><svg class="h-icon sm"><use href="#ic-table"/></svg> Run summary</h2>
<p class="section-desc">Summary of pipeline execution and infrastructure settings.</p>
<div class="callout">
Expand Down Expand Up @@ -590,6 +623,37 @@ <h2 id="pg-mem-savings"><svg class="h-icon sm"><use href="#ic-chart"/></svg> Sav
const groupColor = {};
overviewGroups.forEach((g,i) => { groupColor[g] = COLORS[i % COLORS.length]; });

// 1.1 CUR coverage warning
// Injects a .warning-card into #cost-coverage-warning when a CUR file was
// supplied but some included tasks had no matching cost rows, signalling
// that the report's dollar totals are incomplete.
(function() {
  const coverage = DATA.cost_coverage || {};
  // Nothing to warn about: no CUR supplied, or every task matched a cost row.
  if (!coverage.cur_supplied || (coverage.missing_task_count || 0) <= 0) return;
  const mount = document.getElementById('cost-coverage-warning');
  if (!mount) return;

  // '\u2014' (em dash) is the placeholder when a percentage is unavailable.
  const coveragePct = coverage.coverage_pct != null ? Number(coverage.coverage_pct).toFixed(1) + '%' : '\u2014';
  // Render at most the first 6 runs with missing costs as <li> entries.
  const rows = (coverage.runs_with_missing_costs || []).slice(0, 6).map(row => {
    const total = Number(row.total_tasks || 0);
    const missing = Number(row.missing_tasks || 0);
    const matched = Number(row.matched_tasks || 0);
    const pct = total > 0 ? ((matched / total) * 100).toFixed(0) + '%' : '\u2014';
    // Prefer the group label; fall back to run id. All dynamic text is escaped.
    return '<li><strong>' + escapeHtml(row.group || row.run_id || 'run') + '</strong>: ' +
      missing + ' missing of ' + total + ' task' + (total === 1 ? '' : 's') +
      ' (' + pct + ' matched)' +
      (row.missing_process_summary ? ' - ' + escapeHtml(row.missing_process_summary) : '') +
      '</li>';
  }).join('');

  mount.innerHTML =
    '<div class="warning-card">' +
    '<h3>Warning: CUR coverage is incomplete</h3>' +
    '<p>' +
    'A CUR file was supplied, but only ' + coveragePct + ' of included tasks matched CUR cost rows. ' +
    'Unmatched tasks stay out of the cost totals, so treat the dollar values as incomplete.' +
    '</p>' +
    '<ul>' + rows + '</ul>' +
    '</div>';
})();

// 2.1 Run summary table (with cached tasks)
(function() {
const runs = DATA.run_summary || [];
Expand Down
38 changes: 38 additions & 0 deletions modules/local/render_benchmark_report/tests/test_render.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def test_render_html(tmp_path, minimal_report_data):
assert 'id="chart-pg-cpu-savings"' not in text
assert 'target="_blank" rel="noopener noreferrer"' in text
assert "r.runUrl ? '<a href=\"' + r.runUrl" in text
assert 'id="cost-coverage-warning"' in text
assert "if (!coverage.cur_supplied || (coverage.missing_task_count || 0) <= 0) return;" in text


def test_render_performance_gains_with_vm_data(tmp_path, minimal_report_data):
Expand All @@ -81,6 +83,42 @@ def test_render_performance_gains_with_vm_data(tmp_path, minimal_report_data):
assert "Savings attribution (CPU) by layer" in text


def test_render_cost_coverage_warning(tmp_path, minimal_report_data):
    """Rendering with incomplete cost coverage embeds the warning-card markup.

    Checks both the static template strings (mount div, JS guard logic) and
    the data-dependent content (the missing-process summary) appear in the
    rendered HTML.
    """
    # Shallow copy so the shared fixture dict is not mutated for other tests.
    data = dict(minimal_report_data)
    data["cost_coverage"] = {
        "cur_supplied": True,
        "has_any_cost_rows": True,
        "total_included_tasks": 5,
        "matched_task_count": 3,
        "missing_task_count": 2,
        "coverage_pct": 60.0,
        "runs_with_missing_costs": [
            {
                "run_id": "run1",
                "group": "cpu",
                "total_tasks": 5,
                "matched_tasks": 3,
                "missing_tasks": 2,
                "missing_processes": [
                    {"process_short": "PROC_B", "missing_tasks": 1},
                    {"process_short": "PROC_C", "missing_tasks": 1},
                ],
                "missing_process_summary": "PROC_B, PROC_C",
            }
        ],
    }
    out = tmp_path / "report.html"
    render_html(data, out)
    text = out.read_text()
    # Static template markers.
    assert 'id="cost-coverage-warning"' in text
    assert "Warning: CUR coverage is incomplete" in text
    assert "A CUR file was supplied, but only " in text
    assert "Unmatched tasks stay out of the cost totals, so treat the dollar values as incomplete." in text
    assert "const coverage = DATA.cost_coverage || {};" in text
    assert "missing + ' missing of ' + total + ' task'" in text
    # Data embedded via the serialized DATA payload.
    assert "PROC_B, PROC_C" in text


def test_render_pr132_style_scheduler_vm_fixture(tmp_path, pr132_scheduler_vm_report_data):
out = tmp_path / "report.html"
render_html(pr132_scheduler_vm_report_data, out)
Expand Down
Loading