-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathbackend.py
More file actions
232 lines (208 loc) · 7.68 KB
/
backend.py
File metadata and controls
232 lines (208 loc) · 7.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import asyncio
import copy
from datetime import datetime
from types import SimpleNamespace
from typing import Optional
from libkernelbot.consts import GPU, GPU_TO_SM, SubmissionMode, get_gpu_by_name
from libkernelbot.launchers import Launcher
from libkernelbot.leaderboard_db import LeaderboardDB
from libkernelbot.report import (
MultiProgressReporter,
RunProgressReporter,
generate_report,
make_short_report,
)
from libkernelbot.run_eval import FullResult
from libkernelbot.submission import ProcessedSubmissionRequest, compute_score
from libkernelbot.task import LeaderboardTask, build_task_config
from libkernelbot.utils import setup_logging
logger = setup_logging(__name__)
class KernelBackend:
    """Coordinates kernel submissions: persists them through the leaderboard
    database and dispatches runs to the launchers registered per GPU type."""

    def __init__(
        self,
        env: SimpleNamespace,
        debug_mode: bool = False,
    ):
        """Create the backend and verify database connectivity once.

        Args:
            env: Namespace carrying the POSTGRES_* connection settings plus
                DATABASE_URL and DISABLE_SSL.
            debug_mode: Flag stored for dependent components; no behavior in
                this class itself.

        Raises:
            SystemExit: If the initial database connection check fails.
        """
        self.debug_mode = debug_mode
        self.db = LeaderboardDB(
            env.POSTGRES_HOST,
            env.POSTGRES_DATABASE,
            env.POSTGRES_USER,
            env.POSTGRES_PASSWORD,
            env.POSTGRES_PORT,
            url=env.DATABASE_URL,
            ssl_mode="require" if not env.DISABLE_SSL else "disable",
        )
        try:
            if not self.db.connect():
                logger.error("Could not connect to database, shutting down")
                # `exit()` is intended for interactive sessions only; raising
                # SystemExit directly is equivalent and does not rely on the
                # `site` module having installed the builtin.
                raise SystemExit(1)
        finally:
            # The connection above is only a startup health check — always
            # release it, even on failure.
            self.db.disconnect()
        self.accepts_jobs = True
        # Maps GPU enum values to the launcher responsible for running them;
        # populated via register_launcher().
        self.launcher_map = {}
def register_launcher(self, launcher: Launcher):
for gpu in launcher.gpus:
self.launcher_map[gpu.value] = launcher
async def submit_full(
self, req: ProcessedSubmissionRequest, mode: SubmissionMode, reporter: MultiProgressReporter,
pre_sub_id: Optional[int] = None
):
"""
pre_sub_id is used to pass the submission id which is created beforehand.
"""
if pre_sub_id is not None:
sub_id = pre_sub_id
else:
with self.db as db:
sub_id = db.create_submission(
leaderboard=req.leaderboard,
file_name=req.file_name,
code=req.code,
user_id=req.user_id,
time=datetime.now(),
user_name=req.user_name,
)
selected_gpus = [get_gpu_by_name(gpu) for gpu in req.gpus]
try:
tasks = [
self.submit_leaderboard(
sub_id,
req.code,
req.file_name,
gpu,
reporter.add_run(f"{gpu.name} on {gpu.runner}"),
req.task,
mode,
None,
)
for gpu in selected_gpus
]
if mode == SubmissionMode.LEADERBOARD:
tasks += [
self.submit_leaderboard(
sub_id,
req.code,
req.file_name,
gpu,
reporter.add_run(f"{gpu.name} on {gpu.runner} (secret)"),
req.task,
SubmissionMode.PRIVATE,
req.secret_seed,
)
for gpu in selected_gpus
]
await reporter.show(
f"Submission **{sub_id}**: `{req.file_name}` for `{req.leaderboard}`"
)
results = await asyncio.gather(*tasks)
finally:
with self.db as db:
db.mark_submission_done(sub_id)
return sub_id, results
    async def submit_leaderboard(  # noqa: C901
        self,
        submission_id: int,
        code: str,
        name: str,
        gpu_type: GPU,
        reporter: RunProgressReporter,
        task: LeaderboardTask,
        mode: SubmissionMode,
        seed: Optional[int],
    ) -> Optional[FullResult]:
        """
        Function invoked by `leaderboard_cog` to handle a leaderboard run.

        Runs the submission once on `gpu_type`, then (for real submissions)
        persists one DB row per run contained in the result.

        Args:
            submission_id: DB id of the submission; -1 is a sentinel used by
                verifyruns, which skips all DB writes.
            code: Submitted source code.
            name: File name of the submission.
            gpu_type: GPU to run on.
            reporter: Progress reporter for this single run.
            task: Task specification; shallow-copied if a seed is applied.
            mode: Submission mode; PRIVATE marks the stored runs as secret.
            seed: Optional seed for the (secret) seeded run.

        Returns:
            The full result of the run.
        """
        if seed is not None:
            # careful, we've got a reference here
            # that is shared with the other run
            # invocations.
            task = copy.copy(task)
            task.seed = seed
        result = await self.handle_submission(
            gpu_type,
            reporter,
            code=code,
            name=name,
            task=task,
            mode=mode,
            submission_id=submission_id,
        )
        if result.success:
            score = None
            # Only compute a score when the leaderboard run exists and passed.
            if (
                "leaderboard" in result.runs
                and result.runs["leaderboard"].run.success
                and result.runs["leaderboard"].run.passed
            ):
                score = compute_score(result, task, submission_id)
            # verifyruns uses a fake submission id of -1
            if submission_id != -1:
                with self.db as db:
                    # Persist each run (e.g. test/benchmark/leaderboard) as its
                    # own row; only the leaderboard run carries the score.
                    for key, value in result.runs.items():
                        db.create_submission_run(
                            submission=submission_id,
                            start=value.start,
                            end=value.end,
                            mode=key,
                            runner=gpu_type.name,
                            score=None if key != "leaderboard" else score,
                            secret=mode == SubmissionMode.PRIVATE,
                            compilation=value.compilation,
                            result=value.run,
                            system=result.system,
                        )
        return result
async def handle_submission(
self,
gpu_type: GPU,
reporter: RunProgressReporter,
code: str,
name: str,
task: Optional[LeaderboardTask],
mode: SubmissionMode,
submission_id: int = -1,
) -> Optional[FullResult]:
"""
Generic function to handle code submissions.
Args:
gpu_type: Which GPU to run on.
code: Submitted code
name: File name of the submission; used to infer code's language
task: Task specification, of provided
submission_id: ID of the submission, only used for display purposes
Returns:
if successful, returns the result of the run.
"""
launcher = self.launcher_map[gpu_type.value]
config = build_task_config(
task=task, submission_content=code, arch=self._get_arch(gpu_type), mode=mode
)
logger.info("submitting task to runner %s", launcher.name)
result = await launcher.run_submission(config, gpu_type, reporter)
if not result.success:
await reporter.update_title(reporter.title + " ❌ failure")
await reporter.push(result.error)
return result
else:
await reporter.update_title(reporter.title + " ✅ success")
short_report = make_short_report(
result.runs, full=mode in [SubmissionMode.PRIVATE, SubmissionMode.LEADERBOARD]
)
await reporter.push(short_report)
if mode != SubmissionMode.PRIVATE:
try:
# does the last message of the short report start with ✅ or ❌?
verdict = short_report[-1][0]
id_str = f"{verdict}" if submission_id == -1 else f"{verdict} #{submission_id}"
await reporter.display_report(
f"{id_str} {name} on {gpu_type.name} ({launcher.name})",
generate_report(result),
)
except Exception as E:
logger.error("Error generating report. Result: %s", result, exc_info=E)
raise
return result
def _get_arch(self, gpu_type: GPU):
return GPU_TO_SM[gpu_type.name]