@@ -62,17 +62,37 @@ def _lookup_cost(
    return None


-def _cost_or_task(cost_row: dict[str, Any] | None, key: str, task_cost: float, default: float = 0.0) -> float:
+def _cost_or_task(cost_row: dict[str, Any] | None, key: str, default: float = 0.0) -> float:
    if not cost_row:
-        return task_cost if key in {"cost", "used_cost"} else default
+        return default

    value = cost_row.get(key)
    if value is None:
-        return task_cost if key in {"cost", "used_cost"} else default
+        return default

    return float(value)
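Note: a minimal sketch of the new _cost_or_task semantics (illustrative, not part of the diff). With the task_cost parameter gone, a missing cost row or a null value resolves to the default for every key instead of falling back to the task-level cost:

    _cost_or_task(None, "cost")                      # 0.0 (previously task_cost)
    _cost_or_task({"cost": None}, "cost")            # 0.0 (null value, no fallback)
    _cost_or_task({"cost": "2.5"}, "cost")           # 2.5 (coerced to float)
    _cost_or_task(None, "unused_cost", default=1.0)  # 1.0 (explicit default)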


+def _summarize_missing_processes(
+    missing_process_counts: dict[str, int], preview_limit: int = 3
+) -> tuple[list[dict[str, Any]], str]:
+    ordered = sorted(missing_process_counts.items(), key=lambda item: (-item[1], item[0]))
+    preview = [
+        {"process_short": process_short, "missing_tasks": missing_tasks}
+        for process_short, missing_tasks in ordered[:preview_limit]
+    ]
+    parts = [
+        f"{item['process_short']} ({item['missing_tasks']})"
+        if item["missing_tasks"] != 1
+        else item["process_short"]
+        for item in preview
+    ]
+    hidden_count = max(len(ordered) - preview_limit, 0)
+    if hidden_count:
+        parts.append(f"+{hidden_count} more")
+    return preview, ", ".join(parts)


def _is_highlight_process(process: str) -> bool:
    process_lc = process.lower()
    return any(keyword in process_lc for keyword in _HIGHLIGHT_KEYWORDS)
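Note: a minimal usage sketch for _summarize_missing_processes (illustrative, not part of the diff). Processes sort by descending missing-task count and then by name, singletons drop the count suffix, and anything past preview_limit collapses into a "+N more" tail:

    counts = {"PROC_B": 2, "PROC_A": 1, "PROC_C": 1}
    preview, summary = _summarize_missing_processes(counts, preview_limit=2)
    # preview == [{"process_short": "PROC_B", "missing_tasks": 2},
    #             {"process_short": "PROC_A", "missing_tasks": 1}]
    # summary == "PROC_B (2), PROC_A, +1 more"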
@@ -240,9 +260,11 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic
        "unused_cost": 0.0,
    }

+    costs_jsonl_path = jsonl_dir / "costs.jsonl"
+    cur_supplied = costs_jsonl_path.exists()
    costs_index: dict[tuple[str, str, str], dict[str, Any]] = {}
    has_cost_rows = False
-    for c in _iter_jsonl(jsonl_dir / "costs.jsonl"):
+    for c in _iter_jsonl(costs_jsonl_path):
        has_cost_rows = True
        run_id = str(c.get("run_id", ""))
        process = str(c.get("process", ""))
@@ -289,6 +311,10 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic
    task_run_acc: dict[str, dict[str, float]] = defaultdict(
        lambda: {"requested_cpu_h": 0.0, "requested_mem_gib_h": 0.0, "real_cpu_h": 0.0, "real_mem_gib_h": 0.0}
    )
+    cost_coverage_runs: dict[tuple[str, str], dict[str, Any]] = {}
+    total_cost_tasks = 0
+    matched_cost_tasks = 0
+    missing_cost_tasks = 0

    for t in _iter_jsonl(jsonl_dir / "tasks.jsonl"):
        run_id = str(t.get("run_id", ""))
@@ -311,21 +337,40 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic
        }

        cost_row = _lookup_cost(costs_index, run_id=run_id, process=process, process_short=process_short, hash_short=hash_short)
-        task_cost = float(t.get("cost") or 0.0)

+        if cur_supplied:
+            total_cost_tasks += 1
+            coverage = cost_coverage_runs.setdefault(
+                run_group_key,
+                {
+                    "run_id": run_id,
+                    "group": group,
+                    "total_tasks": 0,
+                    "matched_tasks": 0,
+                    "missing_tasks": 0,
+                    "missing_process_counts": defaultdict(int),
+                },
+            )
+            coverage["total_tasks"] += 1
+            if cost_row:
+                matched_cost_tasks += 1
+                coverage["matched_tasks"] += 1
+            else:
+                missing_cost_tasks += 1
+                coverage["missing_tasks"] += 1
+                missing_process = process_short or process or "unknown"
+                coverage["missing_process_counts"][missing_process] += 1

-        if cost_row:
-            run_cost_acc[run_group_key]["cost"] += _cost_or_task(cost_row, "cost", task_cost)
-            run_cost_acc[run_group_key]["used_cost"] += _cost_or_task(cost_row, "used_cost", task_cost)
-            run_cost_acc[run_group_key]["unused_cost"] += _cost_or_task(cost_row, "unused_cost", task_cost, default=0.0)
-        else:
-            run_cost_acc[run_group_key]["cost"] += task_cost
-            run_cost_acc[run_group_key]["used_cost"] += task_cost
+        run_cost_acc[run_group_key]["cost"] += _cost_or_task(cost_row, "cost")
+        run_cost_acc[run_group_key]["used_cost"] += _cost_or_task(cost_row, "used_cost")
+        run_cost_acc[run_group_key]["unused_cost"] += _cost_or_task(cost_row, "unused_cost")

        if has_cost_rows:
            overview_key = (group, process_short)
-            cost_group_acc[overview_key]["total_cost"] += _cost_or_task(cost_row, "cost", task_cost)
-            cost_group_acc[overview_key]["used_cost"] += _cost_or_task(cost_row, "used_cost", task_cost)
-            cost_group_acc[overview_key]["unused_cost"] += _cost_or_task(cost_row, "unused_cost", task_cost, default=0.0)
+            cost_group_acc[overview_key]["total_cost"] += _cost_or_task(cost_row, "cost")
+            cost_group_acc[overview_key]["used_cost"] += _cost_or_task(cost_row, "used_cost")
+            cost_group_acc[overview_key]["unused_cost"] += _cost_or_task(cost_row, "unused_cost")
            cost_group_acc[overview_key]["n_tasks"] += 1

        status = t.get("status")
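Note: a sketch of what this loop accumulates (illustrative, not part of the diff). For the partial-coverage fixture added in the tests below, where run1 in group cpu has one matched task and two PROC_B tasks without CUR rows, the per-run entry would end up as:

    cost_coverage_runs[run_group_key] == {
        "run_id": "run1",
        "group": "cpu",
        "total_tasks": 3,
        "matched_tasks": 1,
        "missing_tasks": 2,
        "missing_process_counts": {"PROC_B": 2},  # a defaultdict(int) in practice
    }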
@@ -531,6 +576,34 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic
    ]
    cost_overview.sort(key=lambda x: float(x.get("total_cost") or 0), reverse=True)

+    runs_with_missing_costs = []
+    for row in cost_coverage_runs.values():
+        if int(row["missing_tasks"]) <= 0:
+            continue
+        missing_processes, missing_process_summary = _summarize_missing_processes(row["missing_process_counts"])
+        runs_with_missing_costs.append(
+            {
+                "run_id": row["run_id"],
+                "group": row["group"],
+                "total_tasks": int(row["total_tasks"]),
+                "matched_tasks": int(row["matched_tasks"]),
+                "missing_tasks": int(row["missing_tasks"]),
+                "missing_processes": missing_processes,
+                "missing_process_summary": missing_process_summary,
+            }
+        )
+    runs_with_missing_costs.sort(key=lambda row: (-row["missing_tasks"], str(row["group"]), str(row["run_id"])))
+
+    cost_coverage = {
+        "cur_supplied": cur_supplied,
+        "has_any_cost_rows": has_cost_rows,
+        "total_included_tasks": total_cost_tasks,
+        "matched_task_count": matched_cost_tasks,
+        "missing_task_count": missing_cost_tasks,
+        "coverage_pct": _round((matched_cost_tasks / total_cost_tasks) * 100.0, 1) if total_cost_tasks else None,
+        "runs_with_missing_costs": runs_with_missing_costs,
+    }
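Note: a worked check on coverage_pct (illustrative, not part of the diff). It is matched over total as a percentage rounded to one decimal via the module's _round helper, so 1 matched of 3 included tasks gives:

    _round((1 / 3) * 100.0, 1)  # 33.3, as asserted in the partial-coverage test below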

    combined_task_runtime = []
    for (pipeline, group), panel_acc in sorted(combined_runtime_acc.items(), key=lambda x: (x[0][0], x[0][1])):
        process_runtime_ms = panel_acc["process_runtime_ms"]
@@ -600,6 +673,7 @@ def build_report_data(jsonl_dir: Path, include_failed_runs: bool = False) -> dic
"task_table": task_table,
"task_scatter": task_scatter,
"cost_overview": cost_overview,
"cost_coverage": cost_coverage,
}


115 changes: 113 additions & 2 deletions modules/local/aggregate_benchmark_report_data/tests/test_aggregate.py
@@ -28,18 +28,28 @@ def test_build_report_data_has_all_sections(tmp_path, make_run, flat_task, write
        "task_table",
        "task_scatter",
        "cost_overview",
+        "cost_coverage",
    }


-def test_run_costs_without_cur_uses_task_cost(tmp_path, make_run, flat_task, write_run_json):
+def test_run_costs_without_cur_are_zero(tmp_path, make_run, flat_task, write_run_json):
    data_dir = tmp_path / "data"
    jsonl_dir = tmp_path / "jsonl_bundle"
    write_run_json(data_dir, [make_run(tasks=[flat_task(cost=4.2)])])
    normalize_jsonl(data_dir, jsonl_dir)

    data = build_report_data(jsonl_dir)
-    assert data["run_costs"][0]["cost"] == 4.2
+    assert data["run_costs"][0]["cost"] == 0.0
    assert data["run_costs"][0]["used_cost"] is None
+    assert data["cost_coverage"] == {
+        "cur_supplied": False,
+        "has_any_cost_rows": False,
+        "total_included_tasks": 0,
+        "matched_task_count": 0,
+        "missing_task_count": 0,
+        "coverage_pct": None,
+        "runs_with_missing_costs": [],
+    }


def test_cur_zero_costs_do_not_fall_back_to_task_cost(tmp_path):
@@ -101,6 +111,107 @@ def test_cur_zero_costs_do_not_fall_back_to_task_cost(tmp_path):
    assert data["run_costs"][0]["unused_cost"] == 0.0
    assert data["cost_overview"][0]["total_cost"] == 0.0
    assert data["cost_overview"][0]["used_cost"] == 0.0
+    assert data["cost_coverage"]["cur_supplied"] is True
+    assert data["cost_coverage"]["coverage_pct"] == 100.0
+    assert data["cost_coverage"]["missing_task_count"] == 0


+def test_partial_cur_coverage_is_reported_per_run_and_process(tmp_path):
+    jsonl_dir = tmp_path / "jsonl_bundle"
+    jsonl_dir.mkdir(parents=True)
+
+    runs = [
+        {
+            "run_id": "run1",
+            "group": "cpu",
+            "pipeline": "pipe",
+            "username": "u",
+            "pipeline_version": "main",
+            "nextflow_version": "24.10.0",
+            "platform_version": "x",
+            "succeeded": 2,
+            "failed": 0,
+            "cached": 0,
+            "executor": "awsbatch",
+            "region": "us-east-1",
+            "fusion_enabled": False,
+            "wave_enabled": False,
+            "container_engine": "docker",
+            "duration_ms": 10,
+            "cpu_time_ms": 1000,
+            "cpu_efficiency": 50.0,
+            "memory_efficiency": 50.0,
+            "read_bytes": 0,
+            "write_bytes": 0,
+        }
+    ]
+    tasks = [
+        {
+            "run_id": "run1",
+            "group": "cpu",
+            "hash": "ab/cdef12",
+            "process": "foo:PROC_A",
+            "process_short": "PROC_A",
+            "name": "PROC_A",
+            "status": "COMPLETED",
+            "staging_ms": 0,
+            "realtime_ms": 1000,
+            "duration_ms": 1000,
+            "cost": None,
+        },
+        {
+            "run_id": "run1",
+            "group": "cpu",
+            "hash": "ab/cdef13",
+            "process": "foo:PROC_B",
+            "process_short": "PROC_B",
+            "name": "PROC_B",
+            "status": "COMPLETED",
+            "staging_ms": 0,
+            "realtime_ms": 1000,
+            "duration_ms": 1000,
+            "cost": None,
+        },
+        {
+            "run_id": "run1",
+            "group": "cpu",
+            "hash": "ab/cdef14",
+            "process": "foo:PROC_B",
+            "process_short": "PROC_B",
+            "name": "PROC_B_retry",
+            "status": "CACHED",
+            "staging_ms": 0,
+            "realtime_ms": 1000,
+            "duration_ms": 1000,
+            "cost": None,
+        },
+    ]
+    costs = [
+        {"run_id": "run1", "process": "foo:PROC_A", "hash": "abcdef12", "cost": 5.0, "used_cost": 4.0, "unused_cost": 1.0}
+    ]
+
+    (jsonl_dir / "runs.jsonl").write_text("".join(json.dumps(r) + "\n" for r in runs))
+    (jsonl_dir / "tasks.jsonl").write_text("".join(json.dumps(t) + "\n" for t in tasks))
+    (jsonl_dir / "costs.jsonl").write_text("".join(json.dumps(c) + "\n" for c in costs))
+
+    data = build_report_data(jsonl_dir)
+
+    assert data["run_costs"][0]["cost"] == 5.0
+    assert data["cost_coverage"]["cur_supplied"] is True
+    assert data["cost_coverage"]["has_any_cost_rows"] is True
+    assert data["cost_coverage"]["total_included_tasks"] == 3
+    assert data["cost_coverage"]["matched_task_count"] == 1
+    assert data["cost_coverage"]["missing_task_count"] == 2
+    assert data["cost_coverage"]["coverage_pct"] == 33.3
+
+    run_warning = data["cost_coverage"]["runs_with_missing_costs"][0]
+    assert run_warning["run_id"] == "run1"
+    assert run_warning["group"] == "cpu"
+    assert run_warning["total_tasks"] == 3
+    assert run_warning["matched_tasks"] == 1
+    assert run_warning["missing_tasks"] == 2
+    assert run_warning["missing_process_summary"] == "PROC_B (2)"
+    assert run_warning["missing_processes"] == [{"process_short": "PROC_B", "missing_tasks": 2}]


def test_task_table_includes_cached(tmp_path, make_run, flat_task, write_run_json):
@@ -179,7 +179,7 @@ def extract_tasks(runs: list[dict[str, Any]]) -> list[dict[str, Any]]:
"peak_rss": task.get("peakRss", 0),
"read_bytes": task.get("readBytes", 0),
"write_bytes": task.get("writeBytes", 0),
"cost": task.get("cost"),
"cost": None,
"executor": task.get("executor", ""),
"machine_type": task.get("machineType", ""),
"cloud_zone": task.get("cloudZone", ""),
@@ -24,7 +24,7 @@ def test_cached_count_extracted(make_run, flat_task):
def test_nested_tasks_unwrapped(make_run, nested_task):
    run = make_run(tasks=[nested_task(cost=2.0), nested_task(cost=3.0)])
    rows = extract_tasks([run])
-    assert sum(r["cost"] for r in rows) == 5.0
+    assert all(r["cost"] is None for r in rows)


def test_failed_tasks_filtered(make_run, flat_task):