Skip to content

Commit f33ebea

Browse files
authored
Merge pull request #2207 from Jamesbarford/feat/split-jobs-and-enqueue
Feat; Split request job into benchmark jobs
2 parents e31c217 + c32b63a commit f33ebea

File tree

9 files changed

+346
-22
lines changed

9 files changed

+346
-22
lines changed

collector/src/bin/collector.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,12 @@ fn main_result() -> anyhow::Result<i32> {
10421042

10431043
let compile_config = CompileBenchmarkConfig {
10441044
benchmarks,
1045-
profiles: Profile::default_profiles(),
1045+
profiles: vec![
1046+
Profile::Check,
1047+
Profile::Debug,
1048+
Profile::Doc,
1049+
Profile::Opt,
1050+
],
10461051
scenarios: Scenario::all(),
10471052
backends,
10481053
iterations: runs.map(|v| v as usize),

collector/src/compile/benchmark/profile.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,6 @@ impl Profile {
3434
]
3535
}
3636

37-
/// Set of default profiles that should be benchmarked for a master/try artifact.
38-
pub fn default_profiles() -> Vec<Self> {
39-
vec![Profile::Check, Profile::Debug, Profile::Doc, Profile::Opt]
40-
}
41-
4237
pub fn is_doc(&self) -> bool {
4338
match self {
4439
Profile::Doc | Profile::DocJson => true,

collector/src/compile/benchmark/target.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,15 @@ impl Default for Target {
1414
Self::X86_64UnknownLinuxGnu
1515
}
1616
}
17+
18+
impl Target {
19+
pub fn all() -> Vec<Self> {
20+
vec![Self::X86_64UnknownLinuxGnu]
21+
}
22+
23+
pub fn from_db_target(target: &database::Target) -> Target {
24+
match target {
25+
database::Target::X86_64UnknownLinuxGnu => Self::X86_64UnknownLinuxGnu,
26+
}
27+
}
28+
}

database/src/lib.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ impl Profile {
227227
Profile::Clippy => "clippy",
228228
}
229229
}
230+
231+
/// Set of default profiles that should be benchmarked for a master/try artifact.
232+
pub fn default_profiles() -> Vec<Self> {
233+
vec![Profile::Check, Profile::Debug, Profile::Doc, Profile::Opt]
234+
}
230235
}
231236

232237
impl std::str::FromStr for Profile {
@@ -365,6 +370,10 @@ impl Target {
365370
Target::X86_64UnknownLinuxGnu => "x86_64-unknown-linux-gnu",
366371
}
367372
}
373+
374+
pub fn all() -> Vec<Self> {
375+
vec![Self::X86_64UnknownLinuxGnu]
376+
}
368377
}
369378

370379
impl FromStr for Target {
@@ -988,6 +997,35 @@ impl BenchmarkRequest {
988997
pub fn is_release(&self) -> bool {
989998
matches!(self.commit_type, BenchmarkRequestType::Release { .. })
990999
}
1000+
1001+
/// Get the codegen backends for the request
1002+
pub fn backends(&self) -> anyhow::Result<Vec<CodegenBackend>> {
1003+
// Empty string; default to LLVM.
1004+
if self.backends.trim().is_empty() {
1005+
return Ok(vec![CodegenBackend::Llvm]);
1006+
}
1007+
1008+
self.backends
1009+
.split(',')
1010+
.map(|s| {
1011+
CodegenBackend::from_str(s).map_err(|_| anyhow::anyhow!("Invalid backend: {s}"))
1012+
})
1013+
.collect()
1014+
}
1015+
1016+
/// Get the profiles for the request
1017+
pub fn profiles(&self) -> anyhow::Result<Vec<Profile>> {
1018+
// No profile string; fall back to the library defaults.
1019+
if self.profiles.trim().is_empty() {
1020+
return Ok(Profile::default_profiles());
1021+
}
1022+
1023+
self.profiles
1024+
.split(',')
1025+
.map(Profile::from_str)
1026+
.collect::<Result<Vec<_>, _>>()
1027+
.map_err(|e| anyhow::anyhow!("Invalid backend: {e}"))
1028+
}
9911029
}
9921030

9931031
/// Cached information about benchmark requests in the DB
@@ -1010,3 +1048,64 @@ impl BenchmarkRequestIndex {
10101048
&self.completed
10111049
}
10121050
}
1051+
1052+
#[derive(Debug, Clone, PartialEq)]
1053+
pub enum BenchmarkJobStatus {
1054+
Queued,
1055+
InProgress {
1056+
started_at: DateTime<Utc>,
1057+
},
1058+
Completed {
1059+
started_at: DateTime<Utc>,
1060+
completed_at: DateTime<Utc>,
1061+
success: bool,
1062+
},
1063+
}
1064+
1065+
const BENCHMARK_JOB_STATUS_QUEUED_STR: &str = "queued";
1066+
const BENCHMARK_JOB_STATUS_IN_PROGRESS_STR: &str = "in_progress";
1067+
const BENCHMARK_JOB_STATUS_SUCCESS_STR: &str = "success";
1068+
const BENCHMARK_JOB_STATUS_FAILURE_STR: &str = "failure";
1069+
1070+
impl BenchmarkJobStatus {
1071+
pub fn as_str(&self) -> &str {
1072+
match self {
1073+
BenchmarkJobStatus::Queued => BENCHMARK_JOB_STATUS_QUEUED_STR,
1074+
BenchmarkJobStatus::InProgress { .. } => BENCHMARK_JOB_STATUS_IN_PROGRESS_STR,
1075+
BenchmarkJobStatus::Completed { success, .. } => {
1076+
if *success {
1077+
BENCHMARK_JOB_STATUS_SUCCESS_STR
1078+
} else {
1079+
BENCHMARK_JOB_STATUS_FAILURE_STR
1080+
}
1081+
}
1082+
}
1083+
}
1084+
}
1085+
1086+
impl fmt::Display for BenchmarkJobStatus {
1087+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1088+
write!(f, "{}", self.as_str())
1089+
}
1090+
}
1091+
1092+
#[derive(Debug, Clone, PartialEq)]
1093+
pub struct BenchmarkSet(u32);
1094+
1095+
/// A single unit of work generated from a benchmark request. Split by profiles
1096+
/// and backends
1097+
///
1098+
/// Each request is split into several `BenchmarkJob`s. Collectors poll the
1099+
/// queue and claim a job only when its `benchmark_set` matches one of the sets
1100+
/// they are responsible for.
1101+
#[derive(Debug, Clone, PartialEq)]
1102+
pub struct BenchmarkJob {
1103+
target: Target,
1104+
backend: CodegenBackend,
1105+
profile: Profile,
1106+
request_tag: String,
1107+
benchmark_set: BenchmarkSet,
1108+
created_at: DateTime<Utc>,
1109+
status: BenchmarkJobStatus,
1110+
retry: u32,
1111+
}

database/src/pool.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,16 @@ pub trait Connection: Send + Sync {
210210
sha: &str,
211211
parent_sha: &str,
212212
) -> anyhow::Result<()>;
213+
214+
/// Add a benchmark job to the job queue.
215+
async fn enqueue_benchmark_job(
216+
&self,
217+
request_tag: &str,
218+
target: &Target,
219+
backend: &CodegenBackend,
220+
profile: &Profile,
221+
benchmark_set: u32,
222+
) -> anyhow::Result<()>;
213223
}
214224

215225
#[async_trait::async_trait]
@@ -584,4 +594,35 @@ mod tests {
584594
})
585595
.await;
586596
}
597+
598+
#[tokio::test]
599+
async fn enqueue_benchmark_job() {
600+
run_postgres_test(|ctx| async {
601+
let db = ctx.db_client();
602+
let db = db.connection().await;
603+
let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
604+
let benchmark_request =
605+
BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time);
606+
607+
// Insert the request so we don't violate the foreign key
608+
db.insert_benchmark_request(&benchmark_request)
609+
.await
610+
.unwrap();
611+
612+
// Now we can insert the job
613+
let result = db
614+
.enqueue_benchmark_job(
615+
benchmark_request.tag().unwrap(),
616+
&Target::X86_64UnknownLinuxGnu,
617+
&CodegenBackend::Llvm,
618+
&Profile::Opt,
619+
0u32,
620+
)
621+
.await;
622+
assert!(result.is_ok());
623+
624+
Ok(ctx)
625+
})
626+
.await;
627+
}
587628
}

database/src/pool/postgres.rs

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
22
use crate::{
3-
ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkRequest,
4-
BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, CodegenBackend,
5-
CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit,
6-
Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
3+
ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJobStatus,
4+
BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType,
5+
CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile,
6+
QueuedCommit, Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
77
BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, BENCHMARK_REQUEST_STATUS_COMPLETED_STR,
88
BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_TRY_STR,
99
};
@@ -324,6 +324,42 @@ static MIGRATIONS: &[&str] = &[
324324
CREATE UNIQUE INDEX collector_config_target_bench_active_uniq ON collector_config
325325
(target, benchmark_set, is_active) WHERE is_active = TRUE;
326326
"#,
327+
r#"
328+
CREATE TABLE IF NOT EXISTS job_queue (
329+
id SERIAL PRIMARY KEY,
330+
request_tag TEXT NOT NULL,
331+
target TEXT NOT NULL,
332+
backend TEXT NOT NULL,
333+
profile TEXT NOT NULL,
334+
benchmark_set INTEGER NOT NULL,
335+
collector_id INTEGER,
336+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
337+
started_at TIMESTAMPTZ,
338+
completed_at TIMESTAMPTZ,
339+
status TEXT NOT NULL,
340+
retry INTEGER DEFAULT 0,
341+
342+
CONSTRAINT job_queue_request_fk
343+
FOREIGN KEY (request_tag)
344+
REFERENCES benchmark_request(tag)
345+
ON DELETE CASCADE,
346+
347+
CONSTRAINT job_queue_collector
348+
FOREIGN KEY (collector_id)
349+
REFERENCES collector_config(id)
350+
ON DELETE CASCADE,
351+
352+
CONSTRAINT job_queue_unique
353+
UNIQUE (
354+
request_tag,
355+
target,
356+
backend,
357+
profile,
358+
benchmark_set
359+
)
360+
);
361+
CREATE INDEX IF NOT EXISTS job_queue_request_tag_idx ON job_queue (request_tag);
362+
"#,
327363
];
328364

329365
#[async_trait::async_trait]
@@ -1608,6 +1644,42 @@ where
16081644
.collect();
16091645
Ok(requests)
16101646
}
1647+
1648+
async fn enqueue_benchmark_job(
1649+
&self,
1650+
request_tag: &str,
1651+
target: &Target,
1652+
backend: &CodegenBackend,
1653+
profile: &Profile,
1654+
benchmark_set: u32,
1655+
) -> anyhow::Result<()> {
1656+
self.conn()
1657+
.execute(
1658+
r#"
1659+
INSERT INTO job_queue(
1660+
request_tag,
1661+
target,
1662+
backend,
1663+
profile,
1664+
benchmark_set,
1665+
status
1666+
)
1667+
VALUES ($1, $2, $3, $4, $5, $6)
1668+
ON CONFLICT DO NOTHING
1669+
"#,
1670+
&[
1671+
&request_tag,
1672+
&target,
1673+
&backend,
1674+
&profile,
1675+
&(benchmark_set as i32),
1676+
&BenchmarkJobStatus::Queued,
1677+
],
1678+
)
1679+
.await
1680+
.context("failed to insert benchmark_job")?;
1681+
Ok(())
1682+
}
16111683
}
16121684

16131685
fn parse_artifact_id(ty: &str, sha: &str, date: Option<DateTime<Utc>>) -> ArtifactId {
@@ -1653,6 +1725,10 @@ macro_rules! impl_to_postgresql_via_to_string {
16531725

16541726
impl_to_postgresql_via_to_string!(BenchmarkRequestType);
16551727
impl_to_postgresql_via_to_string!(BenchmarkRequestStatus);
1728+
impl_to_postgresql_via_to_string!(Target);
1729+
impl_to_postgresql_via_to_string!(CodegenBackend);
1730+
impl_to_postgresql_via_to_string!(Profile);
1731+
impl_to_postgresql_via_to_string!(BenchmarkJobStatus);
16561732

16571733
#[cfg(test)]
16581734
mod tests {

database/src/pool/sqlite.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,17 @@ impl Connection for SqliteConnection {
12961296
) -> anyhow::Result<()> {
12971297
no_queue_implementation_abort!()
12981298
}
1299+
1300+
async fn enqueue_benchmark_job(
1301+
&self,
1302+
_request_tag: &str,
1303+
_target: &Target,
1304+
_backend: &CodegenBackend,
1305+
_profile: &Profile,
1306+
_benchmark_set: u32,
1307+
) -> anyhow::Result<()> {
1308+
no_queue_implementation_abort!()
1309+
}
12991310
}
13001311

13011312
fn parse_artifact_id(ty: &str, sha: &str, date: Option<i64>) -> ArtifactId {

0 commit comments

Comments
 (0)