# benchmark_runner.py — 175 lines (152 loc) · 6.96 KB
import statistics
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
from hw_monitor import HardwareSampler
from models import ModelSummary, RunResult
from ollama_client import OllamaClient, OllamaClientError
@dataclass
class BenchmarkConfig:
    """Settings for one benchmark session, shared across all models in a run."""

    prompt: str  # Prompt text sent on every timed generate() call.
    runs_per_model: int  # Number of timed generations executed per model.
    seed: int  # Seed forwarded to generate() for reproducible sampling.
    num_predict: int  # Token budget forwarded to generate() for timed runs (warmup uses 1).
    temperature: float = 0.0  # Sampling temperature forwarded to generate().
    top_p: float = 1.0  # Nucleus-sampling cutoff forwarded to generate().
    collect_hw: bool = False  # When True, a HardwareSampler runs during each timed generation.
    hw_interval_s: float = 0.5  # Polling interval (seconds) for the hardware sampler.
    warmup: bool = True  # When True, issue one throwaway generation per model before timing.
def _ns_to_s(value: Optional[int]) -> float:
if value is None:
return 0.0
try:
return float(value) / 1_000_000_000.0
except (TypeError, ValueError):
return 0.0
def _tokens_per_second(eval_count: int, eval_duration_ns: int) -> float:
if not eval_duration_ns or eval_duration_ns <= 0:
return 0.0
return float(eval_count) * 1_000_000_000.0 / float(eval_duration_ns)
class BenchmarkRunner:
    """Drives benchmark runs against an Ollama server and aggregates results.

    For each model: optionally warm up once, execute ``config.runs_per_model``
    timed generations, then fold the per-model runs into a ``ModelSummary``.
    """

    def __init__(self, client: OllamaClient):
        # Client used for every generate() call (warmup and timed runs alike).
        self.client = client

    def run(
        self,
        models: List[str],
        config: BenchmarkConfig,
        on_progress: Optional[Callable[[str], None]] = None,
        on_hw_sample: Optional[Callable[[Dict[str, Optional[float]]], None]] = None,
        should_cancel: Optional[Callable[[], bool]] = None,
    ) -> Dict[str, List]:
        """Benchmark every model and return ``{"runs": [...], "summaries": [...]}``.

        ``on_progress`` receives human-readable status lines, ``on_hw_sample``
        receives periodic hardware samples (when ``config.collect_hw``), and
        ``should_cancel`` is polled before each model and each run to support
        cooperative cancellation. Summaries are sorted by average tokens/second,
        fastest first.
        """
        cancel_requested = should_cancel or (lambda: False)
        collected: List[RunResult] = []
        leaderboard: List[ModelSummary] = []
        total_jobs = len(models) * config.runs_per_model
        job_no = 0

        for model in models:
            if cancel_requested():
                break
            if config.warmup:
                self._warmup(model, config, on_progress)

            per_model: List[RunResult] = []
            for attempt in range(1, config.runs_per_model + 1):
                if cancel_requested():
                    break
                job_no += 1
                if on_progress:
                    on_progress(f"[{job_no}/{total_jobs}] {model} run {attempt}/{config.runs_per_model}")
                outcome = self._run_once(model, attempt, config, on_hw_sample)
                per_model.append(outcome)
                collected.append(outcome)

            leaderboard.append(self._summarize(model, per_model))

        leaderboard.sort(key=lambda s: s.avg_tokens_per_second, reverse=True)
        return {
            "runs": collected,
            "summaries": leaderboard,
        }

    def _warmup(
        self,
        model: str,
        config: BenchmarkConfig,
        on_progress: Optional[Callable[[str], None]],
    ) -> None:
        # One throwaway single-token generation so model load cost does not
        # land inside the first timed run.
        if on_progress:
            on_progress(f"Warmup model {model}...")
        try:
            self.client.generate(
                model=model,
                prompt="",
                seed=config.seed,
                num_predict=1,
                temperature=config.temperature,
                top_p=config.top_p,
            )
        except OllamaClientError:
            # Warmup errors are not fatal; actual runs still try.
            pass

    def _run_once(
        self,
        model: str,
        run_idx: int,
        config: BenchmarkConfig,
        on_hw_sample: Optional[Callable[[Dict[str, Optional[float]]], None]],
    ) -> RunResult:
        """Execute one timed generation and package it as a RunResult."""
        hw_sampler = None
        hw_stats: Dict[str, Optional[float]] = {}
        if config.collect_hw:
            hw_sampler = HardwareSampler(interval_s=config.hw_interval_s, on_sample=on_hw_sample)
            hw_sampler.start()

        t0 = time.time()
        try:
            payload = self.client.generate(
                model=model,
                prompt=config.prompt,
                seed=config.seed,
                num_predict=config.num_predict,
                temperature=config.temperature,
                top_p=config.top_p,
            )
            failure = None
        except OllamaClientError as exc:
            # A failed run still produces a RunResult, with error populated
            # and all server-reported counters defaulting to zero.
            payload = {}
            failure = str(exc)
        finally:
            # Stop sampling regardless of success so no sampler thread leaks.
            if hw_sampler is not None:
                hw_stats = hw_sampler.stop()
        wall_clock = time.time() - t0

        def counter(key: str) -> int:
            # Server fields may be absent or None; coerce both to 0.
            return int(payload.get(key, 0) or 0)

        total_ns = counter("total_duration")
        eval_ns = counter("eval_duration")
        n_eval = counter("eval_count")

        return RunResult(
            model=model,
            run_index=run_idx,
            # Prefer the server-reported total; fall back to wall-clock time
            # when the field is missing (e.g. the request failed).
            total_duration_s=_ns_to_s(total_ns) if total_ns > 0 else wall_clock,
            load_duration_s=_ns_to_s(counter("load_duration")),
            prompt_eval_duration_s=_ns_to_s(counter("prompt_eval_duration")),
            eval_duration_s=_ns_to_s(eval_ns),
            prompt_eval_count=counter("prompt_eval_count"),
            eval_count=n_eval,
            tokens_per_second=_tokens_per_second(n_eval, eval_ns),
            response_preview=(payload.get("response", "") or "")[:120].replace("\n", " "),
            error=failure,
            avg_cpu_percent=hw_stats.get("avg_cpu_percent"),
            peak_ram_mb=hw_stats.get("peak_ram_mb"),
            avg_ram_mb=hw_stats.get("avg_ram_mb"),
            peak_gpu_percent=hw_stats.get("peak_gpu_percent"),
            avg_gpu_percent=hw_stats.get("avg_gpu_percent"),
            peak_gpu_mem_mb=hw_stats.get("peak_gpu_mem_mb"),
        )

    def _summarize(self, model: str, runs: List[RunResult]) -> ModelSummary:
        """Aggregate one model's runs (failed runs excluded) into a ModelSummary."""
        successes = [r for r in runs if r.error is None]

        def present(attr: str) -> list:
            # Collect a hardware metric across successful runs, skipping
            # runs where it was not sampled (None).
            return [v for v in (getattr(r, attr) for r in successes) if v is not None]

        tps = [r.tokens_per_second for r in successes]
        totals = [r.total_duration_s for r in successes]
        loads = [r.load_duration_s for r in successes]
        cpu_avgs = present("avg_cpu_percent")
        ram_peaks = present("peak_ram_mb")
        ram_avgs = present("avg_ram_mb")
        gpu_peaks = present("peak_gpu_percent")
        gpu_avgs = present("avg_gpu_percent")
        gpu_mem_peaks = present("peak_gpu_mem_mb")

        return ModelSummary(
            model=model,
            runs_ok=len(successes),
            runs_failed=len(runs) - len(successes),
            avg_tokens_per_second=statistics.mean(tps) if tps else 0.0,
            # stdev needs at least two samples.
            std_tokens_per_second=statistics.stdev(tps) if len(tps) > 1 else 0.0,
            avg_total_duration_s=statistics.mean(totals) if totals else 0.0,
            avg_load_duration_s=statistics.mean(loads) if loads else 0.0,
            avg_cpu_percent=statistics.mean(cpu_avgs) if cpu_avgs else None,
            peak_ram_mb=max(ram_peaks) if ram_peaks else None,
            avg_ram_mb=statistics.mean(ram_avgs) if ram_avgs else None,
            peak_gpu_percent=max(gpu_peaks) if gpu_peaks else None,
            avg_gpu_percent=statistics.mean(gpu_avgs) if gpu_avgs else None,
            peak_gpu_mem_mb=max(gpu_mem_peaks) if gpu_mem_peaks else None,
        )